[jira] [Resolved] (KAFKA-15666) Add a search-by-mail function [title translated from Russian]

2023-10-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15666.
-
Resolution: Invalid

[~noraverba] – We can only take tickets in English. – Also piped the title 
through a translator and it did not really make sense to me, especially as the 
description is empty.

Closing as invalid.

> Add a search-by-mail function [title translated from Russian]
> 
>
> Key: KAFKA-15666
> URL: https://issues.apache.org/jira/browse/KAFKA-15666
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Eleonora
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15672) Add 3.6 to streams system tests

2023-10-23 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15672:
---

 Summary: Add 3.6 to streams system tests
 Key: KAFKA-15672
 URL: https://issues.apache.org/jira/browse/KAFKA-15672
 Project: Kafka
  Issue Type: Test
  Components: streams, system tests
Reporter: Matthias J. Sax


3.6.0 was released recently. We need to add `3.6.0` to the system tests (in 
particular upgrade and broker compatibility tests)





[jira] [Comment Edited] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778511#comment-17778511
 ] 

Matthias J. Sax edited comment on KAFKA-15602 at 10/23/23 4:09 AM:
---

Hey, [~pnee] pointed me to this ticket. I am not a ByteBuffer expert, but it 
seems to me that a reasonable API contract would be that the user is 
responsible for preparing the buffer for reading, and the returned byte[] array is 
from the buffer's position to limit. Any other behavior seems odd to me. 
Thoughts? (Not sure if the position should be modified by the serializer or not, 
though? I tend to think yes?)
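
A minimal sketch of the contract described above, assuming the serializer copies exactly the bytes between the buffer's position and its limit. The class `PositionToLimitSketch` and its `serialize` helper are hypothetical names for illustration and are not Kafka's `ByteBufferSerializer`. The sketch reads through `duplicate()`, so it leaves the caller's position untouched; that is only one of the two options raised in the comment.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; this is not Kafka's ByteBufferSerializer.
final class PositionToLimitSketch {

    // Copies the bytes in [position, limit) into a new array.
    // Reading through duplicate() keeps the caller's position and limit unchanged
    // (an assumption; the comment leaves open whether the position should advance).
    static byte[] serialize(ByteBuffer data) {
        if (data == null) {
            return null;
        }
        byte[] out = new byte[data.remaining()];
        data.duplicate().get(out);
        return out;
    }

    public static void main(String[] args) {
        byte[] test = "test".getBytes(StandardCharsets.UTF_8);

        // Example (2): write, then flip() to prepare the buffer for reading.
        ByteBuffer flipped = ByteBuffer.allocate(8);
        flipped.put(test);
        flipped.flip();
        System.out.println(new String(serialize(flipped), StandardCharsets.UTF_8)); // test

        // Example (4): limit == position, so nothing remains to read.
        ByteBuffer notRewound = ByteBuffer.allocate(8);
        notRewound.put(test);
        notRewound.limit(notRewound.position());
        System.out.println(serialize(notRewound).length); // 0

        // Example (5): wrap with offset 1 and length 3.
        ByteBuffer offset = ByteBuffer.wrap(test, 1, 3);
        System.out.println(new String(serialize(offset), StandardCharsets.UTF_8)); // est
    }
}
```

Under this reading, the caller alone decides what is sent by setting position and limit before handing the buffer over.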

Overall, it seems like a case to fix forward? If the above proposed contract 
makes sense, it's rather a bug fix and we should just push it out? (In the end 
KAFKA-4852 was done without a KIP, too – kinda borderline to begin with...)

For example (1), it seems it would be a breaking change though? Given that this 
is indeed the "prime" use-case, doing a KIP might indeed be better? (For this 
case, reverting KAFKA-4852 might indeed be a good call, as it breaks more than 
it fixes as it seems.)

For example (2), pre-3.4 seems to be correct, while 3.4+ is broken.

For example (3), pre-3.4 seems to be correct, while 3.4+ seems to be incorrect 
– the user did not limit to 4 bytes, so all 8 bytes should be returned IMHO?

For example (4), pre-3.4 seems to be incorrect, while 3.4+ seems to be 
incorrect, too? In the end, the position was not set back to zero, hence a 
zero-length array should be returned?

And last example (5), as already discussed, is also broken in all versions, but 
should result in "est".
{quote}I can see the problem comes from the serializer doesn't know if the 
position is an offset or just the next byte to be written.  These are two 
different definitions of the position so it doesn't really make sense to handle 
both cases in a single API call.
{quote}
Not sure if I can follow here? What do you mean by "is an offset or just the 
next byte to be written"? Why would position have two different definitions?

\cc [~showuon] as RM for 3.5.2 release – might be worth to get this fix in, too?


was (Author: mjsax):
Hey, [~pnee] pointed me to this ticket. I am not a ByteBuffer expert, but it 
seems to me that a reasonable API contract would be that the user is 
responsible for preparing the buffer for reading, and the returned byte[] array is 
from the buffer's position to limit. Any other behavior seems odd to me. 
Thoughts? (Not sure if the position should be modified by the serializer or not, 
though? I tend to think yes?)

Overall, it seems like a case to fix forward? If the above proposed contract 
makes sense, it's rather a bug fix and we should just push it out? (In the end 
KAFKA-4852 was done without a KIP, too – kinda borderline to begin with...)

For example (1), it seems it would be a breaking change though?

For example (2), pre-3.4 seems to be correct, while 3.4+ is broken.

For example (3), pre-3.4 seems to be correct, while 3.4+ seems to be incorrect 
– the user did not limit to 4 bytes, so all 8 bytes should be returned IMHO?

For example (4), pre-3.4 seems to be incorrect, while 3.4+ seems to be 
incorrect, too? In the end, the position was not set back to zero, hence a 
zero-length array should be returned?

And last example (5), as already discussed, is also broken in all versions, but 
should result in "est".
{quote}I can see the problem comes from the serializer doesn't know if the 
position is an offset or just the next byte to be written.  These are two 
different definitions of the position so it doesn't really make sense to handle 
both cases in a single API call.
{quote}
Not sure if I can follow here? What do you mean by "is an offset or just the 
next byte to be written"? Why would position have two different definitions?

\cc [~showuon] as RM for 3.5.2 release – might be worth to get this fix in, too?

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input Byte

[jira] [Comment Edited] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778511#comment-17778511
 ] 

Matthias J. Sax edited comment on KAFKA-15602 at 10/23/23 4:06 AM:
---

Hey, [~pnee] pointed me to this ticket. I am not a ByteBuffer expert, but it 
seems to me that a reasonable API contract would be that the user is 
responsible for preparing the buffer for reading, and the returned byte[] array is 
from the buffer's position to limit. Any other behavior seems odd to me. 
Thoughts? (Not sure if the position should be modified by the serializer or not, 
though? I tend to think yes?)

Overall, it seems like a case to fix forward? If the above proposed contract 
makes sense, it's rather a bug fix and we should just push it out? (In the end 
KAFKA-4852 was done without a KIP, too – kinda borderline to begin with...)

For example (1), it seems it would be a breaking change though?

For example (2), pre-3.4 seems to be correct, while 3.4+ is broken.

For example (3), pre-3.4 seems to be correct, while 3.4+ seems to be incorrect 
– the user did not limit to 4 bytes, so all 8 bytes should be returned IMHO?

For example (4), pre-3.4 seems to be incorrect, while 3.4+ seems to be 
incorrect, too? In the end, the position was not set back to zero, hence a 
zero-length array should be returned?

And last example (5), as already discussed, is also broken in all versions, but 
should result in "est".
{quote}I can see the problem comes from the serializer doesn't know if the 
position is an offset or just the next byte to be written.  These are two 
different definitions of the position so it doesn't really make sense to handle 
both cases in a single API call.
{quote}
Not sure if I can follow here? What do you mean by "is an offset or just the 
next byte to be written"? Why would position have two different definitions?

\cc [~showuon] as RM for 3.5.2 release – might be worth to get this fix in, too?


was (Author: mjsax):
Hey, [~pnee] pointed me to this ticket. I am not a ByteBuffer expert, but it 
seems to me that a reasonable API contract would be that the user is 
responsible for preparing the buffer for reading, and the returned byte[] array is 
from the buffer's position to limit. Any other behavior seems odd to me. 
Thoughts? (Not sure if the position should be modified by the serializer or not, 
though? I tend to think yes?)

Overall, it seems like a simple case to fix forward? Given that the standard 
use case (position = 0 and limit = max) works in all releases (example (1) in 
the ticket description), I am not really worried about introducing a breaking 
change. If the above proposed contract makes sense, it's rather a bug fix and we 
should just push it out? (In the end KAFKA-4852 was done without a KIP, too – 
kinda borderline to begin with...)

For example (2), pre-3.4 seems to be correct, while 3.4+ is broken.

For example (3), pre-3.4 seems to be correct, while 3.4+ seems to be incorrect 
– the user did not limit to 4 bytes, so all 8 bytes should be returned IMHO?

For example (4), pre-3.4 seems to be incorrect, while 3.4+ seems to be 
incorrect, too? In the end, the position was not set back to zero, hence a 
zero-length array should be returned?

And last example (5), as already discussed, is also broken in all versions, but 
should result in "est".
{quote}I can see the problem comes from the serializer doesn't know if the 
position is an offset or just the next byte to be written.  These are two 
different definitions of the position so it doesn't really make sense to handle 
both cases in a single API call.
{quote}
Not sure if I can follow here? What do you mean by "is an offset or just the 
next byte to be written"? Why would position have two different definitions?

\cc [~showuon] as RM for 3.5.2 release – might be worth to get this fix in, too?

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 

[jira] [Updated] (KAFKA-15664) Add 3.4.0 streams upgrade/compatibility tests

2023-10-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15664:

Component/s: streams
 system tests

> Add 3.4.0 streams upgrade/compatibility tests
> -
>
> Key: KAFKA-15664
> URL: https://issues.apache.org/jira/browse/KAFKA-15664
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>
> Per the penultimate bullet on the release checklist, Kafka v3.4.0 is 
> released. We should add this version to the system tests.
> Example PR: https://github.com/apache/kafka/pull/6597/files





[jira] [Updated] (KAFKA-15664) Add 3.4.0 streams upgrade/compatibility tests

2023-10-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15664:

Priority: Critical  (was: Major)

> Add 3.4.0 streams upgrade/compatibility tests
> -
>
> Key: KAFKA-15664
> URL: https://issues.apache.org/jira/browse/KAFKA-15664
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Affects Versions: 3.5.0
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Critical
> Fix For: 3.5.2, 3.7.0, 3.6.1
>
>
> Per the penultimate bullet on the release checklist, Kafka v3.4.0 is 
> released. We should add this version to the system tests.
> Example PR: https://github.com/apache/kafka/pull/6597/files





[jira] [Updated] (KAFKA-15664) Add 3.4.0 streams upgrade/compatibility tests

2023-10-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15664:

Affects Version/s: 3.5.0

> Add 3.4.0 streams upgrade/compatibility tests
> -
>
> Key: KAFKA-15664
> URL: https://issues.apache.org/jira/browse/KAFKA-15664
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Affects Versions: 3.5.0
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.5.2, 3.7.0, 3.6.1
>
>
> Per the penultimate bullet on the release checklist, Kafka v3.4.0 is 
> released. We should add this version to the system tests.
> Example PR: https://github.com/apache/kafka/pull/6597/files





[jira] [Updated] (KAFKA-15664) Add 3.4.0 streams upgrade/compatibility tests

2023-10-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15664:

Fix Version/s: 3.5.2
   3.7.0
   3.6.1

> Add 3.4.0 streams upgrade/compatibility tests
> -
>
> Key: KAFKA-15664
> URL: https://issues.apache.org/jira/browse/KAFKA-15664
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.5.2, 3.7.0, 3.6.1
>
>
> Per the penultimate bullet on the release checklist, Kafka v3.4.0 is 
> released. We should add this version to the system tests.
> Example PR: https://github.com/apache/kafka/pull/6597/files





[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778511#comment-17778511
 ] 

Matthias J. Sax commented on KAFKA-15602:
-

Hey, [~pnee] pointed me to this ticket. I am not a ByteBuffer expert, but it 
seems to me that a reasonable API contract would be that the user is 
responsible for preparing the buffer for reading, and the returned byte[] array is 
from the buffer's position to limit. Any other behavior seems odd to me. 
Thoughts? (Not sure if the position should be modified by the serializer or not, 
though? I tend to think yes?)

Overall, it seems like a simple case to fix forward? Given that the standard 
use case (position = 0 and limit = max) works in all releases (example (1) in 
the ticket description), I am not really worried about introducing a breaking 
change. If the above proposed contract makes sense, it's rather a bug fix and we 
should just push it out? (In the end KAFKA-4852 was done without a KIP, too – 
kinda borderline to begin with...)

For example (2), pre-3.4 seems to be correct, while 3.4+ is broken.

For example (3), pre-3.4 seems to be correct, while 3.4+ seems to be incorrect 
– the user did not limit to 4 bytes, so all 8 bytes should be returned IMHO?

For example (4), pre-3.4 seems to be incorrect, while 3.4+ seems to be 
incorrect, too? In the end, the position was not set back to zero, hence a 
zero-length array should be returned?

And last example (5), as already discussed, is also broken in all versions, but 
should result in "est".
{quote}I can see the problem comes from the serializer doesn't know if the 
position is an offset or just the next byte to be written.  These are two 
different definitions of the position so it doesn't really make sense to handle 
both cases in a single API call.
{quote}
Not sure if I can follow here? What do you mean by "is an offset or just the 
next byte to be written"? Why would position have two different definitions?

\cc [~showuon] as RM for 3.5.2 release – might be worth to get this fix in, too?

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produces 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails for both cases for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning bette

[jira] [Updated] (KAFKA-15662) Implement support for clientInstanceIds in Kafka Stream

2023-10-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15662:

Component/s: streams

> Implement support for clientInstanceIds in Kafka Stream
> ---
>
> Key: KAFKA-15662
> URL: https://issues.apache.org/jira/browse/KAFKA-15662
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Apoorv Mittal
>        Assignee: Matthias J. Sax
>Priority: Major
>
> The KIP requires Kafka Stream to support below method to give access to the 
> client instance ids of the producers, consumers and admin clients used by 
> Kafka Streams.
>  
> This method is only permitted when Kafka Streams is in state RUNNING or 
> REBALANCING. In the event that Kafka Streams is not in state RUNNING or 
> REBALANCING, the method throws 
> {{org.apache.kafka.streams.errors.StreamsNotRunningException}} , which is a 
> new subclass of {{InvalidStateStoreException}} .
>  
> {code:java}
> public ClientInstanceIds clientInstanceIds(Duration timeout); {code}
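
The state restriction described in the ticket can be sketched as a simple guard. This is a dependency-free illustration under stated assumptions: `ClientInstanceIdsGuardSketch`, its `State` enum, the string return value, and the stand-in exception class are all hypothetical and are not the real Kafka Streams types.

```java
import java.time.Duration;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-ins; none of these types are the real Kafka Streams classes.
final class ClientInstanceIdsGuardSketch {

    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

    // Stand-in for the exception named in the ticket. The ticket makes it a subclass of
    // InvalidStateStoreException; IllegalStateException is used here to stay dependency-free.
    static final class StreamsNotRunningException extends IllegalStateException {
        StreamsNotRunningException(String message) {
            super(message);
        }
    }

    private static final Set<State> PERMITTED = EnumSet.of(State.RUNNING, State.REBALANCING);

    private volatile State state = State.CREATED;

    void setState(State newState) {
        this.state = newState;
    }

    // Returns a placeholder string; the real method would return a ClientInstanceIds object.
    String clientInstanceIds(Duration timeout) {
        State current = state;
        if (!PERMITTED.contains(current)) {
            throw new StreamsNotRunningException(
                "clientInstanceIds() requires RUNNING or REBALANCING, but state is " + current);
        }
        return "client-instance-ids (placeholder, timeout=" + timeout.toMillis() + "ms)";
    }
}
```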





[jira] [Resolved] (KAFKA-15378) Rolling upgrade system tests are failing

2023-10-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15378.
-
Fix Version/s: 3.4.2
   3.5.2
   3.7.0
   3.6.1
   Resolution: Fixed

> Rolling upgrade system tests are failing
> 
>
> Key: KAFKA-15378
> URL: https://issues.apache.org/jira/browse/KAFKA-15378
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Affects Versions: 3.5.1
>Reporter: Lucas Brutschy
>    Assignee: Matthias J. Sax
>Priority: Major
> Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
>
>
> The system tests are having failures for these tests:
> {noformat}
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.1.2.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.2.3.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.6.0-SNAPSHOT
> {noformat}
> See 
> [https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5801/console]
>  for logs and other test data.
> Note that system tests currently only run with [this 
> fix](https://github.com/apache/kafka/commit/24d1780061a645bb2fbeefd8b8f50123c28ca94e),
>  I think some CVE python library update broke the system tests... 





Re: [DISCUSS] Apache Kafka 3.5.2 release

2023-10-20 Thread Matthias J. Sax

Thanks for the info Luke.

We did backport all but one PR in the meantime. The missing PR is a 
RocksDB version bump. We want to consider it for 3.5.2, because it 
addresses a CVE.


Cf https://github.com/apache/kafka/pull/14216

However, RocksDB version bumps are a little bit more tricky, and we 
would like to test this properly on the 3.5 branch, which would take at least 
one week; we could do the cherry-pick on Monday and start testing.


Please let us know if such a delay for 3.5.2 is acceptable or not.

Thanks.

-Matthias


On 10/20/23 5:44 AM, Luke Chen wrote:

Hi Ryan,

OK, I've backported it to the 3.5 branch.
It'll be included in v3.5.2.

Thanks.
Luke

On Fri, Oct 20, 2023 at 7:43 AM Ryan Leslie (BLP/ NEW YORK (REMOT) <
rles...@bloomberg.net> wrote:


Hi Luke,

Hope you are well. Can you please include
https://issues.apache.org/jira/browse/KAFKA-15106 in 3.5.2?

Thanks,

Ryan

From: dev@kafka.apache.org At: 10/17/23 05:05:24 UTC-4:00
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] Apache Kafka 3.5.2 release

Thanks Luke for volunteering for 3.5.2 release.

On Tue, 17 Oct 2023 at 11:58, Josep Prat 
wrote:


Hi Luke,

Thanks for taking this one!

Best,

On Tue, Oct 17, 2023 at 8:12 AM Luke Chen  wrote:


Hi all,

I'd like to volunteer as release manager for the Apache Kafka 3.5.2, to
have an important bug/vulnerability fix release for 3.5.1.

If there are no objections, I'll start building a release plan in
the wiki in the next couple of weeks.

Thanks,
Luke




--
*Josep Prat*
Open Source Engineering Director, *Aiven*
josep.p...@aiven.io | +491715557497
aiven.io
*Aiven Deutschland GmbH*
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B








Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-20 Thread Matthias J. Sax

Forgot one thing:

We could also pass `currentLag()` into `onBatchLoaded()` instead of 
end-offset.



-Matthias

On 10/20/23 10:56 AM, Matthias J. Sax wrote:

Thanks for digging into this Bruno.

The JavaDoc on the consumer does not say anything specific about 
`endOffset` guarantees:


Get the end offsets for the given partitions. In the default {@code 
read_uncommitted} isolation level, the end
offset is the high watermark (that is, the offset of the last 
successfully replicated message plus one). For
{@code read_committed} consumers, the end offset is the last stable 
offset (LSO), which is the minimum of
the high watermark and the smallest offset of any open transaction. 
Finally, if the partition has never been
written to, the end offset is 0.


Thus, I actually believe that it would be ok to change the 
implementation and serve the answer from the `TopicPartitionState`?


Another idea would be to use `currentLag()` in combination with 
`position()` (or the offset of the last read record) to compute the 
end-offset on the fly?



-Matthias

On 10/20/23 4:00 AM, Bruno Cadonna wrote:

Hi,

Matthias is correct that the end offsets are stored somewhere in the 
metadata of the consumer. More precisely, they are stored in the 
`TopicPartitionState`. However, I could not find a public API on the 
consumer other than currentLag() that uses the stored end offsets. If 
I understand the code correctly, method endOffsets() always triggers a 
remote call.


I am a bit concerned about doing remote calls every commit.interval.ms 
(by default 100ms under EOS). At the moment the remote calls are only 
issued if an optimization for KTables is turned on where changelog 
topics are replaced with the input topic of the KTable. The current 
remote calls retrieve all committed offsets of the group at once. If I 
understand correctly, that is one single remote call. Remote calls for 
getting end offsets of changelog topics -- as I understand you are 
planning to issue -- will probably result in multiple remote calls to 
multiple leaders of the changelog topic partitions.


Please correct me if I misunderstood anything of the above.

If my understanding is correct, I propose to modify the consumer in 
such a way to get the end offset from the locally stored metadata 
whenever possible as part of the implementation of this KIP. I do not 
know what the implications are of such a change of the consumer and if 
a KIP is needed for it. Maybe, endOffsets() guarantees to return the 
freshest end offsets possible, which would not be satisfied with the 
modification.


Regarding the naming, I do not completely agree with Matthias. While 
the pattern might be consistent with onBatchUpdated, what is the 
meaning of onBatchUpdated? Is the batch updated? The names 
onBatchLoaded or onBatchWritten or onBatchAdded are more clear IMO.
With "restore" the pattern works better. If I restore a batch of 
records in a state, the records are not there although they should be 
there and I add them. If I update a batch of records in a state, it 
sounds like the batch of records is already in the state and I modify the 
existing records within the state. That is clearly not the meaning of 
the event for which the listener should be called.


Best,
Bruno



On 10/19/23 2:12 AM, Matthias J. Sax wrote:

Thanks for the KIP. Seems I am almost late to the party.

About naming (fun, fun, fun): I like the current proposal overall, 
except `onBatchLoaded`, and would prefer `onBatchUpdated`. It better 
aligns with everything else:


  - it's an update-listener, not a loaded-listener
  - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`, 
`onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
  - `StandbyUpdateListener` should have `onUpdateStart`, 
`onUpdateSuspended` and `onBatchUpdated` to be equally consistent 
(using "loaded" breaks the pattern)
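
To make the naming question concrete, here is a rough sketch of what such a listener could look like. Only the three method names come from this thread; the parameter lists, the `StandbyUpdateListenerSketch` wrapper, and the `LagTrackingListener` implementation are hypothetical and should not be read as the KIP's final API. The batch callback uses the proposal's name, `onBatchLoaded`; the alternative discussed above would rename it to `onBatchUpdated`.

```java
// Hypothetical sketch; only the method names are taken from the discussion.
final class StandbyUpdateListenerSketch {

    interface StandbyUpdateListener {
        // Called once before a standby task starts applying changelog records.
        void onUpdateStart(String storeName, long startingOffset);

        // Called after each batch; named onBatchUpdated in the alternative proposal.
        void onBatchLoaded(String storeName, long batchEndOffset, long batchSize, long currentEndOffset);

        // Called when the standby task stops updating the store.
        void onUpdateSuspended(String storeName, long storeOffset, long currentEndOffset);
    }

    // Example implementation that remembers how far the standby is behind the changelog end.
    static final class LagTrackingListener implements StandbyUpdateListener {
        long lastLag = -1L;

        @Override
        public void onUpdateStart(String storeName, long startingOffset) {
            lastLag = -1L; // lag is unknown until the first batch arrives
        }

        @Override
        public void onBatchLoaded(String storeName, long batchEndOffset, long batchSize, long currentEndOffset) {
            lastLag = currentEndOffset - batchEndOffset;
        }

        @Override
        public void onUpdateSuspended(String storeName, long storeOffset, long currentEndOffset) {
            lastLag = currentEndOffset - storeOffset;
        }
    }
}
```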



About the end-offset question: I am relatively sure that the consumer 
gets the latest end-offset as attached metadata in every fetch 
response. (We exploit this behavior to track end-offsets for input 
topic with regard to `max.task.idle.ms` without overhead -- it was 
also a concern when we did the corresponding KIP how we could track 
lag with no overhead).


Thus, I believe we would "just" need to modify the code accordingly 
to get this information from the restore-consumer 
(`restoreConsumer.endOffsets(...)`; should be served w/o RPC but from 
internal metadata cache) for free, and pass into the listener.


Please double check / verify this claim and keep me honest about it.


-Matthias

On 10/17/23 6:38 AM, Eduwer Camacaro wrote:

Hi Bruno,

Thanks for your observation; surely it will require a network call using
the admin client in order to know this "endOffset" and that will have an
impact on performance. We can either find a solution that has a low impact
on performance or ideally zero impact; unfortunately, I don't see a way to
have zero impact 

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-20 Thread Matthias J. Sax

Thanks for digging into this Bruno.

The JavaDoc on the consumer does not say anything specific about 
`endOffset` guarantees:



Get the end offsets for the given partitions. In the default {@code 
read_uncommitted} isolation level, the end
offset is the high watermark (that is, the offset of the last successfully 
replicated message plus one). For
{@code read_committed} consumers, the end offset is the last stable offset 
(LSO), which is the minimum of
the high watermark and the smallest offset of any open transaction. Finally, if 
the partition has never been
written to, the end offset is 0.


Thus, I actually believe that it would be ok to change the 
implementation and serve the answer from the `TopicPartitionState`?


Another idea would be to use `currentLag()` in combination with 
`position()` (or the offset of the last read record) to compute the 
end-offset on the fly?
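
To make the second idea concrete, here is a minimal sketch of the arithmetic, assuming the lag reported by the consumer is measured against the current position. The class and method names are made up for illustration; only `position()` and `currentLag()` are existing consumer methods, and the sketch takes their results as plain arguments so it has no Kafka dependency.

```java
import java.util.OptionalLong;

// Hypothetical helper, not part of Kafka: derives an end offset from values
// the consumer already tracks locally.
final class EndOffsetEstimate {
    private EndOffsetEstimate() { }

    /**
     * Estimates the end offset as position + lag.
     * Returns empty when the lag is not known yet (for example, before the
     * first fetch response for the partition has arrived).
     */
    static OptionalLong fromPositionAndLag(final long position, final OptionalLong currentLag) {
        if (currentLag.isPresent()) {
            return OptionalLong.of(position + currentLag.getAsLong());
        }
        return OptionalLong.empty();
    }
}
```

With a real consumer this would presumably be called as `EndOffsetEstimate.fromPositionAndLag(consumer.position(tp), consumer.currentLag(tp))`; whether the result is fresh enough for the listener is exactly the open question in this thread.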



-Matthias

On 10/20/23 4:00 AM, Bruno Cadonna wrote:

Hi,

Matthias is correct that the end offsets are stored somewhere in the 
metadata of the consumer. More precisely, they are stored in the 
`TopicPartitionState`. However, I could not find a public API on the 
consumer other than `currentLag()` that uses the stored end offsets. If I 
understand the code correctly, the method `endOffsets()` always triggers a 
remote call.


I am a bit concerned about doing remote calls every commit.interval.ms 
(by default 200ms under EOS). At the moment the remote calls are only 
issued if an optimization for KTables is turned on where changelog 
topics are replaced with the input topic of the KTable. The current 
remote calls retrieve all committed offsets of the group at once. If I 
understand correctly, that is one single remote call. Remote calls for 
getting end offsets of changelog topics -- as I understand you are 
planning to issue -- will probably result in multiple remote calls to 
multiple leaders of the changelog topic partitions.


Please correct me if I misunderstood anything of the above.

If my understanding is correct, I propose to modify the consumer in such 
a way to get the end offset from the locally stored metadata whenever 
possible as part of the implementation of this KIP. I do not know what 
the implications are of such a change of the consumer and if a KIP is 
needed for it. Maybe, endOffsets() guarantees to return the freshest end 
offsets possible, which would not be satisfied with the modification.


Regarding the naming, I do not completely agree with Matthias. While the 
pattern might be consistent with onBatchUpdated, what is the meaning of 
onBatchUpdated? Is the batch updated? The names onBatchLoaded or 
onBatchWritten or onBatchAdded are more clear IMO.
With "restore" the pattern works better. If I restore a batch of records 
in a state, the records are not there although they should be there and 
I add them. If I update a batch of records in a state, it sounds like 
the batch of records is already in the state and I modify the existing 
records within the state. That is clearly not the meaning of the event for 
which the listener should be called.


Best,
Bruno



On 10/19/23 2:12 AM, Matthias J. Sax wrote:

Thanks for the KIP. Seems I am almost late to the party.

About naming (fun, fun, fun): I like the current proposal overall, 
except `onBatchLoaded`; I would prefer `onBatchUpdated`. It better 
aligns with everything else:


  - it's an update-listener, not a loaded-listener
  - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`, 
`onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
  - `StandbyUpdateListener` should have `onUpdateStart`, 
`onUpdateSuspended`, and `onBatchUpdated` to be equally consistent 
(using "loaded" breaks the pattern)



About the end-offset question: I am relatively sure that the consumer 
gets the latest end-offset as attached metadata in every fetch 
response. (We exploit this behavior to track end-offsets for input 
topic with regard to `max.task.idle.ms` without overhead -- it was 
also a concern when we did the corresponding KIP how we could track 
lag with no overhead).


Thus, I believe we would "just" need to modify the code accordingly to 
get this information from the restore-consumer 
(`restoreConsumer.endOffsets(...)`; should be served w/o RPC but from 
internal metadata cache) for free, and pass it into the listener.


Please double check / verify this claim and keep me honest about it.


-Matthias

On 10/17/23 6:38 AM, Eduwer Camacaro wrote:

Hi Bruno,

Thanks for your observation; surely it will require a network call using
the admin client in order to know this "endOffset" and that will have an
impact on performance. We can either find a solution that has a low impact
on performance or ideally zero impact; unfortunately, I don't see a way to
have zero impact on performance. However, we can leverage the existing
#maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the admin
client to ask for these "endOf

Re: [DISCUSS] KIP-992 Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery

2023-10-20 Thread Matthias J. Sax

Thanks for the KIP Hanyu,

One question:


To address this inconsistency, we propose that KeyQuery should be restricted 
to querying kv-stores only, ensuring that it always returns a plain V type, 
making the behavior of the aforementioned code more predictable. Similarly, 
RangeQuery should be dedicated to querying kv-stores, consistently returning 
only the plain V.


Why do you want to restrict `KeyQuery` and `RangeQuery` to kv-stores? I 
think it would be possible to still allow both queries for ts-kv-stores, 
but change the implementation to return "plain V" instead of 
`ValueAndTimestamp`, i.e., the implementation would automatically 
unwrap the value.
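
A minimal sketch of what "automatically unwrap" could mean, under the assumption that the store hands back a value/timestamp pair. `TimestampedValue` below is a hypothetical stand-in for Kafka Streams' `ValueAndTimestamp`, and `plainValue` is an illustrative name, not an existing method; this is not the KIP's implementation.

```java
// Illustrative only: shows the unwrapping step a plain KeyQuery could apply
// when it is served by a timestamped key-value store.
final class UnwrapSketch {
    private UnwrapSketch() { }

    /** Hypothetical stand-in for ValueAndTimestamp<V>. */
    static final class TimestampedValue<V> {
        final V value;
        final long timestamp;

        TimestampedValue(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Returns only the value, or null when the key was not found. */
    static <V> V plainValue(final TimestampedValue<V> stored) {
        return stored == null ? null : stored.value;
    }
}
```

With this shape, callers of `KeyQuery` would see a plain V regardless of the store type, while the timestamp would remain reachable only through the new timestamped query types.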




-Matthias

On 10/20/23 2:32 AM, Alieh Saeedi wrote:

Hey Hanyu,

Thanks for the KIP. It seems good to me.
Just one point: AFAIK, we are going to remove "get" from the name of all
getter methods.

Cheers,
Alieh

On Thu, Oct 19, 2023 at 5:44 PM Hanyu (Peter) Zheng
 wrote:


Hello everyone,

I would like to start the discussion for KIP-992: Proposal to introduce
IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery

The KIP can be found here:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery

Any suggestions are more than welcome.

Many thanks,
Hanyu

On Thu, Oct 19, 2023 at 8:17 AM Hanyu (Peter) Zheng 
wrote:





https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery


--
Hanyu (Peter) Zheng
Software Engineer Intern, Confluent






[jira] [Updated] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-10-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13973:

Fix Version/s: 3.5.2

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Assignee: Nicholas Telford
>Priority: Minor
> Fix For: 3.5.2, 3.7.0, 3.6.1
>
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metric shows a {{100MB}} 
> block cache capacity instead of the Kafka Streams default of {{50MB}}.
>  
> My topology:
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
>     .groupByKey()
>     .count()
>     .toStream()
>     .to("output");
>  
> I checked out the {{kafka-streams}} code and saw a strange thing. When the 
> {{RocksDBTimestampedStore}} store is created, we try to create two column 
> families for backward compatibility with a potential old key/value store.
> In the method {{setDbAccessor(col1, col2)}}, if the first column is not 
> valid, it is closed 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
> But in the RocksDB instance, it seems that the column family is 
> not deleted completely and the metrics exposed by [RocksDB continue to 
> aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
> {{block-cache-capacity}} for both column families (default and 
> keyValueWithTimestamp).
> Maybe the column family has to be dropped explicitly in 
> {{setDbAccessor(col1, col2)}} if the first column is not valid (like 
> {{db.dropColumnFamily(noTimestampColumnFamily);}}).
>  
> I tried to drop the {{noTimestampColumnFamily}} in {{setDbAccessor}} if the 
> first column is not valid, like this:
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>                            final ColumnFamilyHandle withTimestampColumnFamily) throws RocksDBException {
>     final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
>     noTimestampsIter.seekToFirst();
>     if (noTimestampsIter.isValid()) {
>         log.info("Opening store {} in upgrade mode", name);
>         dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
>     } else {
>         log.info("Opening store {} in regular mode", name);
>         dbAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
>         noTimestampColumnFamily.close();
>         db.dropColumnFamily(noTimestampColumnFamily); // attempted fix
>     }
>     noTimestampsIter.close();
> }{code}
>  
>  
>  
> But it seems that you can't drop the default column family in RocksDB 
> (see screenshot).
> *So how can we get the real block-cache-capacity metric value in Kafka 
> Streams monitoring?*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-7699) Improve wall-clock time punctuations

2023-10-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1550#comment-1550
 ] 

Matthias J. Sax commented on KAFKA-7699:


Happy to support you. The KIP wiki page describes how it works: 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] 
– if you have any questions about it, happy to answer them.

> Improve wall-clock time punctuations
> 
>
> Key: KAFKA-7699
> URL: https://issues.apache.org/jira/browse/KAFKA-7699
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>            Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> Currently, wall-clock time punctuations allow scheduling periodic callbacks 
> based on wall-clock time progress. The punctuation time starts when the 
> punctuation is scheduled; thus, it's non-deterministic, which is desired for 
> many use cases (I want a callback in 5 minutes from "now").
> It would be a nice improvement to allow users to "anchor" wall-clock 
> punctuations, too, similar to a cron job: a punctuation would then be 
> triggered at "fixed" times like the beginning of the next hour, independent 
> of when the punctuation was registered.
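
As an illustration of the "anchor" idea in the description above, the following sketch computes how long to wait until the next fixed boundary (for example, the top of the next hour). It is only the arithmetic, assuming boundaries are aligned to the Unix epoch in UTC; `AnchoredPunctuation` and `delayUntilNextAnchor` are hypothetical names and not an existing Kafka Streams API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: time until the next epoch-aligned boundary.
final class AnchoredPunctuation {
    private AnchoredPunctuation() { }

    /**
     * Returns the delay from 'now' to the next multiple of 'interval'
     * (counted from the Unix epoch). If 'now' is exactly on a boundary,
     * the following boundary is used, so the delay is never zero.
     */
    static Duration delayUntilNextAnchor(final Instant now, final Duration interval) {
        final long intervalMs = interval.toMillis();
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        final long nowMs = now.toEpochMilli();
        final long nextMs = (Math.floorDiv(nowMs, intervalMs) + 1) * intervalMs;
        return Duration.ofMillis(nextMs - nowMs);
    }
}
```

For a one-hour interval at 10:17:30 UTC this gives 42 minutes 30 seconds, independent of when the punctuation was registered, which is the cron-like behavior the ticket asks for.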



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #2305

2023-10-19 Thread Matthias J. Sax

You would need to unsubscribe from the dev list.

I would recommend setting up a filter with your email provider, if you 
don't want these, to redirect them directly to trash.



-Matthias

On 10/19/23 4:49 AM, Shyam P wrote:

how to unsubscribe this ?

On Thu, Oct 19, 2023 at 1:30 PM Apache Jenkins Server <
jenk...@builds.apache.org> wrote:


See <
https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2305/display/redirect









[jira] [Commented] (KAFKA-7699) Improve wall-clock time punctuations

2023-10-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1368#comment-1368
 ] 

Matthias J. Sax commented on KAFKA-7699:


[~hermankj] – thanks for your comment. – This change would require a KIP but 
should not be too difficult to do. Would you be interested in picking it up?

> Improve wall-clock time punctuations
> 
>
> Key: KAFKA-7699
> URL: https://issues.apache.org/jira/browse/KAFKA-7699
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>            Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> Currently, wall-clock time punctuations allow scheduling periodic callbacks 
> based on wall-clock time progress. The punctuation time starts when the 
> punctuation is scheduled; thus, it's non-deterministic, which is desired for 
> many use cases (I want a callback in 5 minutes from "now").
> It would be a nice improvement to allow users to "anchor" wall-clock 
> punctuations, too, similar to a cron job: a punctuation would then be 
> triggered at "fixed" times like the beginning of the next hour, independent 
> of when the punctuation was registered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15616) Define client telemetry states and their transitions

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15616:

Fix Version/s: 3.7.0

> Define client telemetry states and their transitions
> 
>
> Key: KAFKA-15616
> URL: https://issues.apache.org/jira/browse/KAFKA-15616
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.7.0
>
>
> The client emitting metrics to the broker needs to maintain state that 
> specifies what action the client should take next, i.e. request subscriptions, 
> push telemetry, etc.
>  
> The changes should include a comprehensive definition of all states a client 
> can move into and their transitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13973:

Fix Version/s: 3.6.1

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Assignee: Nicholas Telford
>Priority: Minor
> Fix For: 3.7.0, 3.6.1
>
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metric shows a {{100MB}} 
> block cache capacity instead of the Kafka Streams default of {{50MB}}.
>  
> My topology:
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
>     .groupByKey()
>     .count()
>     .toStream()
>     .to("output");
>  
> I checked out the {{kafka-streams}} code and saw a strange thing. When the 
> {{RocksDBTimestampedStore}} store is created, we try to create two column 
> families for backward compatibility with a potential old key/value store.
> In the method {{setDbAccessor(col1, col2)}}, if the first column is not 
> valid, it is closed 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
> But in the RocksDB instance, it seems that the column family is 
> not deleted completely and the metrics exposed by [RocksDB continue to 
> aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
> {{block-cache-capacity}} for both column families (default and 
> keyValueWithTimestamp).
> Maybe the column family has to be dropped explicitly in 
> {{setDbAccessor(col1, col2)}} if the first column is not valid (like 
> {{db.dropColumnFamily(noTimestampColumnFamily);}}).
>  
> I tried to drop the {{noTimestampColumnFamily}} in {{setDbAccessor}} if the 
> first column is not valid, like this:
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>                            final ColumnFamilyHandle withTimestampColumnFamily) throws RocksDBException {
>     final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
>     noTimestampsIter.seekToFirst();
>     if (noTimestampsIter.isValid()) {
>         log.info("Opening store {} in upgrade mode", name);
>         dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
>     } else {
>         log.info("Opening store {} in regular mode", name);
>         dbAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
>         noTimestampColumnFamily.close();
>         db.dropColumnFamily(noTimestampColumnFamily); // attempted fix
>     }
>     noTimestampsIter.close();
> }{code}
>  
>  
>  
> But it seems that you can't drop the default column family in RocksDB 
> (see screenshot).
> *So how can we get the real block-cache-capacity metric value in Kafka 
> Streams monitoring?*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15338) The metric group documentation for metrics added in KAFKA-13945 is incorrect

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15338:

Affects Version/s: (was: 3.4.0)
   (was: 3.3.1)
   (was: 3.3.2)
   (was: 3.5.0)
   (was: 3.4.1)
   (was: 3.5.1)

> The metric group documentation for metrics added in KAFKA-13945 is incorrect
> 
>
> Key: KAFKA-15338
> URL: https://issues.apache.org/jira/browse/KAFKA-15338
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0
>Reporter: Neil Buesing
>Assignee: Atul Sharma
>Priority: Trivial
>  Labels: beginner, newbie
> Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2
>
>
> ops.html (docs/streams/ops.html) incorrectly states that the metrics type is 
> "stream-processor-node-metrics", but in looking at the metrics and inspecting 
> the code in TopicMetrics, these metrics have a type of "stream-topic-metrics".
> 4 metrics are in error "bytes-consumed-total", "bytes-produced-total", 
> "records-consumed-total", and "records-produced-total".
> Looks like the type was changed from the KIP, and the documentation still 
> reflects the KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15338) The metric group documentation for metrics added in KAFKA-13945 is incorrect

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15338:

Fix Version/s: 3.3.3
   3.4.2
   3.5.2

> The metric group documentation for metrics added in KAFKA-13945 is incorrect
> 
>
> Key: KAFKA-15338
> URL: https://issues.apache.org/jira/browse/KAFKA-15338
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1, 3.5.1
>Reporter: Neil Buesing
>Assignee: Atul Sharma
>Priority: Trivial
>  Labels: beginner, newbie
> Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2
>
>
> ops.html (docs/streams/ops.html) incorrectly states that the metrics type is 
> "stream-processor-node-metrics", but in looking at the metrics and inspecting 
> the code in TopicMetrics, these metrics have a type of "stream-topic-metrics".
> 4 metrics are in error "bytes-consumed-total", "bytes-produced-total", 
> "records-consumed-total", and "records-produced-total".
> Looks like the type was changed from the KIP, and the documentation still 
> reflects the KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimeStampKeyQuery and TimeStampRangeQuery

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15629:

Description: 
KIP-992: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampKeyQuery+and+TimestampRangeQuery]

In the current IQv2 code, there are noticeable differences when interfacing 
with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
simple value for plain-kv-store but evolves into ValueAndTimestamp for 
ts-kv-store, which presents type safety issues in the API.

Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
compatibility concerns.

This brings us to the essence of our proposal: the introduction of distinct 
query types. One that returns a plain value, another for values accompanied by 
timestamps.

While querying a ts-kv-store for a plain value and then extracting it is 
feasible, it doesn't make sense to query a plain-kv-store for a 
ValueAndTimestamp.

Our vision is for plain-kv-store to always return V, while ts-kv-store should 
return ValueAndTimestamp.

  was:
In the current IQv2 code, there are noticeable differences when interfacing 
with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
simple value for plain-kv-store but evolves into ValueAndTimestamp for 
ts-kv-store, which presents type safety issues in the API.

Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
compatibility concerns.

This brings us to the essence of our proposal: the introduction of distinct 
query types. One that returns a plain value, another for values accompanied by 
timestamps.

While querying a ts-kv-store for a plain value and then extracting it is 
feasible, it doesn't make sense to query a plain-kv-store for a 
ValueAndTimestamp.

Our vision is for plain-kv-store to always return V, while ts-kv-store should 
return ValueAndTimestamp.


> proposal to introduce IQv2 Query Types: TimeStampKeyQuery and 
> TimeStampRangeQuery
> -
>
> Key: KAFKA-15629
> URL: https://issues.apache.org/jira/browse/KAFKA-15629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
>
> KIP-992: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampKeyQuery+and+TimestampRangeQuery]
> In the current IQv2 code, there are noticeable differences when interfacing 
> with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
> simple value for plain-kv-store but evolves into ValueAndTimestamp for 
> ts-kv-store, which presents type safety issues in the API.
> Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
> compatibility concerns.
> This brings us to the essence of our proposal: the introduction of distinct 
> query types. One that returns a plain value, another for values accompanied 
> by timestamps.
> While querying a ts-kv-store for a plain value and then extracting it is 
> feasible, it doesn't make sense to query a plain-kv-store for a 
> ValueAndTimestamp.
> Our vision is for plain-kv-store to always return V, while ts-kv-store should 
> return ValueAndTimestamp.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimeStampKeyQuery and TimeStampRangeQuery

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15629:

Labels: kip  (was: )

> proposal to introduce IQv2 Query Types: TimeStampKeyQuery and 
> TimeStampRangeQuery
> -
>
> Key: KAFKA-15629
> URL: https://issues.apache.org/jira/browse/KAFKA-15629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
>
> In the current IQv2 code, there are noticeable differences when interfacing 
> with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
> simple value for plain-kv-store but evolves into ValueAndTimestamp for 
> ts-kv-store, which presents type safety issues in the API.
> Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
> compatibility concerns.
> This brings us to the essence of our proposal: the introduction of distinct 
> query types. One that returns a plain value, another for values accompanied 
> by timestamps.
> While querying a ts-kv-store for a plain value and then extracting it is 
> feasible, it doesn't make sense to query a plain-kv-store for a 
> ValueAndTimestamp.
> Our vision is for plain-kv-store to always return V, while ts-kv-store should 
> return ValueAndTimestamp.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-18 Thread Matthias J. Sax

Thanks for the KIP. Seems I am almost late to the party.

About naming (fun, fun, fun): I like the current proposal overall, 
except `onBatchLoaded`; I would prefer `onBatchUpdated`. It better 
aligns with everything else:


 - it's an update-listener, not a loaded-listener
 - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`, 
`onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
 - `StandbyUpdateListener` should have `onUpdateStart`, 
`onUpdateSuspended`, and `onBatchUpdated` to be equally consistent 
(using "loaded" breaks the pattern)
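
To visualize the naming pattern in the list above, here is a rough sketch of how the listener could look with the `onUpdate*` / `onBatchUpdated` names. Everything about the signatures is an assumption made for illustration (the String partition argument, the offsets passed, the nesting inside a wrapper class); only the method names and the `PROMOTED` / `MIGRATED` suspend reasons come from this thread, and the KIP itself is the authority on the real interface.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch, not the KIP-988 API.
final class StandbyListenerSketch {
    private StandbyListenerSketch() { }

    enum SuspendReason { PROMOTED, MIGRATED }

    /** Hypothetical shape mirroring the StateRestoreListener naming pattern. */
    interface StandbyUpdateListener {
        void onUpdateStart(String changelogPartition, String storeName, long startingOffset);

        void onBatchUpdated(String changelogPartition, String storeName, long batchEndOffset, long numRecords);

        void onUpdateSuspended(String changelogPartition, String storeName, long storeOffset, SuspendReason reason);
    }

    /** Tiny implementation that records which callbacks fired, in order. */
    static final class RecordingListener implements StandbyUpdateListener {
        final List<String> events = new ArrayList<>();

        @Override
        public void onUpdateStart(final String changelogPartition, final String storeName, final long startingOffset) {
            events.add("start@" + startingOffset);
        }

        @Override
        public void onBatchUpdated(final String changelogPartition, final String storeName,
                                   final long batchEndOffset, final long numRecords) {
            events.add("batch@" + batchEndOffset);
        }

        @Override
        public void onUpdateSuspended(final String changelogPartition, final String storeName,
                                      final long storeOffset, final SuspendReason reason) {
            events.add("suspended:" + reason);
        }
    }
}
```

Read side by side with `onRestoreStart` / `onBatchRestored` / `onRestoreSuspended`, the three callbacks follow the same start / batch / suspended rhythm, which is the consistency argument made above.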



About the end-offset question: I am relatively sure that the consumer 
gets the latest end-offset as attached metadata in every fetch response. 
(We exploit this behavior to track end-offsets for input topic with 
regard to `max.task.idle.ms` without overhead -- it was also a concern 
when we did the corresponding KIP how we could track lag with no overhead).


Thus, I believe we would "just" need to modify the code accordingly to 
get this information from the restore-consumer 
(`restoreConsumer.endOffsets(...)`; should be served w/o RPC but from 
internal metadata cache) for free, and pass it into the listener.


Please double check / verify this claim and keep me honest about it.


-Matthias

On 10/17/23 6:38 AM, Eduwer Camacaro wrote:

Hi Bruno,

Thanks for your observation; surely it will require a network call using
the admin client in order to know this "endOffset" and that will have an
impact on performance. We can either find a solution that has a low impact
on performance or ideally zero impact; unfortunately, I don't see a way to
have zero impact on performance. However, we can leverage the existing
#maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the admin
client to ask for these "endOffset"s. As far as I understand, this update
is done periodically using the "commit.interval.ms" configuration. I
believe this option will force us to invoke StandbyUpdateListener once this
interval is reached.

On Mon, Oct 16, 2023 at 8:52 AM Bruno Cadonna  wrote:


Thanks for the KIP, Colt and Eduwer,

Are you sure there is also not a significant performance impact for
passing into the callback `currentEndOffset`?

I am asking because the comment here:

https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129

says that the end-offset is only updated once for standby tasks whose
changelog topic is not piggy-backed on input topics. I could also not
find the update of end-offset for those standbys.


Best,
Bruno

On 10/16/23 10:55 AM, Lucas Brutschy wrote:

Hi all,

it's a nice improvement! I don't have anything to add on top of the
previous comments, just came here to say that it seems to me consensus
has been reached and the result looks good to me.

Thanks Colt and Eduwer!
Lucas

On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy 

wrote:


Thanks, Guozhang. I've updated the KIP and will start a vote.

Colt McNealy

*Founder, LittleHorse.dev*


On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang <

guozhang.wang...@gmail.com>

wrote:


Thanks for the summary, that looks good to me.

Guozhang

On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy 

wrote:


Hello there!

Thanks everyone for the comments. There's a lot of back-and-forth going on,
so I'll do my best to summarize what everyone's said in TLDR format:

1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, and do similarly
for the other methods.
2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
3. Remove the `earliestOffset` parameter for performance reasons.

If that's all fine with everyone, I'll update the KIP and we -- well, mostly
Edu (: -- will open a PR.

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <

edu...@littlehorse.io>

wrote:


Hello everyone,

Thanks for all your feedback for this KIP!

I think that the key to choosing proper names for this API is understanding
the terms used inside the StoreChangelogReader. Currently, this class has
two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my opinion,
using StandbyUpdateListener for the interface fits better on these terms.
Same applies for onUpdateStart/Suspended.

StoreChangelogReader uses "the same mechanism" for active task restoration
and standby task updates, but this is an implementation detail. Under
normal circumstances (no rebalances or task migrations), the changelog
reader will be in STANDBY_UPDATING, which means it will be updating standby
tasks as long as there are new records in the changelog topic. That's why I
prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't 100%
align with StateRestoreListener, but either one is fine.

Edu

On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <

guozhang.wang...@gmail.com>

wrote:


Hello Colt,

Thanks for writing the KIP! I have read through the updated KIP and

[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2023-10-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776381#comment-17776381
 ] 

Matthias J. Sax commented on KAFKA-6520:


Could we use Admin.describeCluster() which returns the list of brokers that are 
currently online, and go into DISCONNECTED if we don't get anything back?
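
A minimal sketch of that check, kept free of Kafka dependencies: the broker lookup is passed in as a `Callable`, and an empty result, a null result, or any failure (including a timeout) maps to a hypothetical `DISCONNECTED` status. `ConnectivityProbe` and its `Status` enum are made up for illustration; `DISCONNECTED` is only a proposal here and not an existing Kafka Streams state.

```java
import java.util.Collection;
import java.util.concurrent.Callable;

// Hypothetical helper illustrating the "no brokers back => DISCONNECTED" idea.
final class ConnectivityProbe {
    private ConnectivityProbe() { }

    enum Status { RUNNING, DISCONNECTED }

    /**
     * Runs the broker lookup once. Any exception is treated the same as
     * "no brokers reachable".
     */
    static Status probe(final Callable<? extends Collection<?>> liveBrokers) {
        try {
            final Collection<?> nodes = liveBrokers.call();
            return (nodes == null || nodes.isEmpty()) ? Status.DISCONNECTED : Status.RUNNING;
        } catch (final Exception e) {
            return Status.DISCONNECTED;
        }
    }
}
```

With the admin client, the lookup could presumably be `() -> admin.describeCluster().nodes().get(5, TimeUnit.SECONDS)`, where the 5-second timeout is an arbitrary choice; how often to probe and how to avoid flapping on transient disconnects would still need to be settled in the KIP.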

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Vince Mu
>Priority: Major
>  Labels: newbie, user-experience
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams]
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
>  See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  This is a link to a related 
> issue.
> -
> Update: there are some discussions on the PR itself which leads me to think 
> that a more general solution should be at the ClusterConnectionStates rather 
> than at the Streams or even Consumer level. One proposal would be:
>  * Add a new metric named `failedConnection` in SelectorMetrics which is 
> recorded at `connect()` and `pollSelectionKeys()` functions, upon capturing the 
> IOException / RuntimeException which indicates the connection disconnected.
>  * And then users of Consumer / Streams can monitor this metric, which 
> normally will only have close to zero values as we have transient 
> disconnects; if it is spiking, it means the brokers are consistently being 
> unavailable, indicating the state.
> [~Yohan123] WDYT?





Re: [DISCUSS] Apache Kafka 3.5.2 release

2023-10-17 Thread Matthias J. Sax
Thanks -- there are a few fixes for Kafka Streams that we are considering 
cherry-picking into the 3.5.2 release -- what timeline do you target for 
the release?



-Matthias

On 10/17/23 8:47 AM, Divij Vaidya wrote:

Thank you for volunteering Luke.

--
Divij Vaidya



On Tue, Oct 17, 2023 at 3:26 PM Bill Bejeck  wrote:


Thanks for driving the release, Luke.

+1
-Bill

On Tue, Oct 17, 2023 at 5:05 AM Satish Duggana 
wrote:


Thanks Luke for volunteering for 3.5.2 release.

On Tue, 17 Oct 2023 at 11:58, Josep Prat 
wrote:


Hi Luke,

Thanks for taking this one!

Best,

On Tue, Oct 17, 2023 at 8:12 AM Luke Chen  wrote:


Hi all,

I'd like to volunteer as release manager for the Apache Kafka 3.5.2, to
have an important bug/vulnerability fix release for 3.5.1.

If there are no objections, I'll start building a release plan in the wiki
in the next couple of weeks.

Thanks,
Luke












Re: [VOTE] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-16 Thread Matthias J. Sax

+1 (binding)


On 10/13/23 9:24 AM, Hanyu (Peter) Zheng wrote:

Hello everyone,

I would like to start a vote for KIP-985, which adds reverseRange and
reverseAll queries over kv-store in IQv2.

Sincerely,
Hanyu

On Fri, Oct 13, 2023 at 9:15 AM Hanyu (Peter) Zheng 
wrote:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-985:+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2








[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"

2023-10-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775325#comment-17775325
 ] 

Matthias J. Sax commented on KAFKA-13152:
-

[~guozhang] would you be interested in helping to get this into 3.7? It keeps 
slipping... :(

> Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with 
> "{statestore.cache}/{input.buffer}.max.bytes"
> -
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> The current config "buffered.records.per.partition" controls the maximum 
> number of records to bookkeep; once it is exceeded, we pause fetching from 
> this partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on 
> the dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage for this buffering. We should 
> consider replacing that config with a global one, e.g. "input.buffer.max.bytes", 
> which controls how many bytes in total are allowed to be buffered. This is 
> doable since we buffer the raw records in .
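
The difference between the two bounds can be sketched as follows. This is an illustration only, not the Kafka Streams implementation: `InputBuffer` and its methods are hypothetical names, and partitions are identified by plain strings. A single byte budget is shared by all partitions, so the memory bound does not depend on how many partitions are assigned or how large individual records are.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: one byte budget shared by all partitions. */
final class InputBuffer {
    private final long maxBytes;          // plays the role of the proposed "input.buffer.max.bytes"
    private long bufferedBytes = 0;
    private final Map<String, Deque<byte[]>> queues = new HashMap<>();

    InputBuffer(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Buffers a raw record and reports whether fetching should now be paused. */
    boolean add(String partition, byte[] rawRecord) {
        queues.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(rawRecord);
        bufferedBytes += rawRecord.length;
        return shouldPause();
    }

    /** Removes the oldest record of a partition (or returns null) and frees its bytes. */
    byte[] poll(String partition) {
        Deque<byte[]> queue = queues.get(partition);
        byte[] record = (queue == null) ? null : queue.pollFirst();
        if (record != null) {
            bufferedBytes -= record.length;
        }
        return record;
    }

    /** True while the total buffered size exceeds the global budget. */
    boolean shouldPause() {
        return bufferedBytes > maxBytes;
    }

    long bufferedBytes() {
        return bufferedBytes;
    }
}
```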





[jira] [Commented] (KAFKA-15594) Add 3.6.0 to streams upgrade/compatibility tests

2023-10-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775323#comment-17775323
 ] 

Matthias J. Sax commented on KAFKA-15594:
-

This ticket is to include upgrade test from 3.6 to 3.7/trunk – can only be done 
after 3.6 is released – it's WIP.

> Add 3.6.0 to streams upgrade/compatibility tests
> 
>
> Key: KAFKA-15594
> URL: https://issues.apache.org/jira/browse/KAFKA-15594
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Satish Duggana
>Priority: Major
>







[jira] [Commented] (KAFKA-15593) Add 3.6.0 to broker/client upgrade/compatibility tests

2023-10-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775322#comment-17775322
 ] 

Matthias J. Sax commented on KAFKA-15593:
-

This ticket is to include upgrade test from 3.6 to 3.7/trunk – can only be done 
after 3.6 is released – it's WIP.

> Add 3.6.0 to broker/client upgrade/compatibility tests
> --
>
> Key: KAFKA-15593
> URL: https://issues.apache.org/jira/browse/KAFKA-15593
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.7.0
>
>






Re: Requesting permission for contributions

2023-10-13 Thread Matthias J. Sax

Done. You should be all set.

-Matthias

On 10/13/23 8:21 AM, Apoorv Mittal wrote:

Hi,
Can I please get permission to contribute KIPs and assign Jiras to myself?

Wiki and Jira Id: apoorvmittal10
Email: apoorvmitta...@gmail.com

Regards,
Apoorv Mittal
+44 7721681581



Re: [DISCUSS] 3.5.2 Release

2023-10-13 Thread Matthias J. Sax
Thanks -- there are a few fixes for Kafka Streams that we are considering 
cherry-picking into the 3.5.2 release -- can you give us a few more days 
for this?


-Matthias

On 10/12/23 6:20 PM, Sophie Blee-Goldman wrote:

Thanks for volunteering Luke!

On Thu, Oct 12, 2023 at 2:55 AM Levani Kokhreidze 
wrote:


Hi Divij,

Thanks for the explanation, makes sense.

Hi Luke, thank you! It would be awesome to see 3.5.2 out.

Best,
Levani


On 12. Oct 2023, at 12:39, Luke Chen  wrote:

Hi Levani and Divij,

I can work on the 3.5.2 release.
I'll start a new thread for volunteering it maybe next week.

Thanks.
Luke

On Thu, Oct 12, 2023 at 5:07 PM Divij Vaidya 
wrote:


Hello Levani

From a process perspective, there is no fixed schedule for bug fix
releases. If we have a volunteer for release manager (must be a committer),
they can start with the process of bug fix release (with the approval of
PMC).

My personal opinion is that it's too early to start 3.6.1 and we should
wait at least 1 month to hear feedback on 3.6.0. We need to make a careful
balance between getting the critical fixes in the hands of users as soon
as possible vs. spending community effort towards releases (the effort that
could be used to make Kafka better, feature-wise & operational
stability-wise, otherwise).

For 3.5.2, I think there are sufficient pending fixes (including some CVE
fixes) to start a bug fix release. We just need a volunteer for the release
manager.

--
Divij Vaidya



On Thu, Oct 12, 2023 at 9:57 AM Levani Kokhreidze <levani.co...@gmail.com> wrote:


Hello,

KAFKA-15571 [1] was merged and backported to the 3.5 and 3.6 branches. The
bug fix repairs a feature that was added in 3.5. Considering the feature
doesn't work as expected without the fix, I would like to know if it's
reasonable to start the 3.5.2 release. Of course, releasing such a massive
project like Kafka is not a trivial task, and I am looking for the
community's input on whether it's reasonable to start the 3.5.2 release
process.

Best,
Levani

[1] - https://issues.apache.org/jira/browse/KAFKA-15571









Re: [DISCUSS] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-13 Thread Matthias J. Sax

Thanks for pointing this out Alieh! I totally missed this.

So I guess everything is settled and Hanyu can start a VOTE?

For the KIP PR, we should ensure to update the JavaDocs to avoid 
confusion in the future.



-Matthias

On 10/12/23 12:21 PM, Alieh Saeedi wrote:

Hi,
just pointing to javadocs for range() and reverseRange():

range(): @return The iterator for this range, from smallest to largest
bytes.
reverseRange(): @return The reverse iterator for this range, from largest
to smallest key bytes.

Cheers,
Alieh


On Thu, Oct 12, 2023 at 7:32 AM Matthias J. Sax  wrote:


Quick addendum.

Some DSL operators use `range` and seem to rely on ascending order,
too... Of course, if we say `range()` has no ordering guarantee, we
would add `forwardRange()` and let the DSL use `forwardRange`, too.

The discussion of course also applies to `all()` and `reverseAll()`, and
I assume also `prefixScan()` (even if there is no "reverse" version
for it).


On 10/11/23 10:22 PM, Matthias J. Sax wrote:

Thanks for raising this question Hanyu. Great find!

My interpretation is as follows (it's actually a warning signal that the
API contract is not better defined, and we should fix this by extending
JavaDocs and docs on the web page about it).

We have existing `range()` and `reverseRange()` methods on
`ReadOnlyKeyValueStore` -- the interface itself is not typed (ie, just
generics), and we state that we don't guarantee "logical order" because
underlying stores are based on `byte[]` type. So far so... well.

However, to make matters worse, we are also not explicit if the
underlying store implementation *must* return keys in
byte[]-lexicographical order or not...

For `range()`, I would be kinda willing to accept that there is no
ordering guarantee at all -- for example, if the underlying byte[]-store
is hash-based and implements a full scan to answer a `range()` it might
not be efficient, but also not incorrect if keys are returned in some
"random" (byte[]-)order. In isolation, I don't see an API contract
violation.

However, `reverseRange` implicitly states with its name that some
"descending order" (based on keys) is expected. Given the JavaDoc comment
about "logical" vs "byte[]" order, the contract (at least to me) is
clear: returns records in descending byte[]-lexicographical order. --
Any other interpretation seems to be off? Curious to hear if you agree
or disagree with this interpretation?



If this is correct, it means we are actually lacking an API contract for
ascending byte[]-lexicographical range scan. Furthermore, a hash-based
byte[]-store would need to actually explicitly sort its result for
`reverseRange` to not violate the contract.

To me, this raises the question if `range()` actually has a
(non-explicit) contract about returning data in byte[]-lexicographical
order? It seems a lot of people rely on this, and our default stores
actually implement it this way. So if we don't look at `range()` in
isolation, but look at the `ReadOnlyKeyValueStore` interface
holistically, I would also buy the argument that `range()` implies
"ascending byte[]-lexicographical order". Thoughts?

To be frank: to me, it's pretty clear that the original idea to add
`range()` was to return data in ascending order.


Question 1:
   - Do we believe that the range() contract is ascending
byte[]-lexicographical order right now?

 If yes, I would propose to make it explicit in the JavaDocs.

 If no, I would also propose to make it explicit in the JavaDocs. In
addition, it raises the question if a method `forwardRange()` (for the
lack of a better idea about a name right now) is actually missing to
provide such a contract?


Of course, we always depend on the serialization format for order, and
if users need "logical order" they need to ensure to use a serialization
format that aligns byte[]-lexicographical order to logical order. But for
the scope of this work, I would not even try to open this can of worms...
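
To make the gap between byte[]-lexicographical and logical order concrete, here is a small self-contained sketch. It does not use any Kafka Streams store: a `TreeMap` ordered by `Arrays.compareUnsigned` stands in for a byte[]-ordered store, and `ByteOrderDemo` is a made-up name. With a big-endian two's-complement int encoding, negative keys sort after positive ones, so neither the ascending nor the descending scan matches numeric order.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class ByteOrderDemo {
    /** Big-endian two's-complement encoding, which is what ByteBuffer.putInt produces by default. */
    static byte[] serialize(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    static int deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    /** Builds a map ordered by unsigned byte[]-lexicographical comparison of the serialized keys. */
    static NavigableMap<byte[], String> store(int... keys) {
        Comparator<byte[]> byteOrder = Arrays::compareUnsigned;
        NavigableMap<byte[], String> store = new TreeMap<>(byteOrder);
        for (int key : keys) {
            store.put(serialize(key), "v" + key);
        }
        return store;
    }

    /** Decodes the keys in the iteration order of the given view. */
    static List<Integer> keysOf(NavigableMap<byte[], String> view) {
        List<Integer> result = new ArrayList<>();
        for (byte[] key : view.keySet()) {
            result.add(deserialize(key));
        }
        return result;
    }

    public static void main(String[] args) {
        NavigableMap<byte[], String> store = store(3, -1, 0, 2);
        System.out.println(keysOf(store));                 // prints [0, 2, 3, -1]
        System.out.println(keysOf(store.descendingMap())); // prints [-1, 3, 2, 0]
    }
}
```

Numeric order would be -1, 0, 2, 3, so users who need logical order would have to pick an order-preserving serialization format.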




Looking into `RangeQuery` the JavaDocs don't say anything about order.
Thus, `RangeQuery#range()` could actually also be implemented by calling
`reverseRange()` without violating the contract as it seems. A hash-based
store could also implement it, without the need to explicitly sort...

Which brings me back to my original thought about having three types of
results for `Range`
   - no ordering guarantee
   - ascending (we would only give byte[]-lexicographical order)
   - descending (we would only give byte[]-lexicographical order)
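
The three result types could be expressed as an explicit option on the query. The sketch below is purely illustrative: `ResultOrder` and `OrderedRange` are hypothetical names, not part of the API under discussion, and keys are compared as plain strings where a real store would compare serialized byte[] keys. The point is that only the two ordered variants force a sort, so a hash-based store could answer the unordered one cheaply.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical ordering options for a range result. */
enum ResultOrder { ANY, ASCENDING, DESCENDING }

final class OrderedRange {
    /**
     * Returns the keys in the requested order. The input list models the order in which
     * a store happens to iterate, for example a hash-based store doing a full scan.
     */
    static List<String> range(List<String> storeKeys, ResultOrder order) {
        List<String> result = new ArrayList<>(storeKeys);
        switch (order) {
            case ASCENDING:
                Collections.sort(result);
                break;
            case DESCENDING:
                result.sort(Collections.reverseOrder());
                break;
            case ANY:
            default:
                // No guarantee: keep whatever order the store iterates in, no sort needed.
                break;
        }
        return result;
    }
}
```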

Again, I actually believe that the original intent of RangeQuery was to
inherit the ascending order of `ReadOnlyKeyValueStore#range()`... Please
keep me honest about it.  On the other hand, both APIs seem to be
independent enough to not couple them... -- this could actually be a
step in the right direction and would follow the underlying idea of
IQv2 to begin with: decouple semantics

[jira] [Updated] (KAFKA-15601) Client metrics and observability

2023-10-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15601:

Component/s: streams

> Client metrics and observability
> 
>
> Key: KAFKA-15601
> URL: https://issues.apache.org/jira/browse/KAFKA-15601
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, consumer, core, producer , streams
>Reporter: Apoorv Mittal
>Priority: Major
>  Labels: kip
>
> This Jira tracks the development of KIP-714: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability]
>  





[jira] [Updated] (KAFKA-15601) Client metrics and observability

2023-10-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15601:

Component/s: admin
 clients
 consumer
 core
 producer 

> Client metrics and observability
> 
>
> Key: KAFKA-15601
> URL: https://issues.apache.org/jira/browse/KAFKA-15601
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, consumer, core, producer 
>Reporter: Apoorv Mittal
>Priority: Major
>
> This Jira tracks the development of KIP-714: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability]
>  





[jira] [Updated] (KAFKA-15601) Client metrics and observability

2023-10-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15601:

Labels: kip  (was: )

> Client metrics and observability
> 
>
> Key: KAFKA-15601
> URL: https://issues.apache.org/jira/browse/KAFKA-15601
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, consumer, core, producer 
>Reporter: Apoorv Mittal
>Priority: Major
>  Labels: kip
>
> This Jira tracks the development of KIP-714: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability]
>  





Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Matthias J. Sax
Thanks Andrew. Makes sense to me. Adding the parameter-less overload was 
just a random idea. No need to extend the KIP.



-Matthias

On 10/12/23 12:12 PM, Jun Rao wrote:

Hi, Andrew,

Thanks for the reply.

131. Could we also document how one could correlate each client instance in
KStreams with the labels for the metrics received by the brokers?

132. The documentation for RequestsPerSec is not complete. If you trace
through how
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L71
<https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L579>
is
implemented, it includes every API key tagged with the corresponding
listener.

Jun

On Thu, Oct 12, 2023 at 11:42 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:


Hi Jun,
Thanks for your comments.

130. As Matthias described, and I am adding to the KIP, the
`KafkaStreams#clientInstanceIds` method
is only permitted when the state is RUNNING or REBALANCING. Also, clients
can be added dynamically
so the maps might change over time. If it’s in a permitted state, the
method is prepared to wait up to the
supplied timeout to get the client instance ids. It does not return a
partial result - it returns a result or
fails.

131. I’ve refactored the `ClientsInstanceIds` object and the global
consumer is now part of the map
of consumers. There is no need for the Optional any longer. I’ve also
renamed it `ClientInstanceIds`.

132. My reading of
`(kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*)` is that
it does not support every request type - it supports Produce,
FetchConsumer and FetchFollower.
Consequently, I think the ClientMetricsSubscriptionRequestCount is not
instantly obsolete.

If I’ve misunderstood, please let me know.

Thanks,
Andrew



On 12 Oct 2023, at 01:07, Jun Rao  wrote:

Hi, Andrew,

Thanks for the updated KIP. Just a few more minor comments.

130. KafkaStreams.clientsInstanceId(Duration timeout): Does it wait for all
consumer/producer/adminClient instances to be initialized? Are all those
instances created during KafkaStreams initialization?

131. Why does globalConsumerInstanceId() return Optional while
other consumer instances don't return Optional?

132. ClientMetricsSubscriptionRequestCount: Do we need this since we have a
set of generic metrics
(kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*) that
report Request rate for every request type?

Thanks,

Jun

On Wed, Oct 11, 2023 at 1:47 PM Matthias J. Sax wrote:



Thanks!

On 10/10/23 11:31 PM, Andrew Schofield wrote:

Matthias,
Yes, I think that’s a sensible way forward and the interface you propose
looks good. I’ll update the KIP accordingly.


Thanks,
Andrew


On 10 Oct 2023, at 23:01, Matthias J. Sax  wrote:

Andrew,

yes I would like to get this change into KIP-714 right away. Seems to be
important, as we don't know if/when a follow-up KIP for Kafka Streams would
land.

I was also thinking (and discussed with a few others) how to expose it,
and we would propose the following:


We add a new method to `KafkaStreams` class:

public ClientsInstanceIds clientsInstanceIds(Duration timeout);

The returned object is like below:

  public class ClientsInstanceIds {
    // we only have a single admin client per KS instance
    String adminInstanceId();

    // we only have a single global consumer per KS instance (if any)
    // Optional<> because we might not have global-thread
    Optional globalConsumerInstanceId();

    // return a  ClientInstanceId> mapping
    // for the underlying (restore-)consumers/producers
    Map mainConsumerInstanceIds();
    Map restoreConsumerInstanceIds();
    Map producerInstanceIds();
  }

For the `threadKey`, we would use some pattern like this:

  [Stream|StateUpdater]Thread-


Would this work from your POV?



-Matthias


On 10/9/23 2:15 AM, Andrew Schofield wrote:

Hi Matthias,
Good point. Makes sense to me.
Is this something that can also be included in the proposed Kafka
Streams follow-on KIP, or would you prefer that I add it to KIP-714?
I have a slight preference for the former to put all of the KS
enhancements into a separate KIP.

Thanks,
Andrew

On 7 Oct 2023, at 02:12, Matthias J. Sax  wrote:

Thanks Andrew. SGTM.

One point you did not address is the idea to add a method to
`KafkaStreams` similar to the proposed `clientInstanceId()` that will be
added to consumer/producer/admin clients.

Without addressing this, Kafka Streams users won't have a way to get
the assigned `instanceId` of the internally created clients, and thus it
would be very difficult for them to know which metrics that the broker
receives belong to a Kafka Streams app. It seems they would only find the
`instanceIds` in the log4j output if they enable client logging?

Of course, because there are multiple clients inside Kafka Streams,
the return type cannot be a single "String", but must be som

Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Matthias J. Sax

Seems both Andrew and Jun prefer to merge the consumers. I am ok with this.

I'll leave it to Andrew to update the KIP accordingly, including adding 
`throws TimeoutException`.



-Matthias

On 10/12/23 10:07 AM, Jun Rao wrote:

Hi, Matthias,

130. Yes, throwing an exception sounds reasonable. It would be useful to
document this.

131. I was thinking that we could just return all consumers (including the
global consumer) through Map consumerInstanceIds() and use
keys to identify each consumer instance. The benefit is that the
implementation (whether to use a separate global consumer or not) could
change in the future, but the API can remain the same. Another slight
benefit is that there is no need for returning Optional. If the
global consumer is not used, it just won't be included in the map.

Thanks,

Jun


On Thu, Oct 12, 2023 at 9:30 AM Matthias J. Sax  wrote:


Thanks Sophie and Jun.

`clientInstanceIds()` is fine with me -- was not sure about the double
plural myself.

Sorry if my comment was confusing. I was trying to say that adding an
overload to `KafkaStreams` that does not take a timeout parameter does
not make sense, because there is no `default.api.timeout.ms` config for
Kafka Streams, so users always need to pass in a timeout. (Same for
producer.)

For the implementation, I think KS would always call
`client.clientInstanceId(timeout)` and never rely on
`default.api.timeout.ms` though, so we can stay in control -- if a
timeout is passed by the user, it would always overwrite
`default.api.timeout.ms` on the consumer/admin and thus we should follow
the same semantics in Kafka Streams, and overwrite it explicitly when
calling `client.clientInstanceId()`.

The proposed API also makes sense to me. I was just wondering if we want
to extend it for client users -- for KS we won't need/use the
timeout-less overloads.



130) My intent was to throw a TimeoutException if we cannot get all
instanceIds, because it's the standard contract for timeouts. It would
also be hard to tell for a user, if a full or partial result was
returned (or we add a method `boolean isPartialResult()` to make it
easier for users).

If there are concerns/objections, I am also ok to return a partial result
-- it would require a change to the newly added `ClientInstanceIds`
return type -- for `adminInstanceId` we only return a `String` right now
-- we might need to change this to `Optional` so we are able to
return a partial result?


131) Of course we could, but I am not sure what we would gain? In the
end, implementation details would always leak because if we change the
number of consumers we use, we would return different keys in the `Map`.
Atm, the proposal implies that the same key might be used for the "main"
and "restore" consumer of the same thread -- but we can make keys unique
by adding a `-restore` suffix to the restore-consumer key if we merge
both maps. -- Curious to hear what others think. I am very open to doing it
differently than currently proposed.


-Matthias


On 10/12/23 8:39 AM, Jun Rao wrote:

Hi, Matthias,

Thanks for the reply.

130. What would be the semantic? If the timeout has expired and only some
of the client instances' id have been retrieved, does the call return the
partial result or throw an exception?

131. Could we group all consumer instances in a single method since we are
returning the key for each instance already? This probably also avoids
exposing implementation details that could change over time.

Thanks,

Jun

On Thu, Oct 12, 2023 at 12:00 AM Sophie Blee-Goldman <sop...@responsive.dev> wrote:


Regarding the naming, I personally think `clientInstanceId` makes sense for
the plain clients
   -- especially if we might later introduce the notion of an
`applicationInstanceId`.

I'm not a huge fan of `clientsInstanceIds` for the Kafka Streams API,
though, can we use
`clientInstanceIds` instead? (The difference being the placement of the
plural 's')
I would similarly rename the class to just ClientInstanceIds

we can also not have a timeout-less overload,  because `KafkaStreams` does
not have a `default.api.timeout.ms` config either


With respect to the timeout for the Kafka Streams API, I'm a bit confused
by the
doubletriple-negative of Matthias' comment here, but I was thinking about
this
earlier and this was my take: with the current proposal, we would allow
users to pass
in an absolute timeout as a parameter that would apply to the method as a
whole.
Meanwhile within the method we would issue separate calls to each of the
clients using
the default or user-configured value of their  `default.api.timeout.ms` as
the timeout
parameter.

So the API as proposed makes sense to me.


On Wed, Oct 11, 2023 at 6:48 PM Matthias J. Sax 

wrote:



I can answer 130 and 131.

130) We cannot guarantee that all clients are already initialized due to
race conditions. We plan to not allow calling
`KafkaStreams#clientsInstanceIds()` when the state is not RUNNI

[jira] [Updated] (KAFKA-15594) Add 3.6.0 to streams upgrade/compatibility tests

2023-10-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15594:

Component/s: streams
 system tests

> Add 3.6.0 to streams upgrade/compatibility tests
> 
>
> Key: KAFKA-15594
> URL: https://issues.apache.org/jira/browse/KAFKA-15594
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Satish Duggana
>Priority: Major
>






Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Matthias J. Sax

Thanks Sophie and Jun.

`clientInstanceIds()` is fine with me -- was not sure about the double 
plural myself.


Sorry if my comment was confusing. I was trying to say that adding an 
overload to `KafkaStreams` that does not take a timeout parameter does 
not make sense, because there is no `default.api.timeout.ms` config for 
Kafka Streams, so users always need to pass in a timeout. (Same for 
producer.)


For the implementation, I think KS would always call 
`client.clientInstanceId(timeout)` and never rely on 
`default.api.timeout.ms` though, so we can stay in control -- if a 
timeout is passed by the user, it would always overwrite 
`default.api.timeout.ms` on the consumer/admin and thus we should follow 
the same semantics in Kafka Streams, and overwrite it explicitly when 
calling `client.clientInstanceId()`.


The proposed API also makes sense to me. I was just wondering if we want 
to extend it for client users -- for KS we won't need/use the 
timeout-less overloads.




130) My intent was to throw a TimeoutException if we cannot get all 
instanceIds, because it's the standard contract for timeouts. It would 
also be hard to tell for a user, if a full or partial result was 
returned (or we add a method `boolean isPartialResult()` to make it 
easier for users).


If there are concerns/objections, I am also ok to return a partial result 
-- it would require a change to the newly added `ClientInstanceIds` 
return type -- for `adminInstanceId` we only return a `String` right now 
-- we might need to change this to `Optional` so we are able to 
return a partial result?



131) Of course we could, but I am not sure what we would gain? In the 
end, implementation details would always leak because if we change the 
number of consumers we use, we would return different keys in the `Map`. 
Atm, the proposal implies that the same key might be used for the "main" 
and "restore" consumer of the same thread -- but we can make keys unique 
by adding a `-restore` suffix to the restore-consumer key if we merge 
both maps. -- Curious to hear what others think. I am very open to doing it 
differently than currently proposed.
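
The `-restore` suffix idea from 131 can be sketched as below. This assumes both maps use plain `String` thread keys and `String` instance ids, which the thread does not confirm; `ConsumerIdMerger`, the key `StreamThread-1` and the id values are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ConsumerIdMerger {
    /**
     * Merges the per-thread "main" and "restore" consumer ids into one map.
     * Restore-consumer keys get a "-restore" suffix so both entries of a thread stay distinct.
     */
    static Map<String, String> merge(Map<String, String> mainConsumers,
                                     Map<String, String> restoreConsumers) {
        Map<String, String> merged = new LinkedHashMap<>(mainConsumers);
        for (Map.Entry<String, String> entry : restoreConsumers.entrySet()) {
            merged.put(entry.getKey() + "-restore", entry.getValue());
        }
        return merged;
    }
}
```

For example, merging `{StreamThread-1=id-a}` with `{StreamThread-1=id-b}` yields two entries, `StreamThread-1` and `StreamThread-1-restore`, instead of one overwriting the other.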



-Matthias


On 10/12/23 8:39 AM, Jun Rao wrote:

Hi, Matthias,

Thanks for the reply.

130. What would be the semantic? If the timeout has expired and only some
of the client instances' id have been retrieved, does the call return the
partial result or throw an exception?

131. Could we group all consumer instances in a single method since we are
returning the key for each instance already? This probably also avoids
exposing implementation details that could change over time.

Thanks,

Jun

On Thu, Oct 12, 2023 at 12:00 AM Sophie Blee-Goldman 
wrote:


Regarding the naming, I personally think `clientInstanceId` makes sense for
the plain clients
  -- especially if we might later introduce the notion of an
`applicationInstanceId`.

I'm not a huge fan of `clientsInstanceIds` for the Kafka Streams API,
though, can we use
`clientInstanceIds` instead? (The difference being the placement of the
plural 's')
I would similarly rename the class to just ClientInstanceIds

we can also not have a timeout-less overload,  because `KafkaStreams` does

not have a `default.api.timeout.ms` config either


With respect to the timeout for the Kafka Streams API, I'm a bit confused
by the
doubletriple-negative of Matthias' comment here, but I was thinking about
this
earlier and this was my take: with the current proposal, we would allow
users to pass
in an absolute timeout as a parameter that would apply to the method as a
whole.
Meanwhile within the method we would issue separate calls to each of the
clients using
the default or user-configured value of their  `default.api.timeout.ms` as
the timeout
parameter.

So the API as proposed makes sense to me.


On Wed, Oct 11, 2023 at 6:48 PM Matthias J. Sax  wrote:


I can answer 130 and 131.

130) We cannot guarantee that all clients are already initialized due to
race conditions. We plan to not allow calling
`KafkaStreams#clientsInstanceIds()` when the state is not RUNNING (or
REBALANCING) though -- guess this slipped on the KIP and should be
added? But because StreamThreads can be added dynamically (and producer
might be created dynamically at runtime; cf below), we still cannot
guarantee that all clients are already initialized when the method is
called. Of course, we assume that all clients are most likely initialized
on the happy path, and blocking calls to `client.clientInstanceId()`
should be rare.

To address the worst case, we won't do a naive implementation and just
loop over all clients, but fan-out the call to the different
StreamThreads (and GlobalStreamThread if it exists), and use Futures to
gather the results.

Currently, `StreamThreads` has 3 clients (if ALOS or EOSv2 is used), so
we might do 3 blocking calls in the worst case (for EOSv1 we get a
producer per task, and we might end up doing more blocking calls if the
producers are

Re: [DISCUSS] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-11 Thread Matthias J. Sax

Quick addendum.

Some DSL operators use `range` and seem to rely on ascending order,
too... Of course, if we say `range()` has no ordering guarantee, we
would add `forwardRange()` and let the DSL use `forwardRange`, too.


The discussion of course also applies to `all()` and `reverseAll()`, and
I assume also `prefixScan()` (even if there is no "reverse" version
for it).



On 10/11/23 10:22 PM, Matthias J. Sax wrote:

Thanks for raising this question Hanyu. Great find!

My interpretation is as follows (it's actually a warning signal that the 
API contract is not better defined, and we should fix this by extending 
JavaDocs and docs on the web page about it).


We have existing `range()` and `reverseRange()` methods on 
`ReadOnlyKeyValueStore` -- the interface itself is not typed (ie, just 
generics), and we state that we don't guarantee "logical order" because 
underlying stores are based on `byte[]` type. So far so... well.


However, to make matters worse, we are also not explicit about whether the
underlying store implementation *must* return keys in
byte[]-lexicographical order or not...


For `range()`, I would be kinda willing to accept that there is no 
ordering guarantee at all -- for example, if the underlying byte[]-store 
is hash-based and implements a full scan to answer a `range()` it might 
not be efficient, but also not incorrect if keys are returned in some 
"random" (byte[]-)order. In isolation, I don't see an API contract 
violation.


However, `reverseRange` implicitly states with its name that some
"descending order" (based on keys) is expected. Given the JavaDoc comment
about "logical" vs "byte[]" order, the contract (at least to me) is
clear: returns records in descending byte[]-lexicographical order. --
Any other interpretation seems to be off? Curious to hear if you agree
or disagree with this interpretation?




If this is correct, it means we are actually lacking an API contract for
ascending byte[]-lexicographical range scan. Furthermore, a hash-based
byte[]-store would need to actually explicitly sort its result for
`reverseRange` to not violate the contract.


To me, this raises the question if `range()` actually has a 
(non-explicit) contract about returning data in byte[]-lexicographical 
order? It seems a lot of people rely on this, and our default stores 
actually implement it this way. So if we don't look at `range()` in 
isolation, but look at the `ReadOnlyKeyValueStore` interface 
holistically, I would also buy the argument that `range()` implies
"ascending byte[]-lexicographical order". Thoughts?


To be frank: to me, it's pretty clear that the original idea to add 
`range()` was to return data in ascending order.



Question 1:
  - Do we believe that the range() contract is ascending 
byte[]-lexicographical order right now?


    If yes, I would propose to make it explicit in the JavaDocs.

    If no, I would also propose to make it explicit in the JavaDocs. In 
addition, it raises the question if a method `forwardRange()` (for the 
lack of a better idea about a name right now) is actually missing to 
provide such a contract?



Of course, we always depend on the serialization format for order, and 
if users need "logical order" they need to ensure to use a serialization 
format that align byte[]-lexicographical order to logical order. But for 
the scope of this work, I would not even try to open this can of worms...
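
A minimal sketch of the serialization point above, assuming plain big-endian integer encoding. The class and method names are made up for illustration and are not part of Kafka or the KIP. It shows that unsigned byte[]-lexicographical order disagrees with numeric order for negative keys, and that flipping the sign bit is one way to align the two:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ByteOrderSketch {
    // Plain big-endian two's-complement encoding (hypothetical stand-in for a serializer).
    static byte[] plain(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    // Order-preserving variant: flipping the sign bit makes unsigned byte order follow numeric order.
    static byte[] orderPreserving(int v) {
        return ByteBuffer.allocate(4).putInt(v ^ Integer.MIN_VALUE).array();
    }

    public static void main(String[] args) {
        // -1 encodes as FF FF FF FF, so it sorts after 00 00 00 01 in unsigned byte order.
        System.out.println(Arrays.compareUnsigned(plain(-1), plain(1)) > 0);
        // With the sign bit flipped, -1 sorts before 1, matching logical order.
        System.out.println(Arrays.compareUnsigned(orderPreserving(-1), orderPreserving(1)) < 0);
    }
}
```

Both lines print `true`. A store that only promises byte[] order would therefore return negative integer keys last unless the serialization format is chosen with ordering in mind.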





Looking into `RangeQuery`, the JavaDocs don't say anything about order.
Thus, it seems `RangeQuery#range()` could actually also be implemented by
calling `reverseRange()` without violating the contract. A hash-based
store could also implement it, without the need to explicitly sort...


Which brings me back to my original thought about having three types of
results for `Range`

  - no ordering guarantee
  - ascending (we would only give byte[]-lexicographical order)
  - descending (we would only give byte[]-lexicographical order)

Again, I actually believe that the original intent of RangeQuery was to
inherit the ascending order of `ReadOnlyKeyValueStore#range()`... Please
keep me honest about it.  On the other hand, both APIs seem to be
independent enough to not couple them... -- this could actually be a
step in the right direction and would follow the underlying idea of
IQv2 to begin with: decouple semantics for the store interfaces from the
query types and semantics...



OR: we actually say that `RangeQuery#range` did implicitly inherit the 
(non explicit) "ascending byte[]-lexicographical" order of the 
underlying `ReadOnlyKeyValueStore`, and we just need to update the 
(Java)Docs to make it explicit. -- But it might go against the idea of 
IQv2 as stated above.



Furthermore, the consequence would be, that a potential custom 
hash-based store, would need to do extra work to `range()` to do the 
sorting (or of course might

Re: [DISCUSS] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-11 Thread Matthias J. Sax
s.

Sincerely,
Hanyu

On Thu, Oct 5, 2023 at 9:52 AM Hanyu (Peter) Zheng <pzh...@confluent.io> wrote:


Hi, Hao,

In this case, it will return an empty set or list in the end.

Sincerely,
Hanyu

On Wed, Oct 4, 2023 at 10:29 PM Matthias J. Sax 

wrote:



Great discussion!

It seems the only open question might be about ordering guarantees?
IIRC, we had a discussion about this in the past.


Technically (at least from my POV), existing `RangeQuery` does not have
a guarantee that data is returned in any specific order (not even on a
per-partition basis). It just happens that RocksDB (and as pointed out by
Hanyu already, also the built-in in-memory store that is based on a
tree-map) allows us to return data ordered by key; as mentioned already,
this guarantee is limited to a per-partition basis.

If there were a custom store based on a hashed key-value store, this
store could implement RangeQuery and return data (even for a single
partition) with no ordering, without violating the contract.



Thus, it could actually make sense to extend `RangeQuery` and allow
three options: no-order, ascending, descending. For our existing
Rocks/InMemory implementations, no-order could be equal to ascending and
nothing changes effectively, but it might be a better API contract?

--

If we assume that there might be a custom hash-based store, such a store
could reject a query if "ascending" is required, or might need to do
more work to implement it (up to the store maintainer). This is actually
the beauty of IQv2 that different stores can pick what queries they want
to support.

From an API contract point of view, it seems confusing to say:
specifying nothing means no guarantee (or ascending if the store can
offer it), but descending can be explicitly requested. Thus, a hash-based
store might be able to accept an "order not specified" query, but would
reject "descending". This seems to be somewhat unbalanced?

Thus, I am wondering if we should actually add `withAscendingKeys()`,
too, even if it won't impact our current RocksDB/In-Memory
implementations?
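
To make the three-option contract concrete, here is a rough sketch. Everything in it (the `ResultOrder` enum, the `HashStore` class, its `range` signature) is hypothetical and only illustrates the idea from the paragraphs above; it is not the IQv2 API. A hash-based store accepts "any order" and declines both explicit orderings:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderContractSketch {
    // Hypothetical: the three ordering options discussed in the thread.
    enum ResultOrder { ANY, ASCENDING, DESCENDING }

    // Hypothetical hash-based store: it can scan a key range but has no cheap way to sort.
    static final class HashStore {
        private final Map<Integer, String> data = new HashMap<>();

        void put(int key, String value) {
            data.put(key, value);
        }

        List<Integer> range(int lower, int upper, ResultOrder order) {
            if (order != ResultOrder.ANY) {
                // The store declines orderings it cannot serve instead of silently ignoring them.
                throw new UnsupportedOperationException("order not supported: " + order);
            }
            List<Integer> keys = new ArrayList<>();
            for (Integer key : data.keySet()) {
                if (key >= lower && key <= upper) {
                    keys.add(key);
                }
            }
            return keys; // no ordering promised
        }
    }

    public static void main(String[] args) {
        HashStore store = new HashStore();
        store.put(1, "a");
        store.put(2, "b");
        store.put(5, "c");
        System.out.println(store.range(1, 3, ResultOrder.ANY).size()); // two keys fall in [1, 3]
    }
}
```

With an explicit ascending option, the rejection is symmetric: the hash-based store treats ASCENDING and DESCENDING the same way, which is the balance the paragraph above asks for.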


The second question is about per-partition or across-partition ordering:
it's not possible right now to actually offer across-partition ordering
the way IQv2 is set up. The reason is that the store that implements a
query type is always a single shard. Thus, the implementation does not
have access to other shards. It's hard-coded inside Kafka Streams to
query each shard, to "accumulate" partial results, and to return them
back to the user. Note that the API is:



StateQueryResult<R> result = KafkaStreams.query(...);
Map<Integer, QueryResult<R>> resultPerPartitions = result.getPartitionResults();


Thus, if we would want to offer across-partition ordering, we cannot do
it right now, because Kafka Streams does not know anything about the
semantics of the query it distributes... -- the result is an unknown
type <R>. We would need to extend IQv2 with an additional mechanism
that allows users to plug in more custom code to "merge" multiple
partitions' results into a "global result". This is clearly out-of-scope
for this KIP and would require a new KIP by itself.
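
For illustration only, this is roughly what user-side "merge" code could look like today, outside of Kafka Streams. It is a sketch under assumptions: the per-partition results are plain lists of integer keys, each already ascending, and the class and method names are invented for this example. It performs a k-way merge with a priority queue:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class PartitionMergeSketch {
    // Tracks the current head key of one partition's iterator.
    private static final class Cursor {
        final Integer head;
        final Iterator<Integer> rest;

        Cursor(Integer head, Iterator<Integer> rest) {
            this.head = head;
            this.rest = rest;
        }
    }

    // Merges per-partition key lists (each sorted ascending) into one ascending list.
    static List<Integer> mergeAscending(Map<Integer, List<Integer>> perPartition) {
        PriorityQueue<Cursor> queue =
            new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.head));
        for (List<Integer> keys : perPartition.values()) {
            Iterator<Integer> it = keys.iterator();
            if (it.hasNext()) {
                queue.add(new Cursor(it.next(), it));
            }
        }
        List<Integer> merged = new ArrayList<>();
        while (!queue.isEmpty()) {
            Cursor smallest = queue.poll();
            merged.add(smallest.head);
            if (smallest.rest.hasNext()) {
                queue.add(new Cursor(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Keys 0 and 2 live in partition 0, keys 1 and 3 in partition 1.
        System.out.println(mergeAscending(Map.of(0, List.of(0, 2), 1, List.of(1, 3))));
    }
}
```

The merge only works because the caller knows the result type and its ordering. Kafka Streams itself sees an opaque result type, which is why it cannot do this on the user's behalf.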

It seems that this contract, which is independent of the query type, is
not well understood, and thus a big +1 to fix the documentation. I don't
think that this KIP must "define" anything, but it might of course be
worth adding the explanation why the KIP cannot even offer
global-ordering, as it's defined/limited by the IQv2 "framework" itself,
not the individual queries.



-Matthias




On 10/4/23 4:38 PM, Hao Li wrote:

Hi Hanyu,

Thanks for the KIP! Seems there are already a lot of good discussions. I
only have two comments:

1. Please make it clear in
```
  /**
   * Interactive range query using a lower and upper bound to filter the keys returned.
   * @param lower The key that specifies the lower bound of the range
   * @param upper The key that specifies the upper bound of the range
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
      return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), true);
  }
```
that a `null` in lower or upper parameter means it's unbounded.
2. What's the behavior if lower is 3 and upper is 1? Is it IllegalArgument
or will this return an empty result? Maybe also clarify this in the
document.
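
The two cases raised above can be sketched against a plain `TreeMap`. This is only an illustration with invented names, not the store implementation. It assumes inclusive bounds, treats an empty `Optional` as "unbounded", and returns an empty result for inverted bounds, which matches the answer given elsewhere in this thread:

```java
import java.util.Collections;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

public class RangeBoundsSketch {
    // Returns the entries between the bounds (inclusive); an empty Optional means "unbounded".
    static NavigableMap<Integer, String> range(NavigableMap<Integer, String> store,
                                               Optional<Integer> lower,
                                               Optional<Integer> upper) {
        if (lower.isPresent() && upper.isPresent() && lower.get() > upper.get()) {
            // Inverted bounds: this sketch returns an empty result instead of throwing.
            return Collections.emptyNavigableMap();
        }
        NavigableMap<Integer, String> view = store;
        if (lower.isPresent()) {
            view = view.tailMap(lower.get(), true);
        }
        if (upper.isPresent()) {
            view = view.headMap(upper.get(), true);
        }
        return view;
    }

    public static void main(String[] args) {
        NavigableMap<Integer, String> store = new TreeMap<>();
        for (int i = 0; i <= 3; i++) {
            store.put(i, "v" + i);
        }
        System.out.println(range(store, Optional.of(1), Optional.of(3)).keySet());      // [1, 2, 3]
        System.out.println(range(store, Optional.empty(), Optional.of(1)).keySet());    // [0, 1]
        System.out.println(range(store, Optional.of(3), Optional.of(1)).keySet());      // []
    }
}
```

Note that the explicit check matters: without it, `TreeMap` itself would throw an `IllegalArgumentException` for inverted bounds, so the behavior really does need to be spelled out in the docs.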

Thanks,
Hao


On Wed, Oct 4, 2023 at 9:27 AM Hanyu (Peter) Zheng
 wrote:


For testing purposes, we previously used a Set to record the results in
IQv2StoreIntegrationTest. Let's take an example where we now have two
partitions and four key-value pairs: <0,0> in p0, <1,1> in p1, <2,2> in p0,
and <3,3> in p1.

If we execute withRange(1,3), it will return a Set of <1, 2, 3>. However,

if we run withRange(1,3).wit

Re: [DISCUSS] KIP-969: Support range interactive queries for versioned state stores

2023-10-11 Thread Matthias J. Sax

Thanks for the update.



To retrieve

the latest value(s), the user must call just the asOf method with the MAX
value (asOf(MAX)). The same applies to KIP-968. Do you think it is clumsy,
Matthias?



Well, in KIP-968 calling `asOf` and passing in a timestamp is optional, 
and default is "latest", right? So while `asOf(MAX)` does the same 
thing, practically users would never call `asOf` for a "latest" query?


In this KIP, we enforce that users give us a key range (we have the 4
static entry point methods to define a query for this), and we
default to "no bounds" for the time range.


The existing `RangeQuery` allows to query a range of keys for existing 
stores. It seems to be a common pattern to query a key-range on latest. 
-- in the current proposal, users would need to do:


MultiVersionedRangeQuery.withKeyRange(startKey, endKey).asOf(MAX);

Would like to hear from others if we think that's a good user experience?
If we agree to accept this, I think we should explain how to do this in
the JavaDocs (and also the regular docs) -- otherwise, I can already
anticipate user questions on all question-asking channels about how to do a
"normal key range query". IMHO, the problem is not that the code itself
is too clumsy, but that it's totally not obvious to users how to express
it without actually explaining it to them. It basically violates the API
design rule "make it easy to use / simple things should be easy".


Btw: We could also re-use `RangeQuery` and add an implementation to 
`VersionedStateStore` to just accept this query type, with "key range 
over latest" semantics. -- The issue is of course, that users need to 
know that the query would return `ValueAndTimestamp` and not plain `V` 
(or we add a translation step to unwrap the value, but we would lose the 
"validFrom" timestamp -- validTo would be `null`). Because type safety 
is a general issue in IQv2 it would not make it worse (in the strict 
sense), but I am also not sure if we want to dig an even deeper hole...



-Matthias


On 10/10/23 11:55 AM, Alieh Saeedi wrote:

Thanks, Matthias and Bruno, for the feedback on KIP-969. Here is a summary
of the updates I made to the KIP:

1.  I liked the idea of renaming methods as Matthias suggested.
2. I removed the allversions() method as I did in KIP-968. To retrieve
the latest value(s), the user must call just the asOf method with the MAX
value (asOf(MAX)). The same applies to KIP-968. Do you think it is clumsy,
Matthias?
3. I added a method to the *VersionedKeyValueStore *interface, as I did
for KIP-968.
4. Matthias: I do not get what you mean by your second comment. Isn't
the KIP already explicit about that?

> I assume, results are returned by timestamp for each key. The KIP
should be explicit about it.


Cheers,
Alieh



On Tue, Oct 3, 2023 at 6:07 AM Matthias J. Sax  wrote:


Thanks for updating the KIP.

Not sure if I agree or not with Bruno's idea to split the query types
further? In the end, we split them only because there are three different
return types: single value, value-iterator, key-value-iterator.

What do we gain by splitting out single-ts-range-key? In the end, for
range-ts-range-key the proposed class is necessary and is a superset
(one can set both timestamps to the same value, for single-ts lookup).

The mentioned simplification might apply to "single-ts-range-key" but I
don't see a simplification for the proposed (and necessary) query type?

On the other hand, I see an advantage of a single-ts-range-key for
querying over the "latest version" with a range of keys. For a
single-ts-range-key query, this would be the default (similar to
VersionedKeyQuery with no asOf-timestamp defined).

In the current version of the KIP, (if we agree that default should
actually return "all versions" not "latest" -- this default was
suggested by Bruno on KIP-968 and makes sense to me, so we would need to
have the same default here to stay consistent), users would need to pass
in `from(Long.MAX).to(Long.MAX)` (if I got this right) to query the
latest point in time only, which seems to be clumsy? Or we could add a
`lastestKeyOnly` option to `MultiVersionedRangeQuery`, but it seems
a little clumsy, too.




The overall order of the returned records is by Key


I assume, results are returned by timestamp for each key. The KIP should
be explicit about it.



To be very explicit, should we rename the methods to specify the key bound?

   - withRange -> withKeyRange
   - withLowerBound -> withLowerKeyBound
   - withUpperBound -> withUpperKeyBound
   - withNoBounds -> allKeys (or withNoKeyBounds, but we use
`allVersions` and not `noTimeBound` and should align the naming?)



-Matthias


On 9/6/23 5:25 AM, Bruno Cadonna wrote:

Hi Alieh,

Thanks for the KIP!

One high level comment/question:

I assume you se

Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-10-11 Thread Matthias J. Sax
ery 
of KIP-960 and then one with a key and a time range. When you iterate 
over the results you could also call validTo(). Maybe add some actual 
records in the comments to show what the result might look like.


Regarding the test plan, I hope you also plan to add unit tests in all 
of your KIPs. Maybe you could also explain why system tests are not 
needed here.


Best,
Bruno

On 10/10/23 5:36 PM, Alieh Saeedi wrote:

Thank you all for the very exact and constructive comments. I really
enjoyed reading your ideas and all the important points you made me aware
of. I updated KIP-968 as follows:

    1. If the key or time bounds are null, the method throws an NPE.
    2. The "valid" word: I removed the sentence "all the records that are
    valid..." and replaced it with an exact explanation. Moreover, I explained
    it with an example in the KIP but not in the javadocs. Do I need to add the
    example to the javadocs as well?
    3. Since I followed Bruno's suggestion and removed the allVersions()
    method, the problem of meaningless combinations is solved, and I do not
    need any IllegalArgumentException or something like that. Therefore, the
    change is that if no time bound is specified, the query returns the records
    with the specified key for all timestamps (all versions).
    4. As Victoria suggested, adding a method to the *VersionedKeyValueStore*
    interface is essential. So I did that. I had this method only in the
    RocksDBVersionedStore class, which was not enough.
    5. I added the *validTo* field to the VersionedRecord class to be able
    to represent the tombstones. As you suggested, we postpone solving the
    problem of retrieving consecutive tombstones for later.
    6. I added the "Test Plan" section to all KIPs. I hope what I wrote is
    convincing.
    7. I added the *withAscendingTimestamp()* method to provide more
    code readability for the user.
    8. I removed the evil word "get" from all getter methods.

There have also been some more suggestions which I am still not convinced
of or clear about:

    1. Regarding asOf vs until: reading all comments, my conclusion was that
    I keep it as "asOf" (following Walker's idea as the native speaker as well
    as Bruno's suggestion to be consistent with single-key_single_ts queries).
    But I do not have a personal preference. If you insist on "until", I will
    change it.
    2. Bruno suggested renaming the class "MultiVersionedKeyQuery" to something
    else. We already had a long discussion about the name with Matthias. I am
    open to renaming it to something else, but do you have any ideas?
    3. Matthias suggested having a method with two input parameters that
    enables the user to specify both time bounds in the same method. Isn't it
    introducing redundancy? It is somehow disrespectful to the idea of having
    composable methods.
    4. Bruno suggested renaming the methods "asOf" and "from" to "asOfTime"
    and "fromTime". If I do that, then it is not consistent with KIP-960.
    Moreover, the input parameter is clearly a timestamp, which explains
    enough. What do you think about that?
    5. I was asked to add more examples to the example section. My question
    is, what is the main purpose of that? If I know it clearly, then I can add
    what you mean.



Cheers,
Alieh

On Tue, Oct 10, 2023 at 1:13 AM Matthias J. Sax  wrote:


Bruno and I had some background conversation about the `get` prefix
question including a few other committers.

The official policy was never changed, and we should not add the
`get`-prefix. It's a slip on our side in previous KIPs to add the
`get`-prefix and we should actually clean it up doing a follow up KIP.


-Matthias


On 10/5/23 5:26 AM, Bruno Cadonna wrote:

Hi Matthias,

Given all the IQv2 KIPs that use getX and given recent PRs (internal
interfaces mainly) that got merged, I was under the impression that we
moved away from the strict no-getX policy.

I do not think it was an accident using getX in the IQv2 KIPs since
somebody would have brought it up, otherwise.

I am fine with both types of getters.

If we think we need to discuss this in a broader context, let's start a
separate thread.


Best,
Bruno





On 10/5/23 7:44 AM, Matthias J. Sax wrote:

I agree with (almost) everything Bruno said.



In general, we tend to move away from using getters without "get",
recently. So I would keep the "get".


This is new to me? Can you elaborate on this point? Why do you think
that's the case?

I actually did realize (after Walker mentioned it) that existing query
types use `get` prefix, but to me it seems that it was by accident and
we should consider correcting it? Thus, I would actually prefer to not
add the `get` prefix for new methods query types.

IMHO, we should do a follow up KIP to deprecate all methods with `ge

Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-11 Thread Matthias J. Sax

I can answer 130 and 131.

130) We cannot guarantee that all clients are already initialized due to 
race conditions. We plan to not allow calling 
`KafkaStreams#clientsInstanceIds()` when the state is not RUNNING (or 
REBALANCING) though -- guess this slipped on the KIP and should be 
added? But because StreamThreads can be added dynamically (and producer 
might be created dynamically at runtime; cf below), we still cannot 
guarantee that all clients are already initialized when the method is 
called. Of course, we assume that all clients are most likely initialized 
on the happy path, and blocking calls to `client.clientInstanceId()` 
should be rare.


To address the worst case, we won't do a naive implementation and just 
loop over all clients, but fan-out the call to the different 
StreamThreads (and GlobalStreamThread if it exists), and use Futures to 
gather the results.
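

As a rough sketch of the fan-out idea (not the actual Kafka Streams implementation): the blocking lookups are modelled here as plain `Supplier`s, the class and method names are invented, and the choice to throw on timeout rather than return a partial result is an assumption of this example, since that semantic is still being discussed in this thread. All lookups start concurrently and are then awaited against one shared deadline, so the overall wait is bounded by the caller's timeout instead of the sum of the per-client waits:

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class FanOutSketch {
    // Starts all lookups concurrently, then waits for each one against a single shared deadline.
    static Map<String, String> gather(Map<String, Supplier<String>> lookups, Duration timeout)
            throws InterruptedException, ExecutionException, TimeoutException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();

        Map<String, CompletableFuture<String>> pending = new LinkedHashMap<>();
        lookups.forEach((key, lookup) -> pending.put(key, CompletableFuture.supplyAsync(lookup)));

        Map<String, String> ids = new LinkedHashMap<>();
        for (Map.Entry<String, CompletableFuture<String>> entry : pending.entrySet()) {
            long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            // Throws TimeoutException once the overall budget is used up (assumed semantic).
            ids.put(entry.getKey(), entry.getValue().get(remainingNanos, TimeUnit.NANOSECONDS));
        }
        return ids;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Supplier<String>> lookups = new LinkedHashMap<>();
        lookups.put("admin", () -> "id-admin");       // hypothetical client keys and ids
        lookups.put("consumer", () -> "id-consumer");
        System.out.println(gather(lookups, Duration.ofSeconds(5)));
    }
}
```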


Currently, `StreamThreads` has 3 clients (if ALOS or EOSv2 is used), so
we might do 3 blocking calls in the worst case (for EOSv1 we get a
producer per task, and we might end up doing more blocking calls if the
producers are not initialized yet). Note that EOSv1 is already
deprecated, and we are also working on thread refactoring that will
reduce the number of clients on StreamThread to 2 -- and we have more
refactoring planned to reduce the number of clients even further.


Inside `KafkaStreams#clientsInstanceIds()` we might only do a single
blocking call for the admin client (ie, `admin.clientInstanceId()`).


I agree that we need to do some clever timeout management, but it seems 
to be more of an implementation detail?


Do you have any particular concerns, or does the proposed implementation 
as sketched above address your question?



131) If the Topology does not have a global-state-store, there won't be
a GlobalThread and thus no global consumer. Thus, we return an Optional.




Three related questions for Andrew.

(1) Why is the method called `clientInstanceId()` and not just plain
`instanceId()`?


(2) Why do we return a `String` and not a UUID type? The added
protocol request/response classes use UUIDs.


(3) Would it make sense to have an overloaded `clientInstanceId()`
method that does not take any parameter but uses the `default.api.timeout.ms`
config (this config does not exist on the producer though, so we could
only have it for consumer and admin at this point). We could of course
also add overloads like this later if users request them (and/or add
`default.api.timeout.ms` to the producer, too).


Btw: For KafkaStreams, I think `clientsInstanceIds` still makes sense as 
a method name though, as `KafkaStreams` itself does not have an 
`instanceId` -- we can also not have a timeout-less overload, because 
`KafkaStreams` does not have a `default.api.timeout.ms` config either 
(and I don't think it makes sense to add).




-Matthias

On 10/11/23 5:07 PM, Jun Rao wrote:

Hi, Andrew,

Thanks for the updated KIP. Just a few more minor comments.

130. KafkaStreams.clientsInstanceId(Duration timeout): Does it wait for all
consumer/producer/adminClient instances to be initialized? Are all those
instances created during KafkaStreams initialization?

131. Why does globalConsumerInstanceId() return Optional while
other consumer instances don't return Optional?

132. ClientMetricsSubscriptionRequestCount: Do we need this since we have a
set of generic metrics
(kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*) that
report Request rate for every request type?

Thanks,

Jun

On Wed, Oct 11, 2023 at 1:47 PM Matthias J. Sax  wrote:


Thanks!

On 10/10/23 11:31 PM, Andrew Schofield wrote:

Matthias,
Yes, I think that’s a sensible way forward and the interface you propose
looks good. I’ll update the KIP accordingly.


Thanks,
Andrew


On 10 Oct 2023, at 23:01, Matthias J. Sax  wrote:

Andrew,

yes I would like to get this change into KIP-714 right away. Seems to be
important, as we don't know if/when a follow-up KIP for Kafka Streams would
land.

I was also thinking (and discussed with a few others) how to expose it,
and we would propose the following:


We add a new method to `KafkaStreams` class:

 public ClientsInstanceIds clientsInstanceIds(Duration timeout);

The returned object is like below:

   public class ClientsInstanceIds {
     // we only have a single admin client per KS instance
     String adminInstanceId();

     // we only have a single global consumer per KS instance (if any)
     // Optional<> because we might not have global-thread
     Optional<String> globalConsumerInstanceId();

     // return a <threadKey -> ClientInstanceId> mapping
     // for the underlying (restore-)consumers/producers
     Map<String, String> mainConsumerInstanceIds();
     Map<String, String> restoreConsumerInstanceIds();
     Map<String, String> producerInstanceIds();
   }

For the `threadKey`, we would use some pattern like this:

   [Stream|StateUpdater]Thread-


Would this work from your POV?



-Matthias


On 10/9/23 2:15 AM, Andrew Schofield wrote:

Hi Matth

Re: [VOTE] KIP-714: Client metrics and observability

2023-10-11 Thread Matthias J. Sax

+1 (binding)

On 9/13/23 5:48 PM, Jason Gustafson wrote:

Hey Andrew,

+1 on the KIP. For many users of Kafka, it may not be fully understood how
much of a challenge client monitoring is. With tens of clients in a
cluster, it is already difficult to coordinate metrics collection. When
there are thousands of clients, and when the cluster operator has no
control over them, it is essentially impossible. For the fat clients that
we have, the lack of useful telemetry is a huge operational gap.
Consistency between clients has also been a major challenge. I think the
effort toward standardization in this KIP will have some positive impact
even in deployments which have effective client-side monitoring. Overall, I
think this proposal will provide a lot of value across the board.

Best,
Jason

On Wed, Sep 13, 2023 at 9:50 AM Philip Nee  wrote:


Hey Andrew -

Thank you for taking the time to reply to my questions. I'm just adding
some notes to this discussion.

1. epoch: It can be helpful to know the delta of the client side and the
actual leader epoch.  It is helpful to understand why sometimes commit
fails/client not making progress.
2. Client connection: If the client selects the "wrong" connection to push
out the data, I assume the request would timeout; which should lead to
disconnecting from the node and reselecting another node as you mentioned,
via the least loaded node.

Cheers,
P


On Tue, Sep 12, 2023 at 10:40 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:


Hi Philip,
Thanks for your vote and interest in the KIP.

KIP-714 does not introduce any new client metrics, and that’s intentional.
It does tell how all of the client metrics can have their names transformed
into equivalent “telemetry metric names”, and then potentially used in
metrics subscriptions.

I am interested in the idea of client’s leader epoch in this context, but I
don’t have an immediate plan for how best to do this, and it would take
another KIP to enhance existing metrics or introduce some new ones. Those
would then naturally be applicable to the metrics push introduced in KIP-714.

In a similar vein, there are no existing client metrics specifically for
auto-commit. We could add them to Kafka, but I really think this is just an
example of asynchronous commit in which the application has decided not to
specify when the commit should begin.

It is possible to increase the cadence of pushing by modifying the
interval.ms configuration property of the CLIENT_METRICS resource.

There is an “assigned-partitions” metric for each consumer, but not one for
active partitions. We could add one, again as a follow-on KIP.

I take your point about holding on to a connection in a channel which might
experience congestion. Do you have a suggestion for how to improve on this?
For example, the client does have the concept of a least-loaded node. Maybe
this is something we should investigate in the implementation and decide on
the best approach. In general, I think sticking with the same node for
consecutive pushes is best, but if you choose the “wrong” node to start with,
it’s not ideal.
Thanks,
Andrew


On 8 Sep 2023, at 19:29, Philip Nee  wrote:

Hey Andrew -

+1 but I don't have a binding vote!

It took me a while to go through the KIP. Here are some of my notes during
the reading:

*Metrics*
- Should we care about the client's leader epoch? There is a case where the
user recreates the topic, but the consumer thinks it is still the same
topic and therefore, attempts to start from an offset that doesn't exist.
KIP-848 addresses this issue, but I can still see some potential benefits
from knowing the client's epoch information.
- I assume poll idle is similar to poll interval: I needed to read the
description a few times.
- I don't have a clear use case in mind for the commit latency, but I do
think sometimes people lack clarity about how much progress was tracked by
the auto-commit.  Would tracking auto-commit-related metrics be useful? I
was thinking: the last offset committed or the actual cadence in ms.
- Are there cases when we need to increase the cadence of telemetry data
push? i.e. variable interval.
- Thanks for implementing the randomized initial metric push; I think it is
really important.
- Is there a potential use case for tracking the number of active
partitions? The consumer can pause partitions via API, during revocation,
or during offset reset for the stream.

*Connections*:
- The KIP stated that it will keep the same connection until the connection
is disconnected. I wonder if that could potentially cause congestion if it
is already a busy channel, which leads to connection timeout and
subsequently disconnection.

Thanks,
P

On Fri, Sep 8, 2023 at 4:15 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:


Bumping the voting thread for KIP-714.

So far, we have:
Non-binding +2 (Milind and Kirk), non-binding -1 (Ryanne)

Thanks,
Andrew


On 4 Aug 2023, at 

Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-11 Thread Matthias J. Sax

Thanks!

On 10/10/23 11:31 PM, Andrew Schofield wrote:

Matthias,
Yes, I think that’s a sensible way forward and the interface you propose looks 
good. I’ll update the KIP accordingly.

Thanks,
Andrew


On 10 Oct 2023, at 23:01, Matthias J. Sax  wrote:

Andrew,

yes I would like to get this change into KIP-714 right away. Seems to be 
important, as we don't know if/when a follow-up KIP for Kafka Streams would 
land.

I was also thinking (and discussed with a few others) how to expose it, and we 
would propose the following:

We add a new method to `KafkaStreams` class:

public ClientsInstanceIds clientsInstanceIds(Duration timeout);

The returned object is like below:

  public class ClientsInstanceIds {
    // we only have a single admin client per KS instance
    String adminInstanceId();

    // we only have a single global consumer per KS instance (if any)
    // Optional<> because we might not have global-thread
    Optional<String> globalConsumerInstanceId();

    // return a <threadKey -> ClientInstanceId> mapping
    // for the underlying (restore-)consumers/producers
    Map<String, String> mainConsumerInstanceIds();
    Map<String, String> restoreConsumerInstanceIds();
    Map<String, String> producerInstanceIds();
  }

For the `threadKey`, we would use some pattern like this:

  [Stream|StateUpdater]Thread-


Would this work from your POV?



-Matthias


On 10/9/23 2:15 AM, Andrew Schofield wrote:

Hi Matthias,
Good point. Makes sense to me.
Is this something that can also be included in the proposed Kafka Streams 
follow-on KIP, or would you prefer that I add it to KIP-714?
I have a slight preference for the former to put all of the KS enhancements 
into a separate KIP.
Thanks,
Andrew

On 7 Oct 2023, at 02:12, Matthias J. Sax  wrote:

Thanks Andrew. SGTM.

One point you did not address is the idea to add a method to `KafkaStreams` 
similar to the proposed `clientInstanceId()` that will be added to 
consumer/producer/admin clients.

Without addressing this, Kafka Streams users won't have a way to get the 
assigned `instanceId` of the internally created clients, and thus it would be 
very difficult for them to know which metrics that the broker receives belong 
to a Kafka Streams app. It seems they would only find the `instanceIds` in the 
log4j output if they enable client logging?

Of course, because there are multiple clients inside Kafka Streams, the return type cannot be a
single "String", but must be some complex data structure -- we could either add
a new class, or return a Map using a client key that maps to the 
`instanceId`.

For example we could use the following key:

   [Global]StreamThread[-][-restore][consumer|producer]

(Of course, only the valid combination.)

Or maybe even better, we might want to return a `Future` because collecting all
the `instanceId`s might be a blocking call on each client? I already have a few
ideas how it could be implemented but I don't think it must be discussed on the
KIP, as it's an implementation detail.

Thoughts?


-Matthias

On 10/6/23 4:21 AM, Andrew Schofield wrote:

Hi Matthias,
Thanks for your comments. I agree that a follow-up KIP for Kafka Streams makes 
sense. This KIP currently has made a bit
of an effort to embrace KS, but it’s not enough by a long way.
I have removed `application.id`. This should be done
properly in the follow-up KIP. I don’t believe there’s a downside to
removing it from this KIP.
I have reworded the statement about temporality. In practice, the
implementation of this KIP that’s going on while the voting
progresses happens to use delta temporality, but that’s an implementation 
detail. Supporting clients must support both
temporalities.
I thought about exposing the client instance ID as a metric, but non-numeric 
metrics are not usual practice and tools
do not universally support them. I don’t think the KIP is improved by adding 
one now.
I have also added constants for the various Config classes for 
ENABLE_METRICS_PUSH_CONFIG, including to
StreamsConfig. It’s best to be explicit about this.
Thanks,
Andrew

On 2 Oct 2023, at 23:47, Matthias J. Sax  wrote:

Hi,

I did not pay attention to this KIP in the past; seems it was on-hold for a 
while.

Overall it sounds very useful, and I think we should extend this with a follow 
up KIP for Kafka Streams. What is unclear to me at this point is the statement:


Kafka Streams applications have an application.id configured and this 
identifier should be included as the application_id metrics label.


The `application.id` is currently only used as the (main) consumer's `group.id` 
(and is part of an auto-generated `client.id` if the user does not set one).

This comment related to:


The following labels should be added by the client as appropriate before 
metrics are pushed.


Given that Kafka Streams uses the consumer/producer/admin client as "black 
boxes", a client does at this point not know that it's part of a Kafka Streams 
application, and thus, it won't be able to a

[jira] [Commented] (KAFKA-15571) StateRestoreListener#onRestoreSuspended is never called because wrapper DelegatingStateRestoreListener doesn't implement onRestoreSuspended

2023-10-10 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773849#comment-17773849
 ] 

Matthias J. Sax commented on KAFKA-15571:
-

Oops... Thanks for reporting and for the PR!

> StateRestoreListener#onRestoreSuspended is never called because wrapper 
> DelegatingStateRestoreListener doesn't implement onRestoreSuspended
> ---
>
> Key: KAFKA-15571
> URL: https://issues.apache.org/jira/browse/KAFKA-15571
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0, 3.5.1
>Reporter: Levani Kokhreidze
>Assignee: Levani Kokhreidze
>Priority: Major
>
> With https://issues.apache.org/jira/browse/KAFKA-10575 
> `StateRestoreListener#onRestoreSuspended` was added. But local tests show 
> that it is never called because `DelegatingStateRestoreListener` was not 
> updated to call a new method.
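
The failure mode described in the ticket can be reproduced in a few lines. The sketch below is a simplified illustration and not the Kafka Streams code: the interface `RestoreListenerSketch`, both wrapper classes, and the single-argument callbacks are hypothetical stand-ins. It assumes the new callback was added as a default method, so a wrapper that does not override it silently falls back to the no-op.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified listener interface that later gained a default method.
interface RestoreListenerSketch {
    void onRestoreEnd(String store);

    // Newly added callback; implementations that do not override it get a no-op.
    default void onRestoreSuspended(String store) { }
}

// Wrapper written before the new callback existed: it never forwards it.
final class BrokenDelegatingListener implements RestoreListenerSketch {
    private final RestoreListenerSketch user;

    BrokenDelegatingListener(RestoreListenerSketch user) { this.user = user; }

    @Override public void onRestoreEnd(String store) { user.onRestoreEnd(store); }
}

// Same wrapper with the missing forwarding added.
final class FixedDelegatingListener implements RestoreListenerSketch {
    private final RestoreListenerSketch user;

    FixedDelegatingListener(RestoreListenerSketch user) { this.user = user; }

    @Override public void onRestoreEnd(String store) { user.onRestoreEnd(store); }
    @Override public void onRestoreSuspended(String store) { user.onRestoreSuspended(store); }
}

class DelegationDemo {
    // Returns the events the user-supplied listener actually observed.
    static List<String> run(boolean fixed) {
        List<String> events = new ArrayList<>();
        RestoreListenerSketch user = new RestoreListenerSketch() {
            @Override public void onRestoreEnd(String store) { events.add("end:" + store); }
            @Override public void onRestoreSuspended(String store) { events.add("suspended:" + store); }
        };
        RestoreListenerSketch wrapper =
            fixed ? new FixedDelegatingListener(user) : new BrokenDelegatingListener(user);
        wrapper.onRestoreSuspended("store-1");
        return events;
    }

    public static void main(String[] args) {
        System.out.println("broken wrapper: " + run(false));
        System.out.println("fixed wrapper:  " + run(true));
    }
}
```

With the broken wrapper the user's listener sees no event at all, which matches the "never called" symptom; no compiler error flags the omission because the default method satisfies the interface.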



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-10 Thread Matthias J. Sax

Andrew,

yes I would like to get this change into KIP-714 right away. Seems to be
important, as we don't know if/when a follow-up KIP for Kafka Streams 
would land.


I was also thinking (and discussed with a few others) how to expose it, 
and we would propose the following:


We add a new method to `KafkaStreams` class:

public ClientsInstanceIds clientsInstanceIds(Duration timeout);

The returned object is like below:

  public class ClientsInstanceIds {
    // we only have a single admin client per KS instance
    String adminInstanceId();

    // we only have a single global consumer per KS instance (if any)
    // Optional<> because we might not have global-thread
    Optional<String> globalConsumerInstanceId();

    // return a <threadKey -> ClientInstanceId> mapping
    // for the underlying (restore-)consumers/producers
    Map<String, String> mainConsumerInstanceIds();
    Map<String, String> restoreConsumerInstanceIds();
    Map<String, String> producerInstanceIds();
  }

For the `threadKey`, we would use some pattern like this:

  [Stream|StateUpdater]Thread-
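
To make the proposed shape a bit more concrete, here is a minimal sketch of how such a holder object could look from the caller's side. Only the accessor names come from the proposal above. The class name `ClientsInstanceIdsSketch`, the constructor, the use of plain `String` IDs, and the sample thread keys and ID values are assumptions made for illustration and are not part of the KIP.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for the proposed holder; not the KIP's final API.
final class ClientsInstanceIdsSketch {
    private final String adminInstanceId;
    private final Optional<String> globalConsumerInstanceId;
    private final Map<String, String> mainConsumerInstanceIds;
    private final Map<String, String> restoreConsumerInstanceIds;
    private final Map<String, String> producerInstanceIds;

    ClientsInstanceIdsSketch(String adminInstanceId,
                             Optional<String> globalConsumerInstanceId,
                             Map<String, String> mainConsumerInstanceIds,
                             Map<String, String> restoreConsumerInstanceIds,
                             Map<String, String> producerInstanceIds) {
        this.adminInstanceId = adminInstanceId;
        this.globalConsumerInstanceId = globalConsumerInstanceId;
        // Copy the maps so the snapshot cannot be changed by the caller afterwards.
        this.mainConsumerInstanceIds = copy(mainConsumerInstanceIds);
        this.restoreConsumerInstanceIds = copy(restoreConsumerInstanceIds);
        this.producerInstanceIds = copy(producerInstanceIds);
    }

    private static Map<String, String> copy(Map<String, String> source) {
        return Collections.unmodifiableMap(new LinkedHashMap<>(source));
    }

    String adminInstanceId() { return adminInstanceId; }
    Optional<String> globalConsumerInstanceId() { return globalConsumerInstanceId; }
    Map<String, String> mainConsumerInstanceIds() { return mainConsumerInstanceIds; }
    Map<String, String> restoreConsumerInstanceIds() { return restoreConsumerInstanceIds; }
    Map<String, String> producerInstanceIds() { return producerInstanceIds; }
}

class ClientsInstanceIdsDemo {
    // Builds a sample snapshot for one stream thread; all keys and IDs are made up.
    static ClientsInstanceIdsSketch sample() {
        Map<String, String> main = new LinkedHashMap<>();
        main.put("StreamThread-1", "id-main-1");
        Map<String, String> restore = new LinkedHashMap<>();
        restore.put("StreamThread-1", "id-restore-1");
        Map<String, String> producers = new LinkedHashMap<>();
        producers.put("StreamThread-1", "id-producer-1");
        // No global thread in this sample, hence Optional.empty().
        return new ClientsInstanceIdsSketch("id-admin", Optional.empty(), main, restore, producers);
    }

    public static void main(String[] args) {
        ClientsInstanceIdsSketch ids = sample();
        System.out.println("admin: " + ids.adminInstanceId());
        System.out.println("global consumer: "
            + ids.globalConsumerInstanceId().orElse("<no global thread>"));
        ids.mainConsumerInstanceIds()
            .forEach((thread, id) -> System.out.println(thread + " main consumer: " + id));
    }
}
```

A user could then match the IDs printed here against the client instance IDs that show up on the broker side to tell which pushed metrics belong to the Kafka Streams application.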


Would this work from your POV?



-Matthias


On 10/9/23 2:15 AM, Andrew Schofield wrote:

Hi Matthias,
Good point. Makes sense to me.

Is this something that can also be included in the proposed Kafka Streams 
follow-on KIP, or would you prefer that I add it to KIP-714?
I have a slight preference for the former to put all of the KS enhancements 
into a separate KIP.

Thanks,
Andrew


On 7 Oct 2023, at 02:12, Matthias J. Sax  wrote:

Thanks Andrew. SGTM.

One point you did not address is the idea to add a method to `KafkaStreams` 
similar to the proposed `clientInstanceId()` that will be added to 
consumer/producer/admin clients.

Without addressing this, Kafka Streams users won't have a way to get the 
assigned `instanceId` of the internally created clients, and thus it would be 
very difficult for them to know which metrics that the broker receives belong 
to a Kafka Streams app. It seems they would only find the `instanceIds` in the 
log4j output if they enable client logging?

Of course, because there are multiple clients inside Kafka Streams, the return type cannot be a
single "String", but must be some complex data structure -- we could either add
a new class, or return a Map using a client key that maps to the 
`instanceId`.

For example we could use the following key:

   [Global]StreamThread[-][-restore][consumer|producer]

(Of course, only the valid combination.)

Or maybe even better, we might want to return a `Future` because collecting all
the `instanceId`s might be a blocking call on each client? I already have a few
ideas how it could be implemented but I don't think it must be discussed on the
KIP, as it's an implementation detail.

Thoughts?


-Matthias

On 10/6/23 4:21 AM, Andrew Schofield wrote:

Hi Matthias,
Thanks for your comments. I agree that a follow-up KIP for Kafka Streams makes 
sense. This KIP currently has made a bit
of an effort to embrace KS, but it’s not enough by a long way.
I have removed `application.id`. This should be done
properly in the follow-up KIP. I don’t believe there’s a downside to
removing it from this KIP.
I have reworded the statement about temporality. In practice, the
implementation of this KIP that’s going on while the voting
progresses happens to use delta temporality, but that’s an implementation 
detail. Supporting clients must support both
temporalities.
I thought about exposing the client instance ID as a metric, but non-numeric 
metrics are not usual practice and tools
do not universally support them. I don’t think the KIP is improved by adding 
one now.
I have also added constants for the various Config classes for 
ENABLE_METRICS_PUSH_CONFIG, including to
StreamsConfig. It’s best to be explicit about this.
Thanks,
Andrew

On 2 Oct 2023, at 23:47, Matthias J. Sax  wrote:

Hi,

I did not pay attention to this KIP in the past; seems it was on-hold for a 
while.

Overall it sounds very useful, and I think we should extend this with a follow 
up KIP for Kafka Streams. What is unclear to me at this point is the statement:


Kafka Streams applications have an application.id configured and this 
identifier should be included as the application_id metrics label.


The `application.id` is currently only used as the (main) consumer's `group.id` 
(and is part of an auto-generated `client.id` if the user does not set one).

This comment related to:


The following labels should be added by the client as appropriate before 
metrics are pushed.


Given that Kafka Streams uses the consumer/producer/admin client as "black 
boxes", a client does at this point not know that it's part of a Kafka Streams 
application, and thus, it won't be able to attach any such label to the metrics it sends. 
(Also producer and admin don't even know the value of `application.id` -- only the (main) 
consumer, indirectly via `group.id`, but also restore and global consumer don't know it, 
because they don'

Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-10-09 Thread Matthias J. Sax
Bruno and I had some background conversation about the `get` prefix 
question including a few other committers.


The official policy was never changed, and we should not add the 
`get`-prefix. It's a slip on our side in previous KIPs to add the 
`get`-prefix and we should actually clean it up doing a follow up KIP.



-Matthias


On 10/5/23 5:26 AM, Bruno Cadonna wrote:

Hi Matthias,

Given all the IQv2 KIPs that use getX and given recent PRs (internal 
interfaces mainly) that got merged, I was under the impression that we 
moved away from the strict no-getX policy.


I do not think it was an accident using getX in the IQv2 KIPs since 
somebody would have brought it up, otherwise.


I am fine with both types of getters.

If we think, we need to discuss this in a broader context, let's start a 
separate thread.



Best,
Bruno





On 10/5/23 7:44 AM, Matthias J. Sax wrote:

I agree with (almost) everything that Bruno said.


In general, we tend to move away from using getters without "get", 
recently. So I would keep the "get".


This is new to me? Can you elaborate on this point? Why do you think 
that's the case?


I actually did realize (after Walker mentioned it) that existing query 
types use `get` prefix, but to me it seems that it was by accident and 
we should consider correcting it? Thus, I would actually prefer to not 
add the `get` prefix for new methods query types.


IMHO, we should do a follow up KIP to deprecate all methods with `get` 
prefix and replace them with new ones without `get` -- it's of course 
always kinda "unnecessary" noise, but if we don't do it, we might get 
into more and more inconsistent naming, which would result in a "bad" API.


If we indeed want to change the convention and use the `get` prefix, I 
would strongly advocate to bite the bullet and do a KIP to pro-actively
add the `get` "everywhere" it's missing... But overall, it seems to be 
a much broader decision, and we should get buy in from many committers 
about it -- as long as there is no broad consensus to add `get` 
everywhere, I would strongly prefer not to diverge from the current 
agreement to omit `get`.




-Matthias




On 10/4/23 2:36 AM, Bruno Cadonna wrote:

Hi,

Regarding tombstones:
As far as I understand, we need to add either a validTo field to 
VersionedRecord or we need to return tombstones, otherwise the result 
is not complete, because users could never know a record was deleted 
at some point before the second non-null value was put.
I prefer adding the validTo field since it makes the result more
concise and easier to interpret.


Extending on Victoria's example, with the following puts

put(k, v1, time=0)
put(k, null, time=5)
put(k, null, time=10)
put(k, null, time=15)
put(k, v2, time=20)

the result with tombstones would be

value, timestamp
(v1, 0)
(null, 5)
(null, 10)
(null, 15)
(v2, 20)

instead of

value, timestamp, validTo
(v1, 0, 5)
(v2, 20, null)

The benefit of conciseness would already apply to one single tombstone.
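
As a worked version of the example above, the following sketch derives the (value, timestamp, validTo) form from the raw sequence of puts. It is only an illustration: `PutSketch`, `VersionSketch`, and `ValidToDemo` are hypothetical names, timestamps are plain `long` values, and the puts are assumed to arrive sorted by timestamp. It does not reflect how a versioned state store actually computes validity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical input record: a null value models a tombstone.
final class PutSketch {
    final String value;
    final long timestamp;

    PutSketch(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical result record; validTo == null means the version is still valid.
final class VersionSketch {
    final String value;
    final long timestamp;
    final Long validTo;

    VersionSketch(String value, long timestamp, Long validTo) {
        this.value = value;
        this.timestamp = timestamp;
        this.validTo = validTo;
    }

    @Override public String toString() {
        return "(" + value + ", " + timestamp + ", " + validTo + ")";
    }
}

class ValidToDemo {
    // Assumes puts are sorted by ascending timestamp.
    static List<VersionSketch> collapse(List<PutSketch> puts) {
        List<VersionSketch> result = new ArrayList<>();
        for (int i = 0; i < puts.size(); i++) {
            PutSketch put = puts.get(i);
            if (put.value == null) {
                continue; // a tombstone only ends the validity of the version before it
            }
            Long validTo = null;
            if (i + 1 < puts.size()) {
                validTo = puts.get(i + 1).timestamp; // next put (value or tombstone) ends this version
            }
            result.add(new VersionSketch(put.value, put.timestamp, validTo));
        }
        return result;
    }

    // The five puts from the example in the mail.
    static List<PutSketch> examplePuts() {
        return Arrays.asList(
            new PutSketch("v1", 0),
            new PutSketch(null, 5),
            new PutSketch(null, 10),
            new PutSketch(null, 15),
            new PutSketch("v2", 20));
    }

    public static void main(String[] args) {
        System.out.println(collapse(examplePuts())); // [(v1, 0, 5), (v2, 20, null)]
    }
}
```

Note that the three consecutive tombstones collapse into the single validTo of 5, which is exactly the information that would be lost if a caller needed to see each tombstone individually.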

On the other hand, why would somebody write consecutive tombstones 
into a versioned state store? I guess if somebody does that on 
purpose, then there should be a way to retrieve each of those 
tombstones, right?
So maybe we need both -- validTo field and the option to return 
tombstones. The latter might be moved to a future KIP in case we see 
the need.



Regarding .within(fromTs, toTs):
I would keep it simple with .from() and .asOfTimestamp() (or 
.until()). If we go with .within(), I would opt for 
.withinTimeRange(fromTs, toTs), because the query becomes more readable:


MultiVersionedKeyQuery
   .withKey(1)
   .withinTimeRange(Instant.parse("2023-08-03T10:37:30.00Z"),
                    Instant.parse("2023-08-04T10:37:30.00Z"))


If we stay with .from() and .until(), we should consider .fromTime() 
and .untilTime() (or .toTime()):


MultiVersionedKeyQuery
  .withKey(1)
  .fromTime(Instant.parse("2023-08-03T10:37:30.00Z"))
  .untilTime(Instant.parse("2023-08-04T10:37:30.00Z"))
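
For readers who want to try the fluent style, a minimal sketch of such a builder follows. `TimeRangeQuerySketch` is a hypothetical class written only for this illustration and is not the query type proposed in the KIP; the accessor names `key()`, `fromTimestamp()`, and `untilTimestamp()` are likewise assumptions and take no position in the naming discussion.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical immutable query object with fluent, chainable setters.
final class TimeRangeQuerySketch<K> {
    private final K key;
    private final Optional<Instant> from;
    private final Optional<Instant> until;

    private TimeRangeQuerySketch(K key, Optional<Instant> from, Optional<Instant> until) {
        this.key = key;
        this.from = from;
        this.until = until;
    }

    // Entry point: both time bounds start out unset.
    static <K> TimeRangeQuerySketch<K> withKey(K key) {
        return new TimeRangeQuerySketch<>(key, Optional.empty(), Optional.empty());
    }

    // Each setter returns a new instance, so a query can be shared safely.
    TimeRangeQuerySketch<K> fromTime(Instant fromTime) {
        return new TimeRangeQuerySketch<>(key, Optional.of(fromTime), until);
    }

    TimeRangeQuerySketch<K> untilTime(Instant untilTime) {
        return new TimeRangeQuerySketch<>(key, from, Optional.of(untilTime));
    }

    K key() { return key; }
    Optional<Instant> fromTimestamp() { return from; }
    Optional<Instant> untilTimestamp() { return until; }
}

class TimeRangeQueryDemo {
    static TimeRangeQuerySketch<Integer> sample() {
        return TimeRangeQuerySketch.withKey(1)
            .fromTime(Instant.parse("2023-08-03T10:37:30.00Z"))
            .untilTime(Instant.parse("2023-08-04T10:37:30.00Z"));
    }

    public static void main(String[] args) {
        TimeRangeQuerySketch<Integer> query = sample();
        System.out.println("key=" + query.key()
            + " from=" + query.fromTimestamp()
            + " until=" + query.untilTimestamp());
    }
}
```

Keeping the bounds as `Optional` lets a caller leave either side open, which is one way to express "from the beginning" or "up to now" without extra overloads.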



Regarding asOf vs. until:
I think asOf() is more used in point in time queries as Walker 
mentioned where this KIP specifies a time range. IMO asOf() fits very 
well with KIP-960 where one version is queried, but here I think 
.until() fits better. That might just be a matter of taste and in the 
end I am fine with both as long as it is well documented.



Regarding getters without "get":
In the other IQv2 classes we used getters with "get". In general, we 
tend to move away from using getters without "get", recently. So I 
would keep the "get".



Best,
Bruno

On 10/3/23 7:49 PM, Walker Carlson wrote:

Hey Alieh thanks for the KIP,

Weighing in on the AsOf vs Until debate I think either is fine from a
natural language perspective. Personally AsOf makes more sense to me where
until gives me the idea that the query is making a change. It's totally a
connotative difference and not that important. I think as of is prett

Re: [VOTE] KIP-960: Support single-key_single-timestamp interactive queries (IQv2) for versioned state stores

2023-10-09 Thread Matthias J. Sax
One more nit: as discussed on the related KIP-968 thread, we should not
use `get` as prefix for the getters.


So it should be `K key()` and `Optional asOfTimestamp()`.


Otherwise the KIP LGTM.


+1 (binding)


-Matthias

On 10/6/23 2:50 AM, Alieh Saeedi wrote:

Hi everyone,

Since KIP-960 is reduced to the simplest IQ type and all further comments
are related to the following-up KIPs, I decided to finalize it at this
point.


A huge thank you to everyone who has reviewed this KIP (and also the
following-up ones), and
participated in the discussion thread!

I'd also like to thank you in advance for taking the time to vote.

Best,
Alieh



[jira] [Updated] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest

2023-10-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15569:

Issue Type: Test  (was: Improvement)

> Update test and add test cases in IQv2StoreIntegrationTest
> --
>
> Key: KAFKA-15569
> URL: https://issues.apache.org/jira/browse/KAFKA-15569
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>
> Update test and add test cases in IQv2StoreIntegrationTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest

2023-10-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15569:

Component/s: streams
 unit tests

> Update test and add test cases in IQv2StoreIntegrationTest
> --
>
> Key: KAFKA-15569
> URL: https://issues.apache.org/jira/browse/KAFKA-15569
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>
> Update test and add test cases in IQv2StoreIntegrationTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15378) Rolling upgrade system tests are failing

2023-10-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772748#comment-17772748
 ] 

Matthias J. Sax commented on KAFKA-15378:
-

This is a partial fix with regard to versions... 
[https://github.com/apache/kafka/pull/14490] (cf. other PR for older branches).

There is also a bug in the state updater that breaks some system tests – [~cadonna]
already opened a PR for it: [https://github.com/apache/kafka/pull/14508]

I have already identified another issue and will open a PR after Bruno's PR is
merged, so that I have his fix in place first.

> Rolling upgrade system tests are failing
> 
>
> Key: KAFKA-15378
> URL: https://issues.apache.org/jira/browse/KAFKA-15378
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Affects Versions: 3.5.1
>Reporter: Lucas Brutschy
>    Assignee: Matthias J. Sax
>Priority: Major
>
> The system tests are having failures for these tests:
> {noformat}
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.1.2.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.2.3.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.6.0-SNAPSHOT
> {noformat}
> See 
> [https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5801/console]
>  for logs and other test data.
> Note that system tests currently only run with [this 
> fix](https://github.com/apache/kafka/commit/24d1780061a645bb2fbeefd8b8f50123c28ca94e),
>  I think some CVE python library update broke the system tests... 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15378) Rolling upgrade system tests are failing

2023-10-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15378:
---

Assignee: Matthias J. Sax

> Rolling upgrade system tests are failing
> 
>
> Key: KAFKA-15378
> URL: https://issues.apache.org/jira/browse/KAFKA-15378
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Affects Versions: 3.5.1
>Reporter: Lucas Brutschy
>    Assignee: Matthias J. Sax
>Priority: Major
>
> The system tests are having failures for these tests:
> {noformat}
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.1.2.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.2.3.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.6.0-SNAPSHOT
> {noformat}
> See 
> [https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5801/console]
>  for logs and other test data.
> Note that system tests currently only run with [this 
> fix](https://github.com/apache/kafka/commit/24d1780061a645bb2fbeefd8b8f50123c28ca94e),
>  I think some CVE python library update broke the system tests... 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-06 Thread Matthias J. Sax

Thanks Andrew. SGTM.

One point you did not address is the idea to add a method to 
`KafkaStreams` similar to the proposed `clientInstanceId()` that will be 
added to consumer/producer/admin clients.


Without addressing this, Kafka Streams users won't have a way to get the 
assigned `instanceId` of the internally created clients, and thus it 
would be very difficult for them to know which metrics that the broker 
receives belong to a Kafka Streams app. It seems they would only find 
the `instanceIds` in the log4j output if they enable client logging?


Of course, because there are multiple clients inside Kafka Streams, the
return type cannot be a single "String", but must be some complex
data structure -- we could either add a new class, or return a 
Map using a client key that maps to the `instanceId`.


For example we could use the following key:

   [Global]StreamThread[-][-restore][consumer|producer]

(Of course, only the valid combination.)

Or maybe even better, we might want to return a `Future` because
collecting all the `instanceId`s might be a blocking call on each client?
I already have a few ideas how it could be implemented but I don't think
it must be discussed on the KIP, as it's an implementation detail.
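
Purely as an illustration of the "blocking call on each client" concern, the sketch below gathers IDs from several clients in parallel and bounds the total wait with a timeout. Everything in it is an assumption: `InstanceIdCollectorSketch`, the client keys, and the `Supplier<String>` lookups are hypothetical stand-ins for whatever per-client call ends up being used, and this is not how Kafka Streams implements it.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class InstanceIdCollectorSketch {
    // Each supplier stands in for a possibly blocking per-client lookup.
    static Map<String, String> collect(Map<String, Supplier<String>> clients, Duration timeout) {
        Map<String, CompletableFuture<String>> pending = new LinkedHashMap<>();
        // Start all lookups without waiting for any of them.
        clients.forEach((key, lookup) -> pending.put(key, CompletableFuture.supplyAsync(lookup)));
        try {
            // Wait for every lookup, but never longer than the caller's timeout.
            CompletableFuture.allOf(pending.values().toArray(new CompletableFuture<?>[0]))
                .get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while collecting instance ids", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("could not collect all instance ids in time", e);
        }
        Map<String, String> result = new LinkedHashMap<>();
        // All futures are complete at this point, so join() returns immediately.
        pending.forEach((key, future) -> result.put(key, future.join()));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Supplier<String>> clients = new LinkedHashMap<>();
        clients.put("consumer", () -> "id-1"); // made-up keys and IDs
        clients.put("producer", () -> "id-2");
        System.out.println(collect(clients, Duration.ofSeconds(5)));
    }
}
```

A single timeout for the whole collection keeps the caller-facing contract simple, at the cost of not reporting which individual client was slow.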


Thoughts?


-Matthias

On 10/6/23 4:21 AM, Andrew Schofield wrote:

Hi Matthias,
Thanks for your comments. I agree that a follow-up KIP for Kafka Streams makes 
sense. This KIP currently has made a bit
of an effort to embrace KS, but it’s not enough by a long way.

I have removed `application.id`. This should be done
properly in the follow-up KIP. I don’t believe there’s a downside to
removing it from this KIP.

I have reworded the statement about temporality. In practice, the
implementation of this KIP that’s going on while the voting
progresses happens to use delta temporality, but that’s an implementation 
detail. Supporting clients must support both
temporalities.

I thought about exposing the client instance ID as a metric, but non-numeric 
metrics are not usual practice and tools
do not universally support them. I don’t think the KIP is improved by adding 
one now.

I have also added constants for the various Config classes for 
ENABLE_METRICS_PUSH_CONFIG, including to
StreamsConfig. It’s best to be explicit about this.

Thanks,
Andrew


On 2 Oct 2023, at 23:47, Matthias J. Sax  wrote:

Hi,

I did not pay attention to this KIP in the past; seems it was on-hold for a 
while.

Overall it sounds very useful, and I think we should extend this with a follow 
up KIP for Kafka Streams. What is unclear to me at this point is the statement:


Kafka Streams applications have an application.id configured and this 
identifier should be included as the application_id metrics label.


The `application.id` is currently only used as the (main) consumer's `group.id` 
(and is part of an auto-generated `client.id` if the user does not set one).

This comment related to:


The following labels should be added by the client as appropriate before 
metrics are pushed.


Given that Kafka Streams uses the consumer/producer/admin client as "black 
boxes", a client does at this point not know that it's part of a Kafka Streams 
application, and thus, it won't be able to attach any such label to the metrics it sends. 
(Also producer and admin don't even know the value of `application.id` -- only the (main) 
consumer, indirectly via `group.id`, but also restore and global consumer don't know it, 
because they don't have `group.id` set).

While I am totally in favor of the proposal, I am wondering how we intend to implement it
in a clean way? Or would we be ok with having some internal client APIs that KS can use to
"register" itself with the client?




While clients must support both temporalities, the broker will initially only 
send GetTelemetrySubscriptionsResponse.DeltaTemporality=True


Not sure if I can follow. How do we make the decision about DELTA or CUMULATIVE metrics? Should
the broker side plugin not decide what metrics it wants to receive in which form? So what
does "initially" mean -- the broker won't ship with a default plugin
implementation?




The following method is added to the Producer, Consumer, and Admin client 
interfaces:


Should we add anything to Kafka Streams to expose the underlying clients' 
assigned client-instance-ids programmatically? I am also wondering if clients 
should report their assigned client-instance-ids as metrics itself (for this 
case, Kafka Streams won't need to do anything, because we already expose all 
client metrics).

If we add anything programmatic, we need to make it simple, given that Kafka 
Streams has many clients per `StreamThread` and may have multiple threads.




enable.metrics.push

It might be worth adding this to `StreamsConfig`, too? If set via
StreamsConfig, we would forward it to all clients automatically.




-Matthias


On 9/29/23 5:45 PM, David Jacot wrote:

Hi Andre

[jira] [Resolved] (KAFKA-15437) Add metrics about open iterators

2023-10-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15437.
-
Resolution: Duplicate

> Add metrics about open iterators
> 
>
> Key: KAFKA-15437
> URL: https://issues.apache.org/jira/browse/KAFKA-15437
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>            Reporter: Matthias J. Sax
>Priority: Major
>  Labels: need-kip
>
> Kafka Streams allows users to create iterators over state stores. Those iterators
> must be closed to free up resources (especially for RocksDB). – We regularly
> get user reports of "resource leaks" that can be pinned down to leaking (i.e.
> not-closed) iterators.
> To simplify monitoring, it would be helpful to add a metric about open
> iterators to allow users to alert and pin-point the issue directly (and
> before the actual resource leak is observed).
> We might want to have a DEBUG level per-store metric (to allow identifying 
> the store in question quickly), but an already rolled up INFO level metric 
> for the whole application.
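
The idea in the ticket can be pictured with a small counter that goes up when an iterator is handed out and down when it is closed. The sketch below is an assumption-laden illustration, not Kafka Streams code: `CountingStoreSketch` and `TrackedIterator` are hypothetical names, and a real metric would be registered with the metrics system rather than read from a getter.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical store wrapper that tracks iterators that were opened but not yet closed.
final class CountingStoreSketch {
    private final List<String> data;
    private final AtomicInteger openIterators = new AtomicInteger();

    CountingStoreSketch(List<String> data) { this.data = data; }

    // Value a "number of open iterators" gauge could report.
    int openIterators() { return openIterators.get(); }

    TrackedIterator all() {
        openIterators.incrementAndGet();
        return new TrackedIterator(data.iterator());
    }

    final class TrackedIterator implements Iterator<String>, AutoCloseable {
        private final Iterator<String> inner;
        private boolean closed;

        TrackedIterator(Iterator<String> inner) { this.inner = inner; }

        @Override public boolean hasNext() { return inner.hasNext(); }
        @Override public String next() { return inner.next(); }

        // Idempotent: closing twice must not decrement the counter twice.
        @Override public void close() {
            if (!closed) {
                closed = true;
                openIterators.decrementAndGet();
            }
        }
    }
}

class OpenIteratorDemo {
    public static void main(String[] args) {
        CountingStoreSketch store = new CountingStoreSketch(Arrays.asList("a", "b"));

        // Leaked iterator: opened but never closed.
        CountingStoreSketch.TrackedIterator leaked = store.all();

        // Well-behaved usage: try-with-resources closes the iterator automatically.
        try (CountingStoreSketch.TrackedIterator it = store.all()) {
            while (it.hasNext()) {
                it.next();
            }
        }

        System.out.println("open iterators: " + store.openIterators());
    }
}
```

A counter that stays above zero, or keeps growing, would point at the leaking call site before the underlying resource exhaustion becomes visible.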



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-10-04 Thread Matthias J. Sax

I agree with (almost) everything that Bruno said.



In general, we tend to move away from using getters without "get", recently. So I would 
keep the "get".


This is new to me? Can you elaborate on this point? Why do you think 
that's the case?


I actually did realize (after Walker mentioned it) that existing query 
types use `get` prefix, but to me it seems that it was by accident and 
we should consider correcting it? Thus, I would actually prefer to not 
add the `get` prefix for new methods query types.


IMHO, we should do a follow up KIP to deprecate all methods with `get` 
prefix and replace them with new ones without `get` -- it's of course 
always kinda "unnecessary" noise, but if we don't do it, we might get 
into more and more inconsistent naming, which would result in a "bad" API.


If we indeed want to change the convention and use the `get` prefix, I 
would strongly advocate to bite the bullet and do a KIP to pro-actively add
the `get` "everywhere" it's missing... But overall, it seems to be a 
much broader decision, and we should get buy in from many committers 
about it -- as long as there is no broad consensus to add `get` 
everywhere, I would strongly prefer not to diverge from the current 
agreement to omit `get`.




-Matthias




On 10/4/23 2:36 AM, Bruno Cadonna wrote:

Hi,

Regarding tombstones:
As far as I understand, we need to add either a validTo field to 
VersionedRecord or we need to return tombstones, otherwise the result is 
not complete, because users could never know a record was deleted at 
some point before the second non-null value was put.
I prefer adding the validTo field since it makes the result more
concise and easier to interpret.


Extending on Victoria's example, with the following puts

put(k, v1, time=0)
put(k, null, time=5)
put(k, null, time=10)
put(k, null, time=15)
put(k, v2, time=20)

the result with tombstones would be

value, timestamp
(v1, 0)
(null, 5)
(null, 10)
(null, 15)
(v2, 20)

instead of

value, timestamp, validTo
(v1, 0, 5)
(v2, 20, null)

The benefit of conciseness would already apply to one single tombstone.

On the other hand, why would somebody write consecutive tombstones into 
a versioned state store? I guess if somebody does that on purpose, then 
there should be a way to retrieve each of those tombstones, right?
So maybe we need both -- validTo field and the option to return 
tombstones. The latter might be moved to a future KIP in case we see the 
need.



Regarding .within(fromTs, toTs):
I would keep it simple with .from() and .asOfTimestamp() (or .until()). 
If we go with .within(), I would opt for .withinTimeRange(fromTs, toTs), 
because the query becomes more readable:


MultiVersionedKeyQuery
   .withKey(1)
   .withinTimeRange(Instant.parse("2023-08-03T10:37:30.00Z"),
                    Instant.parse("2023-08-04T10:37:30.00Z"))


If we stay with .from() and .until(), we should consider .fromTime() and 
.untilTime() (or .toTime()):


MultiVersionedKeyQuery
  .withKey(1)
  .fromTime(Instant.parse("2023-08-03T10:37:30.00Z"))
  .untilTime(Instant.parse("2023-08-04T10:37:30.00Z"))



Regarding asOf vs. until:
I think asOf() is more used in point in time queries as Walker mentioned 
where this KIP specifies a time range. IMO asOf() fits very well with 
KIP-960 where one version is queried, but here I think .until() fits 
better. That might just be a matter of taste and in the end I am fine 
with both as long as it is well documented.



Regarding getters without "get":
In the other IQv2 classes we used getters with "get". In general, we 
have recently tended to move away from getters without "get". So I would 
keep the "get".



Best,
Bruno

On 10/3/23 7:49 PM, Walker Carlson wrote:

Hey Alieh thanks for the KIP,

Weighing in on the AsOf vs Until debate I think either is fine from a
natural language perspective. Personally AsOf makes more sense to me, whereas
until gives me the idea that the query is making a change. It's totally a
connotative difference and not that important. I think as of is pretty
frequently used in point of time queries.

Also for these methods it makes sense to drop the "get". We don't
normally use that in getters:

   /**
    * The key that was specified for this query.
    */
   public K getKey();

   /**
    * The starting time point of the query, if specified
    */
   public Optional getFromTimestamp();

   /**
    * The ending time point of the query, if specified
    */
   public Optional getAsOfTimestamp();

Other than that I didn't have too much to add. Overall I like the direction
of the KIP and think the functionality is all there!
best,
Walker



On Mon, Oct 2, 2023 at 10:46 PM Matthias J. Sax  wrote:


Thanks for the updated KIP. Overall I like it.

Victoria raises a very good point, and I personally tend to prefer (I
believe so does Victoria, but it's not totally clear from her email) if
a range query would not return any tombsto

Re: [DISCUSS] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-04 Thread Matthias J. Sax
require opening a rocksdb range scan** on multiple rocksdb instances (one
per partition), and polling the first key of each. Whether or not this is
ordered, could we please add that to the documentation?

**(How is this implemented/guaranteed in an `inMemoryKeyValueStore`? I
don't know about that implementation).
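
As an illustration of what "polling the first key of each" partition could look like, here is a minimal k-way merge sketch. It works on plain in-memory lists rather than RocksDB iterators, and `PartitionMergeSketch` and `mergeSorted` are hypothetical names; it only shows the technique, not how Kafka Streams behaves today.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: merges per-partition sorted key streams into one sorted stream.
final class PartitionMergeSketch {

    // Holds the next unread key of one partition together with the rest of its iterator.
    private static final class Head {
        final String key;
        final Iterator<String> rest;

        Head(String key, Iterator<String> rest) {
            this.key = key;
            this.rest = rest;
        }
    }

    // Each inner list must already be sorted ascending, like a per-partition range scan.
    static List<String> mergeSorted(List<List<String>> partitions) {
        PriorityQueue<Head> heads = new PriorityQueue<>((a, b) -> a.key.compareTo(b.key));
        for (List<String> partition : partitions) {
            Iterator<String> it = partition.iterator();
            if (it.hasNext()) {
                heads.add(new Head(it.next(), it));
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            // Take the smallest first key across all partitions, then advance that partition.
            Head smallest = heads.poll();
            merged.add(smallest.key);
            if (smallest.rest.hasNext()) {
                heads.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(mergeSorted(List.of(List.of("a", "d"), List.of("b", "c"))));
    }
}
```

Without such a merge step, results from different partitions would each be ordered internally but not relative to one another, which is why documenting the guarantee matters.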

Colt McNealy

*Founder, LittleHorse.dev*


On Tue, Oct 3, 2023 at 1:35 PM Hanyu (Peter) Zheng
 wrote:


ok, I will update it. Thank you  Matthias

Sincerely,
Hanyu

On Tue, Oct 3, 2023 at 11:23 AM Matthias J. Sax <mj...@apache.org> wrote:



Thanks for the KIP Hanyu!

I took a quick look and I think the proposal makes sense overall.

A few comments about how to structure the KIP.

As you propose to not add a `ReverseRangeQuery` class, the code example
should go into the "Rejected Alternatives" section, not in the "Proposed
Changes" section.

For the `RangeQuery` code example, please omit all existing methods etc,
and only include what will be added/changed. This makes it simpler to
read the KIP.

nit: typo

  the fault value is false

Should be "the default value is false".

Not sure if `setReverse()` is the best name. Maybe `withDescendingOrder`
(or similar, I guess `withReverseOrder` would also work) might be
better? Would be good to align to the KIP-969 proposal that suggests using
`withDescendingKeys` methods for "reverse key-range"; if we go with
`withReverseOrder` we should change KIP-969 accordingly.

Curious to hear what others think about naming this consistently across
both KIPs.

-Matthias


On 10/3/23 9:17 AM, Hanyu (Peter) Zheng wrote:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2

--

[image: Confluent] <https://www.confluent.io>
Hanyu (Peter) Zheng he/him/his
Software Engineer Intern
+1 (213) 431-7193 <+1+(213)+431-7193>
Follow us: [image: Blog]
<https://www.confluent.io/blog?utm_source=footer_medium=email_campaign=ch.email-signature_type.community_content.blog>
[image: Twitter] <https://twitter.com/ConfluentInc>[image: LinkedIn]
<https://www.linkedin.com/in/hanyu-peter-zheng/>[image: Slack]
<https://slackpass.io/confluentcommunity>[image: YouTube]
<https://youtube.com/confluent>

[image: Try Confluent Cloud for Free]
<https://www.confluent.io/get-started?utm_campaign=tm.fm-apac_cd.inbound_source=gmail_medium=organic>

[jira] [Commented] (KAFKA-15541) RocksDB Iterator Metrics

2023-10-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772072#comment-17772072
 ] 

Matthias J. Sax commented on KAFKA-15541:
-

I have not read the KIP yet, but it sounds very similar to 
https://issues.apache.org/jira/browse/KAFKA-15437 – should we close KAFKA-15437 as a 
duplicate?

> RocksDB Iterator Metrics
> 
>
> Key: KAFKA-15541
> URL: https://issues.apache.org/jira/browse/KAFKA-15541
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Major
>  Labels: kip, kip-required
>
> [KIP-989: RocksDB Iterator 
> Metrics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics]
> RocksDB {{Iterators}} must be closed after use, to prevent memory leaks due 
> to [blocks being "pinned" 
> in-memory|https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#blocks-pinned-by-iterators].
>  Pinned blocks can currently be tracked via the per-store 
> {{block-cache-pinned-usage}} metric. However, it's common [(and even 
> recommended)|https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb]
>  to share the Block Cache among all stores in an application, to enable users 
> to globally bound native memory used by RocksDB. This results in the 
> {{block-cache-pinned-usage}} reporting the same memory usage for every store 
> in the application, irrespective of which store is actually pinning blocks in 
> the block cache.
> To aid users in finding leaked Iterators, as well as identifying the cause of 
> a high number of pinned blocks, we introduce two new metrics.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-03 Thread Matthias J. Sax

Thanks for the KIP Hanyu!


I took a quick look and I think the proposal makes sense overall.

A few comments about how to structure the KIP.

As you propose to not add a `ReverseRangeQuery` class, the code example 
should go into the "Rejected Alternatives" section, not in the "Proposed 
Changes" section.


For the `RangeQuery` code example, please omit all existing methods etc, 
and only include what will be added/changed. This makes it simpler to 
read the KIP.



nit: typo


 the fault value is false


Should be "the default value is false".


Not sure if `setReverse()` is the best name. Maybe `withDescendingOrder` 
(or similar, I guess `withReverseOrder` would also work) might be 
better? Would be good to align to the KIP-969 proposal that suggests using 
`withDescendingKeys` methods for "reverse key-range"; if we go with 
`withReverseOrder` we should change KIP-969 accordingly.
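
To illustrate how a `with...`-style option reads compared to a setter, here is a minimal sketch over a plain `TreeMap`. `RangeQuerySketch` and `keysFrom` are hypothetical and only mimic the naming under discussion; the real `RangeQuery` and the state store APIs are not used here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of a range query with an optional descending key order.
final class RangeQuerySketch {
    private final String lower;
    private final String upper;
    private final boolean descending;

    private RangeQuerySketch(String lower, String upper, boolean descending) {
        this.lower = lower;
        this.upper = upper;
        this.descending = descending;
    }

    // Ascending key order is the default in this sketch.
    static RangeQuerySketch withRange(String lower, String upper) {
        return new RangeQuerySketch(lower, upper, false);
    }

    // Returns a new query object instead of mutating, hence "with" rather than "set".
    RangeQuerySketch withDescendingKeys() {
        return new RangeQuerySketch(lower, upper, true);
    }

    // Evaluates the query against a sorted in-memory map standing in for a kv-store.
    List<String> keysFrom(NavigableMap<String, String> store) {
        NavigableMap<String, String> range = store.subMap(lower, true, upper, true);
        if (descending) {
            range = range.descendingMap();
        }
        return new ArrayList<>(range.keySet());
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        for (String k : List.of("a", "b", "c", "d")) {
            store.put(k, "value-" + k);
        }
        System.out.println(withRange("b", "d").keysFrom(store));
        System.out.println(withRange("b", "d").withDescendingKeys().keysFrom(store));
    }
}
```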


Curious to hear what others think about naming this consistently across 
both KIPs.



-Matthias


On 10/3/23 9:17 AM, Hanyu (Peter) Zheng wrote:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2



Re: [DISCUSS] KIP-969: Support range interactive queries for versioned state stores

2023-10-02 Thread Matthias J. Sax

Thanks for updating the KIP.

Not sure if I agree or not with Bruno's idea to split the query types 
further? In the end, we split them only because there are three different 
return types: single value, value-iterator, key-value-iterator.


What do we gain by splitting out single-ts-range-key? In the end, for 
range-ts-range-key the proposed class is necessary and is a superset 
(one can set both timestamps to the same value, for single-ts lookup).


The mentioned simplification might apply to "single-ts-range-key" but I 
don't see a simplification for the proposed (and necessary) query type?


On the other hand, I see an advantage of a single-ts-range-key for 
querying over the "latest version" with a range of keys. For a 
single-ts-range-key query, this would be the default (similar to 
VersionedKeyQuery with no asOf-timestamp defined).


In the current version of the KIP, (if we agree that default should 
actually return "all versions" not "latest" -- this default was 
suggested by Bruno on KIP-968 and makes sense to me, so we would need to 
have the same default here to stay consistent), users would need to pass 
in `from(Long.MAX).to(Long.MAX)` (if I got this right) to query the 
latest point in time only, which seems clumsy? Or we could add a 
`latestKeyOnly` option to `MultiVersionedRangeQuery`, but that seems 
a little clumsy, too.





The overall order of the returned records is by Key


I assume, results are returned by timestamp for each key. The KIP should 
be explicit about it.




To be very explicit, should we rename the methods to specify the key bound?

 - withRange -> withKeyRange
 - withLowerBound -> withLowerKeyBound
 - withUpperBound -> withUpperKeyBound
 - withNoBounds -> allKeys (or withNoKeyBounds, but we use 
`allVersions` and not `noTimeBound` and should align the naming?)




-Matthias


On 9/6/23 5:25 AM, Bruno Cadonna wrote:

Hi Alieh,

Thanks for the KIP!

One high level comment/question:

I assume you separated single key queries into two classes because 
versioned key queries return a single value and multi version key 
queries return iterators. Although range queries always return 
iterators, it would make sense to also separate range queries for 
versioned state stores into range queries that return one single version 
of the keys within a range and range queries that return multiple 
versions of the keys within a range, IMO. That would reduce the 
meaningless combinations.

WDYT?

Best,
Bruno

On 8/16/23 8:01 PM, Alieh Saeedi wrote:

Hi all,

I split KIP-960 into three separate KIPs. Therefore, please continue
discussions about range interactive queries here. You can see all the
addressed reviews on the following page. Thanks in advance.

KIP-969: Support range interactive queries (IQv2) for versioned state 
stores



I look forward to your feedback!

Cheers,
Alieh



Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-10-02 Thread Matthias J. Sax

Thanks for the updated KIP. Overall I like it.

Victoria raises a very good point, and I personally tend to prefer (I 
believe so does Victoria, but it's not totally clear from her email) if 
a range query would not return any tombstones, ie, only two records in 
Victoria's example. Thus, it seems best to add a `validTo` ts-field 
to `VersionedRecord` -- otherwise, the retrieved result cannot be 
interpreted correctly.


Not sure what others think about it.

I would also be open to actually adding an `includeDeletes()` (or 
`includeTombstones()`) method/flag (disabled by default) to allow users 
to get all tombstones: this would only be helpful if there are two 
consecutive tombstones though (if I got it right), so not sure if we want 
to add it or not -- it seems also possible to add it later if there is 
user demand for it, so it might be a premature addition at this point?



Nit:

the public interface ValueIterator is used


"is used" -> "is added" (otherwise it sounds as if `ValueIterator` 
exists already)




Should we also add a `.within(fromTs, toTs)` (or maybe some better 
name?) to allow specifying both bounds at once? The existing 
`RangeQuery` does the same for specifying the key-range, so might be 
good to add for time-range too?




-Matthias


On 9/6/23 5:01 AM, Bruno Cadonna wrote:

In my last e-mail I forgot to finish a sentence.

"I think from a KIP"

should be

"I think the KIP looks good!"


On 9/6/23 1:59 PM, Bruno Cadonna wrote:

Hi Alieh,

Thanks for the KIP!

I think from a KIP

1.
I propose to throw an IllegalArgumentException or an 
IllegalStateException for meaningless combinations. In any case, the 
KIP should specify what exception is thrown.


2.
Why does not specifying a range return the latest version? I would 
expect that it returns all versions since an empty lower or upper 
limit is interpreted as no limit.


3.
I second Matthias's comment about replacing "asOf" with "until" or "to".

4.
Do we need "allVersions()"? As I said above I would return all 
versions if no limits are specified. I think if we get rid of 
allVersions() there might not be any meaningless combinations anymore.
If a user applies the same limit twice, for example 
MultiVersionedKeyQuery.with(key).from(t1).from(t2), the last one wins.


5.
Could you add some more examples with time ranges to the example section?

6.
The KIP misses the test plan section.

7.
I propose to rename the class to "MultiVersionKeyQuery" since we are 
querying multiple versions of the same key.


8.
Could you also add withAscendingTimestamps()? IMO it gives users the 
possibility to make their code more readable instead of only relying 
on the default.


Best,
Bruno


On 8/17/23 4:13 AM, Matthias J. Sax wrote:

Thanks for splitting this part into a separate KIP!

For `withKey()` we should be explicit that `null` is not allowed.

(Looking into existing `KeyQuery` it seems the JavaDocs don't cover 
this either -- would you like to do a tiny cleanup PR for this, or 
fix on-the-side in one of your PRs?)




The key query returns all the records that are valid in the time 
range starting from the timestamp {@code fromTimestamp}.


In the JavaDocs you use the phrase `are valid` -- I think we need to 
explain what "valid" means? It might even be worth adding some 
examples. It's annoying, but being precise is kinda important.
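
One possible reading of "valid", sketched below, is that a version counts as valid in a time range if its validity interval overlaps the queried range. This is only an assumption for illustration: the sketch treats a version as valid from its own timestamp (inclusive) until the timestamp of the next put for the same key (exclusive), and the names `ValiditySketch` and `isValidWithin` are hypothetical.

```java
// Hypothetical helper illustrating one possible definition of "valid within a time range".
final class ValiditySketch {

    // Assumption: a version is valid in [timestamp, validTo);
    // Long.MAX_VALUE as validTo stands for "no newer version yet".
    static boolean isValidWithin(long timestamp, long validTo, long from, long until) {
        // The validity interval must overlap the inclusive query range [from, until].
        return timestamp <= until && validTo > from;
    }

    public static void main(String[] args) {
        // v1 was put at time 0 and replaced at time 5.
        // Query range [3, 10]: v1 is still valid during [3, 5), although it was put before 3.
        System.out.println(isValidWithin(0, 5, 3, 10));
        // Query range [5, 10]: v1 was replaced exactly at 5, so it does not overlap.
        System.out.println(isValidWithin(0, 5, 5, 10));
    }
}
```

Under this reading, a record inserted before the lower bound is still returned as long as it had not been replaced or deleted by then.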


With regard to KIP-962, should we allow `null` for time bounds ? The 
JavaDocs should also be explicit if `null` is allowed or not and what 
the semantics are if allowed.




You are using `asOf()` however, because we are doing time-range 
queries, to me using `until()` to describe the upper bound would 
sound better (I am not a native speaker though, so maybe I am off?)



The key query returns all the records that have timestamp <= {@code 
asOfTimestamp}.


This is only correct if no lower bound is set, right?


In your reply to KIP-960 you mentioned:


the meaningless combinations are prevented by throwing exceptions.


We should add corresponding JavaDocs like:

    @throws IllegalArgumentException if {@code fromTimestamp} is equal or
                                     larger than {@code untilTimestamp}

Or something similar.


With regard to KIP-960: if we need to introduce a `VersionedKeyQuery` 
class for single-key-single-ts lookup, would we need to find a new 
name for the query class of this KIP, given that the return type is 
different?



-Matthias



On 8/16/23 10:57 AM, Alieh Saeedi wrote:

Hi all,

I split KIP-960
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores>
into three separate KIPs. Therefore, please continue discussions
about single-key, multi-timestamp interactive queries here. You can see all
the addressed reviews on the following page. Thanks in adva

Re: Kafka Streams reaching ERROR state during rolling upgrade / restart of brokers

2023-10-02 Thread Matthias J. Sax
I did mean client side...  If KS goes into ERROR state, it should log 
the reason.


If the logs are indeed empty, try to register an 
uncaught-exception-handler via


KafkaStreams#setUncaughtExceptionHandler(...)


-Matthias

On 10/2/23 12:11 PM, Debraj Manna wrote:

Are you suggesting that I check the Kafka broker logs? I do not see any other
error logs on the client / application side.

On Fri, 29 Sep, 2023, 22:01 Matthias J. Sax,  wrote:


In general, Kafka Streams should keep running.

Can you inspect the logs to figure out why it's going into ERROR state
to begin with? Maybe you need to increase/change some timeouts/retries
configs.

The stack trace you shared is a symptom, but not the root cause.

-Matthias

On 9/21/23 12:56 AM, Debraj Manna wrote:

I am using Kafka broker 2.8.1 (from AWS MSK) with Kafka clients and Kafka
stream 3.5.1.

I am observing that whenever some rolling upgrade is done on AWS MSK our
stream application reaches an error state. I get the below exception on
trying to query the state store

caused by: java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
  at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)
  at org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)
  at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)


Can someone let me know the recommended way to keep the stream
application running whenever some rolling upgrade/restart of brokers is
done in the background?







Re: [DISCUSS] KIP-960: Support interactive queries (IQv2) for versioned state stores

2023-10-02 Thread Matthias J. Sax
ted.

About defining new methods in the VersionedKeyValueStore interface: I
actually have defined the required methods in the RocksDBVersionedStore
class. Since defining them for the interface requires implementing them for
all the classes that have implemented the interface.

Again a discussion for your other KIPs, but I think you'll want to define
the new method(s) in the VersionedKeyValueStore interface directly (rather
than only in individual implementations such as RocksDBVersionedStore),
otherwise your new interactive query types will throw NPEs for custom store
implementations which do not support the new methods.
Best,
Victoria

On Thursday, August 17, 2023 at 07:25:22 AM EDT, Alieh Saeedi  wrote:

Hey Matthias,
thanks for the feedback

I think if one materializes a versioned store, then the query is posed to
the versioned state store. So the type of materialized store determines the
type of store and consequently all the classes for running the query (for
example, MeteredVersionedKeyValueStore instead of MeteredKeyValueStore and
so on). I added the piece of code for defining the versioned state store to
the example part of the KIP-960.

About the generics, using VersionedRecord instead of V worked. Right
now, I am composing the integration tests. Let me complete the code and
confirm it for 100%.

About the KeyQuery class, I thought the KIP must contain just the newly
added stuff. OK, I will paste the whole class in KIP-960.

Thanks,
Alieh




On Thu, Aug 17, 2023 at 3:54 AM Matthias J. Sax wrote:



Thanks for updating the KIP and splitting into multiple ones. I am just
going to reply for the single-key-single-timestamp case below.

It seems the `KeyQuery.java` code snippet is "incomplete" -- the class
definition is missing.

At the same time, the example uses `VersionedKeyQuery` so I am not sure
right now if you propose to re-use the existing `KeyQuery` class or
introduce a new `VersionedKeyQuery` class?

While it was suggested that we re-use the existing `KeyQuery` class, I
am wondering what would happen if one uses the new `asOf` method, and
passes the query into a non-versioned store?

In the end, a non-versioned store does not know that there is an as-of
timestamp set and thus might just do a plain lookup (it also only has a
single value per key) and return whatever value it has stored?

I am wondering if this would be semantically questionable and/or
confusing for users (especially for timestamped stores)? -- Because the
non-versioned store does not know anything about the timestamp, it can
also not even check if it's set and raise an error.


Did you try to prototype either of the two approaches? Asking because I am
wondering about generics and return types. Existing `KeyQuery` is defined as

`KeyQuery<K, V> extends Query<V>` so `V` is the result type.

However for the versioned-store we want the result type to be
`VersionedRecord<V>` and thus we would need to set `V =
VersionedRecord<V>` -- would this work or would the compiler trip over it
(or would it work but still be confusing/complex for users to specify
the right types)?

For `VersionedKeyQuery` we could do:

`VersionedKeyQuery<K, V> extends Query<VersionedRecord<V>>`

which seems cleaner?

Without writing code I always have a hard time reasoning about generics,
so maybe trying out both approaches might shed some light?
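
A minimal sketch of the two typing options, using simplified stand-in types, might look like this. None of these classes are the real Kafka Streams ones (`Query`, `VersionedRecord`, `KeyQuerySketch`, `VersionedKeyQuerySketch` and `execute` are all hypothetical here), so it only suggests that both variants type-check in principle, not that either fits the existing code base.

```java
// All types here are hypothetical stand-ins, not the Kafka Streams classes.
final class GenericsSketch {

    interface Query<R> {}

    static final class VersionedRecord<V> {
        final V value;
        final long timestamp;

        VersionedRecord(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Option 1: reuse a KeyQuery-like class; the caller must spell out V = VersionedRecord<...>.
    static final class KeyQuerySketch<K, V> implements Query<V> {
        final K key;

        KeyQuerySketch(K key) {
            this.key = key;
        }
    }

    // Option 2: a dedicated class that fixes the result type itself.
    static final class VersionedKeyQuerySketch<K, V> implements Query<VersionedRecord<V>> {
        final K key;

        VersionedKeyQuerySketch(K key) {
            this.key = key;
        }
    }

    // Stand-in for a store lookup: hands back a canned result typed by the query.
    static <R> R execute(Query<R> query, R cannedResult) {
        return cannedResult;
    }

    public static void main(String[] args) {
        VersionedRecord<Integer> stored = new VersionedRecord<>(42, 7L);

        KeyQuerySketch<String, VersionedRecord<Integer>> reused = new KeyQuerySketch<>("k");
        VersionedRecord<Integer> viaReuse = execute(reused, stored);

        VersionedKeyQuerySketch<String, Integer> dedicated = new VersionedKeyQuerySketch<>("k");
        VersionedRecord<Integer> viaDedicated = execute(dedicated, stored);

        System.out.println(viaReuse.value + " " + viaDedicated.value);
    }
}
```

In this simplified form the difference is mostly in what the user has to write: with option 1 the nested type appears in every declaration, with option 2 it is hidden inside the query class.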




-Matthias


On 8/15/23 9:03 AM, Alieh Saeedi wrote:

Hi all,
thanks to all for the great points you mentioned.

Addressed reviews are listed as follows:
1. The methods are defined as composable, as Lucas suggested. Now we have
even more types of single-key_multi-timestamp queries. As Matthias
suggested in his first review, now with composable methods, queries with a
lower time bound are also possible. The meaningless combinations are
prevented by throwing exceptions.
2. I replaced until with asOf everywhere. I hope the javadocs and the
explanations in the KIPs are clear enough about the time range. Matthias,
Lucas, and Victoria asked about the exact time boundaries. I assumed that
if the time range is specified as [t1, t2], all the records that have been
inserted within this time range must be returned by the query. But I think
the point that all of you referred to and that Victoria clarified very well
is valid. Maybe the query must return "all the records that are valid
within the time range". Therefore, records that have been inserted before
t1 are also returned. Now, this makes more sense to me as a user. By the
way, it seems more like a product question.
3. About the order of returned records, I added some boolean fields to the
classes to specify them. I still do not have any clue how hard the
implementation of this will be. The question is, is the order considered
for normal range queries as well?
4. As Victoria pointed out the issue about listing tombstones, I changed
the VersionedRecord such that it can have NULL values as well. The question
is, what was 

Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-02 Thread Matthias J. Sax

Hi,

I did not pay attention to this KIP in the past; seems it was on-hold 
for a while.


Overall it sounds very useful, and I think we should extend this with a 
follow up KIP for Kafka Streams. What is unclear to me at this point is 
the statement:



Kafka Streams applications have an application.id configured and this 
identifier should be included as the application_id metrics label.


The `application.id` is currently only used as the (main) consumer's 
`group.id` (and is part of an auto-generated `client.id` if the user 
does not set one).


This comment relates to:


The following labels should be added by the client as appropriate before 
metrics are pushed.


Given that Kafka Streams uses the consumer/producer/admin client as 
"black boxes", a client does at this point not know that it's part of a 
Kafka Streams application, and thus, it won't be able to attach any such 
label to the metrics it sends. (Also producer and admin don't even know 
the value of `application.id` -- only the (main) consumer, indirectly 
via `group.id`, but also restore and global consumer don't know it, 
because they don't have `group.id` set).


While I am totally in favor of the proposal, I am wondering how we 
intend to implement it in a clean way? Or would we be OK with having some 
internal client APIs that KS can use to "register" itself with the client?





While clients must support both temporalities, the broker will initially only 
send GetTelemetrySubscriptionsResponse.DeltaTemporality=True


Not sure if I can follow. How is the decision about DELTA or 
CUMULATIVE metrics made? Should the broker side plugin not decide what 
metrics it wants to receive in which form? So what does "initially" mean 
-- the broker won't ship with a default plugin implementation?





The following method is added to the Producer, Consumer, and Admin client 
interfaces:


Should we add anything to Kafka Streams to expose the underlying 
clients' assigned client-instance-ids programmatically? I am also 
wondering if clients should report their assigned client-instance-ids as 
metrics themselves (for this case, Kafka Streams won't need to do anything, 
because we already expose all client metrics).


If we add anything programmatic, we need to make it simple, given that 
Kafka Streams has many clients per `StreamThread` and may have multiple 
threads.





enable.metrics.push
It might be worth adding this to `StreamsConfig`, too? If set via 
StreamsConfig, we would forward it to all clients automatically.





-Matthias


On 9/29/23 5:45 PM, David Jacot wrote:

Hi Andrew,

Thanks for driving this one. I haven't read all the KIP yet but I already
have an initial question. In the Threading section, it is written
"KafkaConsumer: the "background" thread (based on the consumer threading
refactor which is underway)". If I understand this correctly, it means
that KIP-714 won't work if the "old consumer" is used. Am I correct?

Cheers,
David


On Fri, Sep 22, 2023 at 12:18 PM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:


Hi Philip,
No, I do not think it should actively search for a broker that supports
the new RPCs. In general, either all of the brokers or none of the brokers
will support it. In the window where the cluster is being upgraded or client
telemetry is being enabled, there might be a mixed situation. I wouldn’t put
too much effort into this mixed scenario. As the client finds brokers which
support the new RPCs, it can begin to follow the KIP-714 mechanism.

Thanks,
Andrew


On 22 Sep 2023, at 20:01, Philip Nee  wrote:

Hi Andrew -

Question on top of your answers: Do you think the client should actively
search for a broker that supports this RPC? As previously mentioned, the
broker uses the leastLoadedNode to find its first connection (am
I correct?), and what if that broker doesn't support the metric push?

P

On Fri, Sep 22, 2023 at 10:20 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:


Hi Kirk,
Thanks for your question. You are correct that the presence or absence of
the new RPCs in the ApiVersionsResponse tells the client whether to request
the telemetry subscriptions and push metrics.

This is of course tricky in practice. It would be conceivable, as a
cluster is upgraded to AK 3.7 or as a client metrics receiver plugin is
deployed across the cluster, that a client connects to some brokers that
support the new RPCs and some that do not.

Here’s my suggestion:
* If a client is not connected to any brokers that support the new
RPCs, it cannot push metrics.
* If a client is only connected to brokers that support the new RPCs, it
will use the new RPCs in accordance with the KIP.
* If a client is connected to some brokers that support the new RPCs and
some that do not, it will use the new RPCs with the supporting subset of
brokers in accordance with the KIP.
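
The three cases above boil down to "use whichever connected brokers advertise the new RPCs", which could be sketched roughly as follows. The class and method names are hypothetical, the RPC names are written as plain strings purely for illustration, and the map of advertised RPCs stands in for whatever the client learns from each broker's ApiVersionsResponse.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of selecting the brokers a client may push metrics to.
final class TelemetryBrokerSketch {

    // RPC names as plain strings, an assumption made only for this illustration.
    static final Set<String> TELEMETRY_RPCS =
        Set.of("GetTelemetrySubscriptions", "PushTelemetry");

    // Input: broker id -> RPCs that broker advertised. Output: brokers supporting telemetry.
    static List<String> brokersToPushTo(Map<String, Set<String>> advertisedRpcsByBroker) {
        List<String> supporting = new ArrayList<>();
        for (Map.Entry<String, Set<String>> broker : advertisedRpcsByBroker.entrySet()) {
            if (broker.getValue().containsAll(TELEMETRY_RPCS)) {
                supporting.add(broker.getKey());
            }
        }
        Collections.sort(supporting); // deterministic order for readability
        return supporting;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> mixedCluster = Map.of(
            "broker-1", Set.of("Produce", "Fetch"),
            "broker-2", Set.of("Produce", "Fetch", "GetTelemetrySubscriptions", "PushTelemetry"));
        System.out.println(brokersToPushTo(mixedCluster));
    }
}
```

An empty result corresponds to the first case (the client cannot push metrics), a full result to the second, and a partial result to the mixed case.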

Comments?

Thanks,
Andrew


On 22 Sep 2023, at 16:01, Kirk True  wrote:

Hi Andrew/Jun,

I want to make sure I understand question/comment 

[jira] [Updated] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15527:

Labels: kip  (was: )

> Add reverseRange and reverseAll query over kv-store in IQv2
> ---
>
> Key: KAFKA-15527
> URL: https://issues.apache.org/jira/browse/KAFKA-15527
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
>
> Add reverseRange and reverseAll query over kv-store in IQv2





[jira] [Updated] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15527:

Component/s: streams

> Add reverseRange and reverseAll query over kv-store in IQv2
> ---
>
> Key: KAFKA-15527
> URL: https://issues.apache.org/jira/browse/KAFKA-15527
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>
> Add reverseRange and reverseAll query over kv-store in IQv2





[jira] [Commented] (KAFKA-15520) Kafka Streams Stateful Aggregation Rebalancing causing processing to pause on all partitions

2023-09-29 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17770539#comment-17770539
 ] 

Matthias J. Sax commented on KAFKA-15520:
-

{quote}However, I'm seeing that when pods restart - it triggers rebalances and 
causes processing to be paused on all pods till the rebalance and state restore 
is in progress.a
{quote}
and
{quote}I have increased session timeout to 480 seconds
{quote}
How long does a pod restart take? It is quicker than 480 seconds? – If yes, no 
rebalance should be triggered. (`max.poll.interval.ms` config would not make a 
difference for this case)
{quote}My understanding is that even if there is a rebalance - only the 
partitions that should be moved around will be restored in a cooperative way 
and not pause all the processing.
{quote}
That's right. However, in KS, if a thread gets a new task assigned for which 
state must be restored, KS explicitly pauses processing for all other tasks to 
put all work into restoring, in order to reduce restore latency. (In some versions 
of KS we tried to interleave processing of active tasks with restoring, but there 
were complaints that it slowed down restoring too much. With the new "state updater 
thread" we are adding, we aim to allow processing and restoring to happen in 
parallel again in future versions, maybe 3.7 if we can get it over the finish 
line.)
{quote}Also, it should failover to standby replica in this case and avoid state 
restoring on other pods.
{quote}
Yes. Not sure why this does not happen, but you are using a somewhat older 
version of Kafka Streams. We put a lot of work into fixing bugs to this end, 
so it would be best to upgrade to 3.5 to see if the issues are already fixed.
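
To make the session-timeout point concrete, here is a minimal sketch (not taken from the ticket) of the rule described above: with static membership (`group.instance.id`), a pod that comes back before `session.timeout.ms` expires should rejoin without triggering a rebalance. The class and method names are hypothetical, the rebalance check is a deliberately simplified model, and the config keys are written as plain strings so that the sketch needs no Kafka dependency.

```java
import java.time.Duration;
import java.util.Properties;

class StaticMembershipSketch {

    // Builds the settings discussed in the ticket; values are illustrative.
    static Properties streamsProps(String podName, Duration sessionTimeout) {
        Properties props = new Properties();
        props.put("group.instance.id", podName);   // static membership, one id per pod
        props.put("session.timeout.ms", String.valueOf(sessionTimeout.toMillis()));
        props.put("num.standby.replicas", "1");
        return props;
    }

    // Simplified model (assumption): a restart only triggers a rebalance if the
    // static member stays away for longer than the session timeout.
    static boolean restartTriggersRebalance(Duration restart, Duration sessionTimeout) {
        return restart.compareTo(sessionTimeout) > 0;
    }

    public static void main(String[] args) {
        Duration sessionTimeout = Duration.ofSeconds(480);
        System.out.println(streamsProps("pod-0", sessionTimeout).getProperty("session.timeout.ms"));
        System.out.println(restartTriggersRebalance(Duration.ofSeconds(90), sessionTimeout));
        System.out.println(restartTriggersRebalance(Duration.ofSeconds(600), sessionTimeout));
    }
}
```

If rebalances still show up for restarts that are well within the timeout, the broker and client logs should tell which member left the group and why.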

> Kafka Streams Stateful Aggregation Rebalancing causing processing to pause on 
> all partitions
> 
>
> Key: KAFKA-15520
> URL: https://issues.apache.org/jira/browse/KAFKA-15520
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.2
>Reporter: Rohit Bobade
>Priority: Major
>
> Kafka broker version: 2.8.0 Kafka Streams client version: 2.6.2
> I am running kafka streams stateful aggregations on K8s statefulset with 
> persistent volume attached to each pod. I have also specified
> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, podName);
> which makes sure it gets the sticky partition assignment.
> Enabled standby replica - 
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> and set props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "0");
> However, I'm seeing that when pods restart - it triggers rebalances and 
> causes processing to be paused on all pods till the rebalance and state 
> restore is in progress.
> My understanding is that even if there is a rebalance - only the partitions 
> that should be moved around will be restored in a cooperative way and not 
> pause all the processing. Also, it should failover to standby replica in this 
> case and avoid state restoring on other pods.
> I have increased session timeout to 480 seconds and max poll interval to 15 
> mins to minimize rebalances.
> Also added
> props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> CooperativeStickyAssignor.class.getName());
> to enable CooperativeStickyAssignor
> could someone please help if I'm missing something?
>  





[jira] [Resolved] (KAFKA-15491) RackId doesn't exist error while running WordCountDemo

2023-09-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15491.
-
Fix Version/s: 3.6.1
   3.7.0
 Assignee: Hao Li
   Resolution: Fixed

> RackId doesn't exist error while running WordCountDemo
> --
>
> Key: KAFKA-15491
> URL: https://issues.apache.org/jira/browse/KAFKA-15491
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Luke Chen
>Assignee: Hao Li
>Priority: Major
> Fix For: 3.6.1, 3.7.0
>
>
> While running the WordCountDemo following the 
> [docs|https://kafka.apache.org/documentation/streams/quickstart], I saw the 
> following error logs in the stream application output. Though everything 
> still works fine, it'd be better if there were no ERROR logs in the demo app.
> {code:java}
> [2023-09-24 14:15:11,723] ERROR RackId doesn't exist for process 
> e2391098-23e8-47eb-8d5e-ff6e697c33f5 and consumer 
> streams-wordcount-e2391098-23e8-47eb-8d5e-ff6e697c33f5-StreamThread-1-consumer-adae58be-f5f5-429b-a2b4-67bf732726e8
>  
> (org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor)
> [2023-09-24 14:15:11,757] ERROR RackId doesn't exist for process 
> e2391098-23e8-47eb-8d5e-ff6e697c33f5 and consumer 
> streams-wordcount-e2391098-23e8-47eb-8d5e-ff6e697c33f5-StreamThread-1-consumer-adae58be-f5f5-429b-a2b4-67bf732726e8
>  
> (org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor)
> {code}





Re: [ANNOUNCE] New Kafka PMC Member: Justine Olshan

2023-09-29 Thread Matthias J. Sax

Congrats!

On 9/25/23 7:29 AM, Rajini Sivaram wrote:

Congratulations, Justine!

Regards,

Rajini

On Mon, Sep 25, 2023 at 9:40 AM Lucas Brutschy wrote:

Congrats, Justine!

On Mon, Sep 25, 2023 at 9:20 AM Bruno Cadonna wrote:

Congrats, Justine! Well deserved!

Best,
Bruno

On 9/25/23 5:28 AM, ziming deng wrote:

Congratulations Justine!

On Sep 25, 2023, at 00:01, Viktor Somogyi-Vass <viktor.somo...@cloudera.com.INVALID> wrote:

Congrats Justine!

On Sun, Sep 24, 2023, 17:45 Kirk True wrote:

Congratulations Justine! Thanks for all your great work!

On Sep 24, 2023, at 8:37 AM, John Roesler wrote:

Congratulations, Justine!
-John

On Sun, Sep 24, 2023, at 05:05, Mickael Maison wrote:

Congratulations Justine!

On Sun, Sep 24, 2023 at 5:04 AM Sophie Blee-Goldman wrote:

Congrats Justine!

On Sat, Sep 23, 2023, 4:36 PM Tom Bentley wrote:

Congratulations!

On Sun, 24 Sept 2023 at 12:32, Satish Duggana <satish.dugg...@gmail.com> wrote:

Congratulations Justine!!

On Sat, 23 Sept 2023 at 15:46, Bill Bejeck wrote:

Congrats Justine!

-Bill

On Sat, Sep 23, 2023 at 6:23 PM Greg Harris wrote:

Congratulations Justine!

On Sat, Sep 23, 2023 at 5:49 AM Boudjelda Mohamed Said wrote:

Congrats Justin !

On Sat 23 Sep 2023 at 14:44, Randall Hauch wrote:

Congratulations, Justine!

On Sat, Sep 23, 2023 at 4:25 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Congrats Justine!

On Sat, Sep 23, 2023, 13:28 Divij Vaidya <divijvaidy...@gmail.com> wrote:

Congratulations Justine!

On Sat 23. Sep 2023 at 07:06, Chris Egerton <fearthecel...@gmail.com> wrote:

Congrats Justine!

On Fri, Sep 22, 2023, 20:47 Guozhang Wang <guozhang.wang...@gmail.com> wrote:

Congratulations!

On Fri, Sep 22, 2023 at 8:44 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

Congratulations Justine!

On Fri, Sep 22, 2023, 19:25 Philip Nee <philip...@gmail.com> wrote:

Congrats Justine!

On Fri, Sep 22, 2023 at 7:07 PM Luke Chen <show...@gmail.com> wrote:

Hi, Everyone,

Justine Olshan has been a Kafka committer since Dec. 2022. She has been
very active and instrumental to the community since becoming a committer.

It's my pleasure to announce that Justine is now a member of Kafka PMC.

Congratulations Justine!

Luke
on behalf of Apache Kafka PMC


Re: [ANNOUNCE] New committer: Yash Mayya

2023-09-29 Thread Matthias J. Sax

Congrats!

On 9/26/23 1:30 AM, Mayank Shekhar Narula wrote:

Congrats Yash!

On Fri, Sep 22, 2023 at 5:24 PM Mickael Maison wrote:

Congratulations Yash!

On Fri, Sep 22, 2023 at 9:25 AM Chaitanya Mukka wrote:

Congrats, Yash!! Well deserved.

Chaitanya Mukka

On 21 Sep 2023 at 8:58 PM +0530, Bruno Cadonna wrote:

Hi all,

The PMC of Apache Kafka is pleased to announce a new Kafka committer,
Yash Mayya.

Yash's major contributions are around Connect.

Yash authored the following KIPs:

KIP-793: Allow sink connectors to be used with topic-mutating SMTs
KIP-882: Kafka Connect REST API configuration validation timeout improvements
KIP-970: Deprecate and remove Connect's redundant task configurations endpoint
KIP-980: Allow creating connectors in a stopped state

Overall, Yash is known for insightful and friendly input to discussions
and his high quality contributions.

Congratulations, Yash!

Thanks,

Bruno (on behalf of the Apache Kafka PMC)


Re: [ANNOUNCE] New committer: Lucas Brutschy

2023-09-29 Thread Matthias J. Sax

Congrats!

On 9/26/23 1:29 AM, Mayank Shekhar Narula wrote:

Congratulations Lucas!

On Fri, Sep 22, 2023 at 5:24 PM Mickael Maison wrote:

Congratulations Lucas!

On Fri, Sep 22, 2023 at 7:13 AM Luke Chen wrote:

Congratulations, Lukas!

Luke

On Fri, Sep 22, 2023 at 6:53 AM Tom Bentley wrote:

Congratulations!

On Fri, 22 Sept 2023 at 09:11, Sophie Blee-Goldman <ableegold...@gmail.com> wrote:

Congrats Lucas!


Re: Kafka Streams reaching ERROR state during rolling upgrade / restart of brokers

2023-09-29 Thread Matthias J. Sax

In general, Kafka Streams should keep running.

Can you inspect the logs to figure out why it's going into ERROR state 
to begin with? Maybe you need to increase or change some timeout or retry 
configs.


The stack trace you shared is a symptom, but not the root cause.

-Matthias
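
As a rough illustration of the "timeout or retry configs" mentioned above, the sketch below collects a few client settings that are commonly looked at when brokers are restarted one by one. Which ones matter here depends on what the logs show. The class name, the helper method and all values are hypothetical; the keys (`task.timeout.ms`, `producer.delivery.timeout.ms`, `retry.backoff.ms`) are existing Kafka Streams and client config names, written as plain strings so that the sketch needs no Kafka dependency.

```java
import java.util.Properties;

class RollingRestartTimeouts {

    // expectedBrokerOutageMs is an assumption: roughly how long a single broker
    // is unavailable during the rolling restart.
    static Properties build(long expectedBrokerOutageMs) {
        long budgetMs = 2 * expectedBrokerOutageMs;   // leave some headroom
        Properties props = new Properties();
        // Kafka Streams: how long a task may keep retrying after client timeouts.
        props.put("task.timeout.ms", String.valueOf(budgetMs));
        // Producer (via the "producer." prefix): upper bound for delivering a record.
        props.put("producer.delivery.timeout.ms", String.valueOf(budgetMs));
        // Pause between retries of failed requests.
        props.put("retry.backoff.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        build(120_000L).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Separately, `KafkaStreams#setUncaughtExceptionHandler` with a `StreamsUncaughtExceptionHandler` that returns `REPLACE_THREAD` may be worth a look, since a dying stream thread is one way for an application to end up in ERROR state. That is only a suggestion to check against the logs, not a diagnosis.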

On 9/21/23 12:56 AM, Debraj Manna wrote:

I am using Kafka broker 2.8.1 (from AWS MSK) with Kafka clients and Kafka
Streams 3.5.1.

I am observing that whenever a rolling upgrade is done on AWS MSK, our
Streams application reaches the ERROR state. I get the exception below when
trying to query the state store:

caused by: java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
    at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)
    at org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)

Can someone let me know the recommended way to keep the stream
application running whenever a rolling upgrade/restart of brokers is
done in the background?



Re: Can a message avoid loss occur in Kafka

2023-09-29 Thread Matthias J. Sax
For the config you provide, data loss should not happen (as long as you 
don't allow for unclean leader election, which is disabled by default).


But you might be subject to unavailability for some partitions if a 
broker fails.



-Matthias
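
A small sketch of the trade-off described above, under stated assumptions: with `acks=all` (the current name for what the older `request.required.acks=-1` setting expressed), a write succeeds only while at least `min.insync.replicas` replicas are in sync. With a replication factor of 2 and `min.insync.replicas=2`, one broker failure therefore blocks writes to the affected partitions instead of losing data. Class and method names are hypothetical; config keys are written as plain strings.

```java
import java.util.Properties;

class DurabilitySketch {

    // Producer side: wait for all in-sync replicas and retry on transient errors.
    static Properties producerProps() {
        Properties p = new Properties();
        p.put("acks", "all");
        p.put("retries", String.valueOf(Integer.MAX_VALUE));
        p.put("enable.idempotence", "true");   // avoids duplicates caused by retries
        return p;
    }

    // Topic/broker side: never elect an out-of-sync replica as leader.
    static Properties topicProps(int minInsyncReplicas) {
        Properties t = new Properties();
        t.put("min.insync.replicas", String.valueOf(minInsyncReplicas));
        t.put("unclean.leader.election.enable", "false");
        return t;
    }

    // Number of replica failures a partition can absorb while still accepting
    // acks=all writes.
    static int failuresToleratedForWrites(int replicationFactor, int minInsyncReplicas) {
        return Math.max(0, replicationFactor - minInsyncReplicas);
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
        System.out.println(topicProps(2));
        System.out.println(failuresToleratedForWrites(2, 2));
        System.out.println(failuresToleratedForWrites(3, 2));
    }
}
```

A replication factor of 3 with `min.insync.replicas=2` is a common way to keep the same durability while still tolerating one broker failure for writes.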

On 9/17/23 7:49 AM, 陈近南 wrote:

Hello,
Can message loss be avoided in Kafka? For example, my config is:


Producer
retries = Integer.MAX_VALUE
request.required.acks=-1


Broker
replication.factor >= 2
min.insync.replicas > 1
log.flush.interval.messages=1


Consumer
enable.auto.commit = false

  Can this config avoid message loss in Kafka? If not, why? And is there another MQ that can avoid it?



Best regards,
Chen



[jira] [Updated] (KAFKA-15463) StreamsException: Accessing from an unknown node

2023-09-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15463:

Priority: Major  (was: Blocker)

>  StreamsException: Accessing from an unknown node
> -
>
> Key: KAFKA-15463
> URL: https://issues.apache.org/jira/browse/KAFKA-15463
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.1
>Reporter: Yevgeny
>Priority: Major
>
> After working fine for some time, the application started to throw the exception below.
>  
> This is a Spring Boot application that runs in Kubernetes as a stateful pod.
>  
>  
>  
> {code:java}
> Exception in thread "-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown node
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162)
>     at myclass1.java:28)
>     at myclass2.java:48)
>     at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
>     at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
>     at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
>     at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>     at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>     at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637)
>     at myclass3.java:48)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
>     at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>     at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
>     at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711)
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
>    {code}
>  
> stream-thread 
> [-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State 
> transition from PENDING_SHUTDOWN to DEAD
>  
>  
> The Transformer is a prototype bean; the supplier supplies a new instance of the 
> Transformer:
>  
>  
> {code:java}
> @Override public Transformer> get() 
> {     return ctx.getBean(MyTransformer.class); }{code}
>  
>  
> The only way to recover is to delete all topics used by Kafka Streams; even if 
> the application is restarted, the same exception is thrown.
> If messages in the internal 'store-changelog' topics are deleted or their offsets 
> manipulated, can that cause the issue?





Re: FW: UNSUBSCRIBE

2023-09-12 Thread Matthias J. Sax

To unsubscribe, you need to send an email to

  users-unsubscr...@storm.apache.org.

Bests,
  -Matthias


On 9/12/23 6:37 AM, hmm0403 wrote:


UNSUBSCRIBE


Sent from my Galaxy


 Original message 
From: Michele Volpe 
Date: 23/9/12 10:26 PM (GMT+09:00)
To: user@storm.apache.org
Subject: Re: UNSUBSCRIBE

UNSUBSCRIBE

On Tue, 12 Sep 2023, 13:05, yuz989 wrote:





[jira] [Created] (KAFKA-15443) Upgrade RocksDB dependency

2023-09-07 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15443:
---

 Summary: Upgrade RocksDB dependency
 Key: KAFKA-15443
 URL: https://issues.apache.org/jira/browse/KAFKA-15443
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams currently depends on RocksDB 7.9.2.

However, the latest version of RocksDB is already 8.5.3. We should check the 
RocksDB release notes to see what benefits we would get from upgrading to the 
latest version (and file corresponding tickets to exploit improvements in newer 
releases as applicable).





[jira] [Assigned] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15417:
---

Assignee: Victor van den Hoven

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Assignee: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> {_}joinWindow{_}.{_}after=0 and joinWindow.before=30s{_}, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  





[jira] [Commented] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762863#comment-17762863
 ] 

Matthias J. Sax commented on KAFKA-15417:
-

Thanks a lot! Assigned the ticket to you.

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Assignee: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> {_}joinWindow{_}.{_}after=0 and joinWindow.before=30s{_}, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  





[jira] [Comment Edited] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762535#comment-17762535
 ] 

Matthias J. Sax edited comment on KAFKA-15417 at 9/7/23 1:39 AM:
-

Thanks. I did look into this and found this PR: 
[https://github.com/apache/kafka/commit/03411ca28b21d554d754b3f67a7857afe64f5bef]

It introduces `joinSpuriousLookBackTimeMs`, but it seems to set it to 
before/after for the left/right join node incorrectly. I think it must be 
flipped. The added unit tests seem to make incorrect assumptions about when a 
window should get closed, but this was not caught during review (it's easy to 
flip before/after in your mind).

Hence, the fix itself is simple, but updating the unit tests is a little bit 
more work. Would you be interested in working on a PR to fix it?
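
To illustrate the before/after mix-up, here is a simplified model of when the window of a left-side record can be considered closed, so that a left-join result with null can be emitted. It is not the actual Kafka Streams code; the class and method names are hypothetical, zero grace is assumed in the example, and the exact boundary (strict vs. non-strict comparison) is an assumption. A left record with timestamp `ts` can only match right records in `[ts - before, ts + after]`, so it should only have to wait for `after` (plus grace), not for `before`.

```java
class LeftJoinWindowSketch {

    // Expected behaviour: no right-side match can arrive any more once stream
    // time has passed ts + after + grace.
    static boolean expectedWindowClosed(long leftTs, long streamTime, long afterMs, long graceMs) {
        return streamTime > leftTs + afterMs + graceMs;
    }

    // Flipped variant that would explain the reported behaviour: the left record
    // waits for "before" instead of "after".
    static boolean flippedWindowClosed(long leftTs, long streamTime, long beforeMs, long graceMs) {
        return streamTime > leftTs + beforeMs + graceMs;
    }

    public static void main(String[] args) {
        long beforeMs = 5000L;
        long afterMs = 0L;
        long graceMs = 0L;   // assumption: no grace period
        long leftTs = 0L;    // timestamp of message1 on stream1
        for (long streamTime : new long[] {4000L, 4900L, 6000L}) {
            System.out.println(streamTime
                    + " expected=" + expectedWindowClosed(leftTs, streamTime, afterMs, graceMs)
                    + " flipped=" + flippedWindowClosed(leftTs, streamTime, beforeMs, graceMs));
        }
    }
}
```

With the values from the ticket (before=5000ms, after=0), the expected rule closes the window for message1 as soon as stream time advances, while the flipped rule keeps it open until stream time has passed 5000ms. That matches the missing null-record joins at 4000ms and 4900ms.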


was (Author: mjsax):
Thanks. I did look into this and found this PR: 
https://github.com/apache/kafka/commit/03411ca28b21d554d754b3f67a7857afe64f5bef

It introduces `joinSpuriousLookBackTimeMs`, but it seems it's set to before/after 
for the left/right join incorrectly. I think it must be flipped. The added 
unit tests seem to make incorrect assumptions about when a window should get 
closed, but this was not caught during review (it's easy to flip before/after 
in your mind).

Hence, the fix itself is simple, but updating the unit tests is a little bit 
more work. Would you be interested in working on a PR to fix it?

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> {_}joinWindow{_}.{_}after=0 and joinWindow.before=30s{_}, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  





[jira] [Commented] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762535#comment-17762535
 ] 

Matthias J. Sax commented on KAFKA-15417:
-

Thanks. I did look into this and found this PR: 
https://github.com/apache/kafka/commit/03411ca28b21d554d754b3f67a7857afe64f5bef

It introduces `joinSpuriousLookBackTimeMs`, but it seems it's set to before/after 
for the left/right join incorrectly. I think it must be flipped. The added 
unit tests seem to make incorrect assumptions about when a window should get 
closed, but this was not caught during review (it's easy to flip before/after 
in your mind).

Hence, the fix itself is simple, but updating the unit tests is a little bit 
more work. Would you be interested in working on a PR to fix it?

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> {_}joinWindow{_}.{_}after=0 and joinWindow.before=30s{_}, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  





Re: kafka streams consumer group reporting lag even on source topics removed from topology

2023-09-05 Thread Matthias J. Sax

Great!

On 9/5/23 1:23 AM, Pushkar Deole wrote:

I think I figured out a way. There are certain commands that can be
executed from kafka-cli to disassociate a consumer group from topics
that are no longer being consumed.
With this sort of command, I could delete the consumer offsets for a
consumer group for a specific topic, and that resolved the lag problem:

kafka-consumer-groups --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS
--command-config ~/kafka.properties --delete-offsets --group
"" --topic " wrote:


As long as the consumer group is active, nothing will be deleted. That
is the reason why you get those incorrect alerts -- Kafka cannot know
that you stopped consuming from those topics. (That is what I tried to
explain -- seems I did a bad job...)

Changing the group.id is tricky because Kafka Streams uses it to
identify internal topic names (for repartition and changelog topics), and
thus your app would start with newly created (and thus empty) topics. --
You might want to restart the app with `auto.offset.reset = "earliest"`
and reprocess all available input to re-create state.


-Matthias

On 8/19/23 8:07 AM, Pushkar Deole wrote:

@matthias

what are the alternatives to get rid of this issue? When the lag starts
increasing, the alerts configured on our monitoring system in Datadog
start sending alerts and alarms to reliability teams. I know that in
Kafka an inactive consumer group is cleaned up after 7 days, however I am
not sure if that is the case with topics that were consumed previously and
are not consumed now.

Is creating a new consumer group (setting a different application.id)
on the streams application an option here?


On Thu, Aug 17, 2023 at 7:03 AM Matthias J. Sax wrote:



Well, it's kinda expected behavior. It's a split brain problem.

In the end, you use the same `application.id / group.id` and thus the
committed offsets for the removed topics are still in the
`__consumer_offsets` topic and associated with the consumer group.

If a tool inspects lag and compares the latest committed offsets to
end-offsets, it looks at everything it finds in the `__consumer_offsets`
topic for the group in question -- the tool cannot know that you
changed the application and that it does not read from those topics any
longer (and thus does not commit any longer).
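
To make the effect concrete, here is a minimal sketch of how such a lag
check behaves. It is plain Java with made-up topic names and offsets, and
`StaleLagSketch` and `lagPerPartition` are hypothetical names; it is not
how Kafka or Datadog actually implement lag reporting.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: models a lag tool that derives lag from
// committed offsets and end offsets. Not Kafka's or Datadog's implementation.
class StaleLagSketch {

    // Lag per topic-partition = end offset - last committed offset.
    // Every partition with a committed offset is reported, whether or not
    // the application still reads from it.
    static Map<String, Long> lagPerPartition(Map<String, Long> committed,
                                             Map<String, Long> endOffsets) {
        Map<String, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : committed.entrySet()) {
            long end = endOffsets.getOrDefault(e.getKey(), e.getValue());
            lag.put(e.getKey(), Math.max(0L, end - e.getValue()));
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("active-topic-0", 1_000L);
        committed.put("removed-topic-0", 500L); // no longer consumed: offset is frozen

        Map<String, Long> endOffsets = new LinkedHashMap<>();
        endOffsets.put("active-topic-0", 1_000L);
        endOffsets.put("removed-topic-0", 800L); // producers keep writing

        System.out.println(lagPerPartition(committed, endOffsets));
    }
}
```

The frozen committed offset of the removed topic is the only input the
check has, so its lag keeps growing as long as producers write to it.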

I am not sure off the top of my head if you could do a manual cleanup for
the `application.id` and topics in question and delete the committed
offsets from the `__consumer_offsets` topic -- try checking out the `Admin`
client and/or the command line tools...

I know that it's possible to delete committed offsets for a consumer
group (if a group becomes inactive, the broker would also clean up all
group metadata after a configurable timeout), but I am not sure if
that's for the entire consumer group (ie, all topics) or if you can do it
on a per-topic basis, too.


HTH,
 -Matthias


On 8/16/23 2:11 AM, Pushkar Deole wrote:

Hi streams Dev community  @matthias, @bruno

Any inputs on above issue? Is this a bug in the streams library wherein
the input topic removed from streams processor topology, the underlying
consumer group still reporting lag against those?

On Wed, Aug 9, 2023 at 4:38 PM Pushkar Deole wrote:



Hi All,

I have a streams application with 3 instances with application-id set to
applicationV1. The application uses processor API with reading from source
topics, processing the data and writing to destination topic.
Currently it consumes from 6 source topics however we don't need to
process data any more from 2 of those topics so we removed 2 topics from
the source topics list. We have configured Datadog dashboard to report and
alert on consumer lag so after removing the 2 source topics and deploying
application, we started getting several alerts about consumer lag on
applicationV1 consumer group which is underlying consumer group of the
streams application. When we looked at the consumer group from kafka-cli,
we could see that the consumer group is reporting lag against the topics
removed from source topic list which is reflecting as increasing lag on
Datadog monitoring.

Can someone advise if this is expected behavior? In my opinion, this is
not expected since streams application no more has those topics as part of
source, it should not report lag on those.













[jira] [Created] (KAFKA-15437) Add metrics about open iterators

2023-09-05 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15437:
---

 Summary: Add metrics about open iterators
 Key: KAFKA-15437
 URL: https://issues.apache.org/jira/browse/KAFKA-15437
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams allows creating iterators over state stores. Those iterators must 
get closed to free up resources (especially for RocksDB). – We regularly get 
user reports of "resource leaks" that can be pinned down to leaking (ie 
not-closed) iterators.

To simplify monitoring, it would be helpful to add a metric about open 
iterators to allow users to alert and pin-point the issue directly (and before 
the actual resource leak is observed).

We might want to have a DEBUG level per-store metric (to allow identifying the 
store in question quickly), but an already rolled up INFO level metric for the 
whole application.
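
As a rough illustration of what such a metric would count, the sketch below 
wraps an iterator and keeps a gauge of opened-but-not-closed instances. It is 
plain Java; `OpenIteratorSketch`, `TrackedIterator` and `OPEN_ITERATORS` are 
hypothetical names and not part of Kafka Streams, and the real metric may be 
designed quite differently.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: none of these names come from Kafka Streams.
class OpenIteratorSketch {

    // Gauge of iterators that were opened but not yet closed.
    static final AtomicLong OPEN_ITERATORS = new AtomicLong();

    // Wraps any iterator and tracks its open/closed state in the gauge.
    static final class TrackedIterator<T> implements Iterator<T>, AutoCloseable {
        private final Iterator<T> delegate;
        private boolean closed;

        TrackedIterator(Iterator<T> delegate) {
            this.delegate = delegate;
            OPEN_ITERATORS.incrementAndGet();
        }

        @Override
        public boolean hasNext() {
            return !closed && delegate.hasNext();
        }

        @Override
        public T next() {
            return delegate.next();
        }

        @Override
        public void close() {
            if (!closed) { // closing twice must not decrement twice
                closed = true;
                OPEN_ITERATORS.decrementAndGet();
            }
        }
    }

    public static void main(String[] args) {
        List<String> store = List.of("a", "b", "c");

        // Leaked iterator: never closed, so the gauge stays at 1.
        TrackedIterator<String> leaked = new TrackedIterator<>(store.iterator());
        leaked.next();

        // try-with-resources closes the iterator even if processing throws.
        try (TrackedIterator<String> it = new TrackedIterator<>(store.iterator())) {
            while (it.hasNext()) {
                it.next();
            }
        }
        System.out.println("open iterators: " + OPEN_ITERATORS.get());
    }
}
```

A gauge that never returns to zero while the application is idle is the kind 
of signal a user could alert on before the resource leak itself shows up.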



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Commented] (KAFKA-15309) Add custom error handler to Producer

2023-09-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761924#comment-17761924
 ] 

Matthias J. Sax commented on KAFKA-15309:
-

Sure, the ticket is up for grabs. Note that we will need a KIP for this to get 
a proper and approved design -> 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] 

Let us know if you have any questions about the KIP process.

> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>        Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
> Attachments: KafkaProducerReproducer.java, app.log
>
>
> The producer collects multiple records into batches, and a single 
> record-specific error might fail the whole batch (eg, `RecordTooLargeException`).
> This ticket suggests adding a per-record error handler that allows users to 
> opt into skipping bad records without failing the whole batch (similar to 
> Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.
> Another example for which a production exception handler could be useful is if 
> a user tries to write into a non-existing topic, which returns a retryable 
> error code; with infinite retries the producer would hang retrying forever. A 
> handler could help to break the infinite retry loop.
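
To illustrate the idea in the description, here is a minimal sketch of a 
per-record handler that lets a batch continue past a bad record. Everything in 
it (`RecordErrorHandler`, `Response`, `sendBatch`, the size limit) is 
hypothetical and only models the concept; the actual API would have to come 
out of the KIP.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: these types are NOT part of the Kafka producer API.
class RecordErrorHandlerSketch {

    enum Response { SKIP, FAIL }

    // Called once per failing record instead of failing the whole batch.
    interface RecordErrorHandler {
        Response handle(String record, RuntimeException error);
    }

    // Stand-in for a record size limit; the value is arbitrary.
    static final int MAX_RECORD_BYTES = 8;

    // Returns the records that were "sent"; bad records go to the handler.
    static List<String> sendBatch(List<String> batch, RecordErrorHandler handler) {
        List<String> sent = new ArrayList<>();
        for (String record : batch) {
            int size = record.getBytes(StandardCharsets.UTF_8).length;
            if (size > MAX_RECORD_BYTES) {
                RuntimeException error =
                    new IllegalArgumentException("record too large: " + size + " bytes");
                if (handler.handle(record, error) == Response.FAIL) {
                    throw error; // old behavior: one bad record fails everything
                }
                continue; // SKIP: drop this record, keep the rest of the batch
            }
            sent.add(record);
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("ok-1", "far-too-large-record", "ok-2");
        System.out.println(sendBatch(batch, (record, error) -> Response.SKIP));
    }
}
```

With the `SKIP` handler the two small records still go through; returning 
`FAIL` instead reproduces the fail-the-whole-batch behavior.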





[jira] [Comment Edited] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761918#comment-17761918
 ] 

Matthias J. Sax edited comment on KAFKA-15417 at 9/5/23 1:57 AM:
-

I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set config

`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__`

to zero to see if it resolves the issue?


was (Author: mjsax):
I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set config

`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__`

to zero to see if it resolves the issue?

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> _joinWindow.after=0 and joinWindow.before=30s_, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  
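
The expectation described in the report can be written down as a small 
sketch. It is plain Java and models only the reporter's reading of the 
before/after window (grace period assumed to be zero); `JoinWindowSketch`, 
`joinable` and `windowEnd` are hypothetical names, and this is not how Kafka 
Streams implements the join internally.

```java
// Hypothetical sketch of the window arithmetic described in the report.
class JoinWindowSketch {

    // A stream2 record at otherTs can join a stream1 record at ts if it is
    // at most beforeMs earlier and at most afterMs later.
    static boolean joinable(long ts, long otherTs, long beforeMs, long afterMs) {
        return otherTs >= ts - beforeMs && otherTs <= ts + afterMs;
    }

    // Stream time after which no further stream2 record can match the
    // stream1 record at ts (grace period assumed to be zero).
    static long windowEnd(long ts, long afterMs) {
        return ts + afterMs;
    }

    public static void main(String[] args) {
        long beforeMs = 5_000L;
        long afterMs = 0L;
        long message1Ts = 10_000L;

        // A stream2 record 4 s earlier is still inside the window.
        System.out.println(joinable(message1Ts, 6_000L, beforeMs, afterMs));
        // A stream2 record 1 ms later is outside because after = 0.
        System.out.println(joinable(message1Ts, 10_001L, beforeMs, afterMs));

        // Expected by the reporter: once stream time passes this point
        // (e.g. message2 arriving 4000 ms later), message1 can no longer
        // match anything and should be emitted with a null right side.
        System.out.println(windowEnd(message1Ts, afterMs));
    }
}
```

Under this reading the window for message1 ends at its own timestamp, which is 
why the reporter expects the null-record join already at the 4000 ms step 
rather than only after the `before` period.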





[jira] [Comment Edited] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761918#comment-17761918
 ] 

Matthias J. Sax edited comment on KAFKA-15417 at 9/5/23 1:56 AM:
-

I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set config

`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__`

to zero to see if it resolves the issue?


was (Author: mjsax):
I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set config 
`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__` to zero to see 
if it resolves the issue?

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> _joinWindow.after=0 and joinWindow.before=30s_, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  





[jira] [Comment Edited] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761918#comment-17761918
 ] 

Matthias J. Sax edited comment on KAFKA-15417 at 9/5/23 1:56 AM:
-

I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set config

`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__`

to zero to see if it resolves the issue?


was (Author: mjsax):
I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set config

`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__`

to zero to see if it resolves the issue?

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> _joinWindow.after=0 and joinWindow.before=30s_, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  





[jira] [Comment Edited] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761918#comment-17761918
 ] 

Matthias J. Sax edited comment on KAFKA-15417 at 9/5/23 1:55 AM:
-

I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set config 
`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__` to zero to see 
if it resolves the issue?


was (Author: mjsax):
I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set config 
`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__` to zero to see 
if it resolves the issue?

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> _joinWindow.after=0 and joinWindow.before=30s_, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  





[jira] [Commented] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761918#comment-17761918
 ] 

Matthias J. Sax commented on KAFKA-15417:
-

I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set config 
`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__` to zero to see 
if it resolves the issue?
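
For reference, a minimal sketch of how that setting could be passed. The 
config name is the one quoted above and, going by its double-underscore form, 
appears to be an internal setting that may change between releases; the 
`application.id` and `bootstrap.servers` values and the class name are 
placeholders, and passing the value as a `Long` is an assumption.

```java
import java.util.Properties;

// Sketch only: builds the properties one would hand to a Kafka Streams app.
class OuterJoinEmitIntervalSketch {

    // Config name as quoted in the comment above.
    static final String EMIT_INTERVAL_CONFIG =
        "__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";

    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "join-window-repro");  // placeholder value
        props.put("bootstrap.servers", "localhost:9092");  // placeholder value
        props.put(EMIT_INTERVAL_CONFIG, 0L);               // zero, as suggested above
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps();
        System.out.println(EMIT_INTERVAL_CONFIG + " = " + props.get(EMIT_INTERVAL_CONFIG));
    }
}
```

The same `Properties` object would then go to the `KafkaStreams` constructor 
or to `TopologyTestDriver` in the attached unit test.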

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> _joinWindow.after=0 and joinWindow.before=30s_, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  





[jira] [Updated] (KAFKA-15383) Replace EasyMock with Mockito for KTableImplTest

2023-09-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15383:

Component/s: streams
 unit tests

> Replace EasyMock with Mockito for KTableImplTest
> 
>
> Key: KAFKA-15383
> URL: https://issues.apache.org/jira/browse/KAFKA-15383
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Fei Xie
>Priority: Minor
>





