[jira] [Updated] (KAFKA-15385) Replace EasyMock with Mockito for AbstractStreamTest

2023-09-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15385:

Component/s: streams
 unit tests

> Replace EasyMock with Mockito for AbstractStreamTest
> 
>
> Key: KAFKA-15385
> URL: https://issues.apache.org/jira/browse/KAFKA-15385
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Fei Xie
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15382) Replace EasyMock with Mockito for KStreamTransformValuesTest

2023-09-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15382:

Component/s: streams
 unit tests

> Replace EasyMock with Mockito for KStreamTransformValuesTest
> 
>
> Key: KAFKA-15382
> URL: https://issues.apache.org/jira/browse/KAFKA-15382
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Fei Xie
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15384) Replace EasyMock with Mockito for KTableTransformValuesTest

2023-09-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15384:

Component/s: streams
 unit tests

> Replace EasyMock with Mockito for KTableTransformValuesTest
> ---
>
> Key: KAFKA-15384
> URL: https://issues.apache.org/jira/browse/KAFKA-15384
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Fei Xie
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: AW: Table updates are not consistent when doing a join with a Stream

2023-09-04 Thread Matthias J. Sax
Your update to the KTable is async when you send data back to the KTable 
input topic. So your program is subject to race-conditions.


So switching to the PAPI was the right move: it makes the update to the 
state store synchronous and thus fixes the issue.
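
(For illustration only: a minimal sketch of a processor that does such a synchronous 
read-modify-write against a local state store. The store name, the types, and the 
list-valued state are assumptions, not the original application; store registration 
and serdes are omitted.)

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.ArrayList;
import java.util.List;

// Appends each incoming value to a per-key list held in a local state store.
// The read-modify-write is synchronous, so the next record for the same key
// always observes the previous update (no round-trip through a topic).
public class AppendingProcessor implements Processor<String, String, String, List<String>> {

    private ProcessorContext<String, List<String>> context;
    private KeyValueStore<String, List<String>> store;

    @Override
    public void init(final ProcessorContext<String, List<String>> context) {
        this.context = context;
        this.store = context.getStateStore("orders-store"); // store name is a placeholder
    }

    @Override
    public void process(final Record<String, String> record) {
        List<String> values = store.get(record.key());
        if (values == null) {
            values = new ArrayList<>();
        }
        values.add(record.value());
        store.put(record.key(), values);            // synchronous state update
        context.forward(record.withValue(values));  // emit the current, complete list
    }
}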



-Matthias

On 9/4/23 5:53 AM, Mauricio Lopez wrote:

Hello,

They were getting processed by the same consumer as we only had a single 
machine running this.
What we ended up doing is basically drawing the same topology but interacting 
directly with the stateStore using the Processor API instead of DSL. Seems that 
fixed everything up (and made it way quicker).

Best,
Mauricio

On 2023/08/28 12:04:12 Claudia Kesslau wrote:

Hi,

I'm definitely no expert, but to me it sounds as if not all your messages are 
getting processed by the same consumer. Are you using the key `foo` for 
partitioning? Is `baz` actually another key, or is this a mix-up in your example 
and `baz` is another value with key `foo`?

Hope you find a solution to your problem.

Best,
Claudia

From: Mauricio Lopez <ml...@silversky.com>
Sent: Thursday, 17 August 2023 22:57
To: users@kafka.apache.org <us...@kafka.apache.org>
Subject: Table updates are not consistent when doing a join with a Stream

Hello Folks,

We are having an issue with a Kafka Streams Java application.
We have a KStream and a KTable which are joined using a Left Join. The entries 
in the KTable are constantly updated by the new information that comes from the 
KStream. Each KStream message is adding entries to an array that the KTable has 
for each key. This update gets sent back to the KTable topic, expanding this 
array every time a new message comes from the KStream.

As an example, what should be happening (and what happens in our unit tests) is:


   *   KTable has an empty array for key “foo”: []
   *   Event 1 comes with key “foo” and value “bar”
   *   KTable gets updated to “foo”: [“bar”], sending this update to the same 
topic that the KTable is plugged into.
   *   Event 2 comes with key “baz”
   *   The update is pulled into memory by the KTable, and the KTable gets updated to “foo”: 
[“bar”, “baz”], sending this change to the same topic that the KTable is 
plugged into. “baz” was appended to the array for key “foo”.

But what is happening is the following:


   *   KTable has an empty array for key “foo”: []

   *   Event 1 comes with key “foo” and value “bar”
   *   KTable gets updated to “foo”: [“bar”] in the joiner, sending an event 
to the same topic that the KTable is plugged to.
   *   Event 2 comes with key “baz”
   *   KTable gets updated to “foo”: [“baz”] in the joiner, sending an event 
to the same topic that the KTable is plugged to afterwards.

This happens multiple times, and after a couple of seconds, one of the incoming 
messages is finally appended, but many of them are lost. As you can see, we 
suspect that when Event 2 is received, the KTable has somehow not yet received 
the first update, so “baz” overwrites the array instead of being appended.
This means that many events are missed, and we cannot successfully get the 
KTable to save all the data for all the events. In turn, it sometimes 
overwrites the updates from some events.

So far, we have tried:


   *   Setting STATESTORE_CACHE_MAX_BYTES_CONFIG to 0, to attempt to force the 
app not to cache any changes and send to the output topic instantly.
   *   Setting COMMIT_INTERVAL_MS_CONFIG to 0, to attempt to force the app to 
send all updates instantly
   *   Setting TOPOLOGY_OPTIMIZATION_CONFIG to “reuse.ktable.source.topics” and 
“all” in case there is some optimization pattern that could help us.


None of these have allowed us to have a fully consistent update of the KTable 
each time a new event comes in. It always gets overwritten or misses incoming 
updates made by events. Can someone advise if there’s a way to make the KTable 
get successfully updated by each one of the events, as the first example shows?

Thanks,

Mauricio L



This message is for the sole use of the intended recipient(s) and may contain 
confidential and/or privileged information of SilverSky. Any unauthorized 
review, use, copying, disclosure, or distribution is prohibited. If you are not 
the intended recipient, please immediately contact the sender by reply email 
and delete all copies of the original message.



Mauricio López S.
Software Engineer



Re: kafka streams consumer group reporting lag even on source topics removed from topology

2023-09-04 Thread Matthias J. Sax
As long as the consumer group is active, nothing will be deleted. That 
is the reason why you get those incorrect alerts -- Kafka cannot know 
that you stopped consuming from those topics. (That is what I tried to 
explain -- seems I did a bad job...)


Changing the group.id is tricky because Kafka Streams uses it to 
identify internal topic names (for repartition and changelog topics), and 
thus your app would start with newly created (and thus empty) topics. -- 
You might want to restart the app with `auto.offset.reset = "earliest"` 
and reprocess all available input to re-create state.
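
(If you go that route, a minimal sketch of the restart configuration; the new 
application.id and the broker address are placeholders:)

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "applicationV2");      // new group.id => fresh (empty) internal topics
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");       // reprocess available input to rebuild state

StreamsBuilder builder = new StreamsBuilder();
// ... wire up the existing topology here ...
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();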



-Matthias

On 8/19/23 8:07 AM, Pushkar Deole wrote:

@matthias

what are the alternatives to get rid of this issue? When the lag starts
increasing, the alerts configured on our monitoring system in Datadog
start firing and alarming the reliability teams. I know that in Kafka an
inactive consumer group is cleaned up after 7 days; however, I am not sure
whether the same applies to topics that were consumed previously but are
not consumed any more.

Is creating a new consumer group (by setting a different application.id) for
the streams application an option here?


On Thu, Aug 17, 2023 at 7:03 AM Matthias J. Sax  wrote:


Well, it's kinda expected behavior. It's a split brain problem.

In the end, you use the same `application.id / group.id` and thus the
committed offsets for the removed topics are still in
`__consumer_offsets` topics and associated with the consumer group.

If a tool inspects lags and compares the latest committed offsets to
end-offsets it looks for everything it finds in the `__consumer_offsets`
topics for the group in question -- the tool cannot know that you
changed the application and that it does not read from those topics any
longer (and thus does not commit any longer).

I am not sure off the top of my head if you could do a manual cleanup for
the `application.id` and topics in question and delete the committed
offsets from the `__consumer_offsets` topic -- try checking out the `Admin`
client and/or the command line tools...

I know that it's possible to delete committed offsets for a consumer
group (if a group becomes inactive, the broker would also clean up all
group metadata after a configurable timeout), but I am not sure if
that's for the entire consumer group (i.e., all topics) or if you can do it
on a per-topic basis, too.


HTH,
-Matthias


On 8/16/23 2:11 AM, Pushkar Deole wrote:

Hi streams Dev community  @matthias, @bruno

Any inputs on above issue? Is this a bug in the streams library wherein the
input topic removed from streams processor topology, the underlying
consumer group still reporting lag against those?

On Wed, Aug 9, 2023 at 4:38 PM Pushkar Deole  wrote:


Hi All,

I have a streams application with 3 instances with application-id set to
applicationV1. The application uses processor API with reading from source
topics, processing the data and writing to destination topic.
Currently it consumes from 6 source topics however we don't need to
process data any more from 2 of those topics so we removed 2 topics from
the source topics list. We have configured Datadog dashboard to report and
alert on consumer lag so after removing the 2 source topics and deploying
application, we started getting several alerts about consumer lag on
applicationV1 consumer group which is underlying consumer group of the
streams application. When we looked at the consumer group from kafka-cli,
we could see that the consumer group is reporting lag against the topics
removed from source topic list which is reflecting as increasing lag on
Datadog monitoring.

Can someone advise if this is expected behavior? In my opinion, this is
not expected since streams application no more has those topics as part of
source, it should not report lag on those.









[jira] [Updated] (KAFKA-15378) Rolling upgrade system tests are failing

2023-08-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15378:

Component/s: system tests

> Rolling upgrade system tests are failing
> 
>
> Key: KAFKA-15378
> URL: https://issues.apache.org/jira/browse/KAFKA-15378
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Affects Versions: 3.5.1
>Reporter: Lucas Brutschy
>Priority: Major
>
> The system tests are having failures for these tests:
> {noformat}
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.1.2.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.2.3.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.6.0-SNAPSHOT
> {noformat}
> See 
> [https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5801/console]
>  for logs and other test data.
> Note that system tests currently only run with [this 
> fix|https://github.com/apache/kafka/commit/24d1780061a645bb2fbeefd8b8f50123c28ca94e] 
> applied; I think some CVE-related Python library update broke the system tests... 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-08-16 Thread Matthias J. Sax

Thanks for splitting this part into a separate KIP!

For `withKey()` we should be explicit that `null` is not allowed.

(Looking into existing `KeyQuery` it seems the JavaDocs don't cover this 
either -- would you like to do a tiny cleanup PR for this, or fix 
on-the-side in one of your PRs?)





The key query returns all the records that are valid in the time range starting 
from the timestamp {@code fromTimestamp}.


In the JavaDocs you use the phrase `are valid` -- I think we need to 
explain what "valid" means? It might even be worth adding some examples. 
It's annoying, but being precise is kinda important.


With regard to KIP-962, should we allow `null` for time bounds? The 
JavaDocs should also be explicit if `null` is allowed or not and what 
the semantics are if allowed.




You are using `asOf()`; however, because we are doing time-range queries, 
using `until()` to describe the upper bound would sound better to me (I 
am not a native speaker though, so maybe I am off?)




The key query returns all the records that have timestamp <= {@code 
asOfTimestamp}.


This is only correct if no lower bound is set, right?


In your reply to KIP-960 you mentioned:


the meaningless combinations are prevented by throwing exceptions.


We should add corresponding JavaDocs like:

   @throws IllegalArgumentException if {@code fromTimestamp} is equal to or
larger than {@code untilTimestamp}

Or something similar.
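
For what it's worth, a tiny illustrative sketch of such a check (all class and 
method names here are placeholders, not the KIP's API):

import java.time.Instant;
import java.util.Objects;

public final class MultiVersionedKeyQuerySketch<K> {

    private final K key;
    private final Instant fromTimestamp;
    private final Instant untilTimestamp;

    private MultiVersionedKeyQuerySketch(final K key, final Instant from, final Instant until) {
        this.key = Objects.requireNonNull(key, "key cannot be null");  // be explicit that null keys are rejected
        if (from != null && until != null && !from.isBefore(until)) {
            throw new IllegalArgumentException("fromTimestamp must be smaller than untilTimestamp");
        }
        this.fromTimestamp = from;
        this.untilTimestamp = until;
    }

    public static <K> MultiVersionedKeyQuerySketch<K> withKey(final K key) {
        return new MultiVersionedKeyQuerySketch<>(key, null, null);    // null bound = unbounded (one possible semantic)
    }

    public MultiVersionedKeyQuerySketch<K> fromTime(final Instant from) {
        return new MultiVersionedKeyQuerySketch<>(key, from, untilTimestamp);
    }

    public MultiVersionedKeyQuerySketch<K> untilTime(final Instant until) {
        return new MultiVersionedKeyQuerySketch<>(key, fromTimestamp, until);
    }
}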


With regard to KIP-960: if we need to introduce a `VersionedKeyQuery` 
class for single-key-single-ts lookup, would we need to find a new name 
for the query class of this KIP, given that the return type is different?



-Matthias



On 8/16/23 10:57 AM, Alieh Saeedi wrote:

Hi all,

I split KIP-960 into three separate KIPs. Therefore, please continue discussions
about single-key, multi-timestamp interactive queries here. You can see all
the addressed reviews on the following page. Thanks in advance.

KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for
versioned state stores


I look forward to your feedback!

Cheers,
Alieh



Re: [DISCUSS] KIP-960: Support interactive queries (IQv2) for versioned state stores

2023-08-16 Thread Matthias J. Sax
y_multi-timestamp Interactive Queries (IQv2) for
Versioned State Stores
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-968%3A+Support+single-key_multi-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores>
KIP-969: Support range Interactive Queries (IQv2) for Versioned State
Stores
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-969%3A+Support+range+interactive+queries+%28IQv2%29+for+versioned+state+stores>

Cheers,
Alieh

On Thu, Aug 10, 2023 at 2:38 AM Matthias J. Sax  wrote:


Seems there was a lot of additional feedback. Looking forward to an
updated version of the KIP.

I also agree to make the queries more composable. I was considering
raising this originally, but held off because `RangeQuery` is also not
designed to be very composable. But for versioned stores, we have many more
combinations, so making it composable does make sense to me.

About iterator order: I would also propose to be pragmatic, and only add
what is simple to implement for now. We can always extend it later. We
just need to clearly document the order (or say: order is not defined --
also a valid option). Of course, if we limit what we add now, we should
keep in mind how to extend the API in the future without the need to
deprecate a lot of stuff (ideally, we would not need to deprecate
anything but only extend what we have).

Btw: I am also happy to de-scope this KIP to only implement the two
queries Victoria mentioned being easy to implement, and do follow up
KIPs for range queries. There is no need to do everything with a single
KIP.

About the original v-store KIP and `long` vs `Instant` -- I don't think
we forgot it. If the store is used inside a `Processor`, using `long` is
preferred because performance is important and we are on the hot code
path. For IQ, on the other hand, it's not the hot code path, and
semantics exposed to the user are more important. -- At least, this is
how we did it in the past.


One more thought.

The new `VersionedKeyQuery` seems to have two different query types
merged into a single class. Queries which return a single result, and
queries that return multiple results. This does not seem ideal. For
`withKeyLatestValue` and `withKeyWithTimestampBound` (should we rename
this to `withKeyAsOfTimestamp`?) I would expect to get a single
`VersionedRecord` back, not an iterator. Hence, we might need to 
split `VersionedKeyQuery` into two query types?
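
To make that split concrete, a purely hypothetical sketch (all names are 
placeholders, not a proposal):

import java.util.Iterator;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.state.VersionedRecord;

// single-key, single-timestamp lookup: exactly one result
final class SingleVersionKeyQuery<K, V> implements Query<VersionedRecord<V>> { }

// single-key, multi-timestamp lookup: an iterator over all matching versions
final class MultiVersionKeyQuery<K, V> implements Query<Iterator<VersionedRecord<V>>> { }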


-Matthias




On 8/9/23 6:46 AM, Victoria Xia wrote:

Hey Alieh,

Thanks for the KIP!

It looks like the KIP proposes three different types of interactive

queries for versioned stores, though they are grouped together into two
classes: VersionedKeyQuery adds supports for single-key, single-timestamp
lookups, and also for single-key, multi-timestamp lookups, while
VersionedRangeQuery additionally adds support for key-range queries.


The first type of query (single-key, single-timestamp lookups) is
already supported by versioned stores (per the VersionedKeyValueStore
interface) today, so exposing it via interactive queries requires little
additional implementation effort and is a quick win for users. The other
additional implementation effort, and are a quick win to users. The other
two types of queries will require more effort to add, and also come with
more design decisions. I've sorted my thoughts accordingly.


Regarding single-key, multi-timestamp lookups:

1. If we add these, we should add a new method to the

VersionedKeyValueStore interface to support this type of lookup. Otherwise,
there is no easy/efficient way to compose methods from the existing
interface in order to implement this type of lookup, and therefore the new
interactive query type cannot be used on generic VersionedKeyValueStores.


2. I agree with Matthias's and Lucas's comments about being very

explicit about what the timestamp range means. For consistency with
single-key, single-timestamp lookups, I think the "upper timestamp bound"
should really be an "as of timestamp bound" instead, so that it is
inclusive. For the "lower timestamp bound"/start timestamp, we have a
choice regarding whether to interpret it as the user saying "I want valid
records for all timestamps in the range" in which case the query should
return a record with timestamp earlier than the start timestamp, or to
interpret it as the user saying "I want all records with timestamps in the
range" in which case the query should not return any records with timestamp
earlier than the start timestamp. My current preference is for the former,
but it'd be good to hear other opinions.


3. The existing VersionedRecord interface contains only a value and

validFrom timestamp, and does not allow null values. This presents a
problem for introducing single-key, multi-timestamp lookups because if
there is a tombstone contained within the timestamp range of the query,
then there is no way to represent this as part of a
ValueIterator return type. You'll either have to allow
null values or add a validTo timestamp to the r

Re: kafka streams consumer group reporting lag even on source topics removed from topology

2023-08-16 Thread Matthias J. Sax

Well, it's kinda expected behavior. It's a split brain problem.

In the end, you use the same `application.id / group.id` and thus the 
committed offsets for the removed topics are still in 
`__consumer_offsets` topics and associated with the consumer group.


If a tool inspects lags and compares the latest committed offsets to 
end-offsets it looks for everything it finds in the `__consumer_offsets` 
topics for the group in question -- the tool cannot know that you 
changed the application and that it does not read from those topics any 
longer (and thus does not commit any longer).


I am not sure off the top of my head if you could do a manual cleanup for 
the `application.id` and topics in question and delete the committed 
offsets from the `__consumer_offsets` topic -- try checking out the `Admin` 
client and/or the command line tools...


I know that it's possible to delete committed offsets for a consumer 
group (if a group becomes inactive, the broker would also clean up all 
group metadata after a configurable timeout), but I am not sure if 
that's for the entire consumer group (i.e., all topics) or if you can do it 
on a per-topic basis, too.
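
(For illustration, a minimal sketch of the Admin-client route; the broker address 
and topic names are made up, and `applicationV1` is the group id from this thread. 
The `kafka-consumer-groups.sh` tool also offers a `--delete-offsets` option for 
the same purpose.)

import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.TopicPartition;

public class DeleteRemovedTopicOffsets {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // delete the committed offsets of the removed topic's partitions only
            admin.deleteConsumerGroupOffsets(
                "applicationV1",
                Set.of(new TopicPartition("removed-topic", 0),
                       new TopicPartition("removed-topic", 1))
            ).all().get();
        }
    }
}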



HTH,
  -Matthias


On 8/16/23 2:11 AM, Pushkar Deole wrote:

Hi streams Dev community  @matthias, @bruno

Any inputs on above issue? Is this a bug in the streams library wherein the
input topic removed from streams processor topology, the underlying
consumer group still reporting lag against those?

On Wed, Aug 9, 2023 at 4:38 PM Pushkar Deole  wrote:


Hi All,

I have a streams application with 3 instances with application-id set to
applicationV1. The application uses processor API with reading from source
topics, processing the data and writing to destination topic.
Currently it consumes from 6 source topics however we don't need to
process data any more from 2 of those topics so we removed 2 topics from
the source topics list. We have configured Datadog dashboard to report and
alert on consumer lag so after removing the 2 source topics and deploying
application, we started getting several alerts about consumer lag on
applicationV1 consumer group which is underlying consumer group of the
streams application. When we looked at the consumer group from kafka-cli,
we could see that the consumer group is reporting lag against the topics
removed from source topic list which is reflecting as increasing lag on
Datadog monitoring.

Can someone advise if this is expected behavior? In my opinion, this is
not expected since streams application no more has those topics as part of
source, it should not report lag on those.





[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755298#comment-17755298
 ] 

Matthias J. Sax commented on KAFKA-15302:
-

I would need to think more about this. Overall, it seems a little bit hacky, 
and I am not sure if there is a clean fix.

For the time being, it might be best to just document the issue – the 
workaround seems to be to call `flush()` explicitly before creating the 
iterator. Because there is an existing workaround, I would rather not rush into 
a hacky fix.
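
A minimal sketch of that workaround inside the punctuator, reusing the names from 
the reporter's example below:

this.kvStore.flush();  // push dirty cache entries down into RocksDB first
try (final KeyValueIterator<String, Integer> iter = this.kvStore.all()) {
    while (iter.hasNext()) {
        final KeyValue<String, Integer> entry = iter.next();
        // entry.value is now consistent with this.kvStore.get(entry.key)
        this.context.forward(new Record<>(entry.key, entry.value, context.currentSystemTimeMs()));
    }
}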

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the Punctuation function of 
> this.context.schedule, the previous value is returned. In other words, even 
> though the value has been stored from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while 
> this doesn't occur 100% of the time, by adding logs, it's evident that during 
> the while loop after all() is called, the cache is flushed. As a result, the 
> named cache holds a null value, causing the return of a value from RocksDB. 
> This is observed as the value after the .get() call is different from the 
> expected value. This is possibly due to the consistent read functionality of 
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any errors.
>  
>  * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
>     System.err.println("forwardAll Start");
>     final KeyValueIterator<String, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         final KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg =
>             new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);
>         if (!entry.value.equals(storeValue)) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
>                 + " Expected in stored(Cache or Store) value: " + storeValue
>                 + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>  * log file (add log in stream source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
> dirtyKeys.size():7873 entries:7873
> 01:05:00.434 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.s.i.CachingKeyValueStore --  KeyValueIterator 
> all()
> 01:05:00.588 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.RocksDBStore --  RocksDB KeyValueIterator all
> 01:05:00.590 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67

[jira] [Commented] (KAFKA-15338) The metric group documentation for metrics added in KAFKA-13945 is incorrect

2023-08-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755184#comment-17755184
 ] 

Matthias J. Sax commented on KAFKA-15338:
-

Sure. Thanks a lot!

> The metric group documentation for metrics added in KAFKA-13945 is incorrect
> 
>
> Key: KAFKA-15338
> URL: https://issues.apache.org/jira/browse/KAFKA-15338
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1, 3.5.1
>Reporter: Neil Buesing
>Assignee: Atul Sharma
>Priority: Trivial
>  Labels: beginner, newbie
>
> ops.html (docs/streams/ops.html) incorrectly states that the metrics type is 
> "stream-processor-node-metrics", but in looking at the metrics and inspecting 
> the code in TopicMetrics, these metrics have a type of "stream-topic-metrics".
> 4 metrics are in error "bytes-consumed-total", "bytes-produced-total", 
> "records-consumed-total", and "records-produced-total".
> Looks like the type was changed from the KIP, and the documentation still 
> reflects the KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15346) Single-Key_single-timestamp IQs with versioned state stores

2023-08-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15346:

Component/s: streams

> Single-Key_single-timestamp IQs with versioned state stores
> ---
>
> Key: KAFKA-15346
> URL: https://issues.apache.org/jira/browse/KAFKA-15346
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>
> [KIP-960|https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just two query types:
> *Key Queries with single timestamp:*
>  # single-key latest-value lookup
>  # single-key lookup with timestamp (upper) bound



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15347) Single-Key_multi-timestamp IQs with versioned state stores

2023-08-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15347:

Component/s: streams

> Single-Key_multi-timestamp IQs with versioned state stores
> --
>
> Key: KAFKA-15347
> URL: https://issues.apache.org/jira/browse/KAFKA-15347
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>
> [KIP-968|https://cwiki.apache.org/confluence/display/KAFKA/KIP-968%3A+Support+single-key_multi-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just four query types:
> *Key Queries with multiple timestamps:*
>  # single-key query with upper bound timestamp
>  # single-key query with lower bound timestamp
>  # single-key query with timestamp range
>  # single-key all versions query



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15348) Range IQs with versioned state stores

2023-08-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15348:

Component/s: streams

> Range IQs with versioned state stores
> -
>
> Key: KAFKA-15348
> URL: https://issues.apache.org/jira/browse/KAFKA-15348
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>
> [KIP-969|https://cwiki.apache.org/confluence/display/KAFKA/KIP-969%3A+Support+range+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers all types of range queries:
> *Range Queries*
>  # key-range latest-value query
>  # key-range with lower bound latest-value query
>  # key-range with upper bound latest-value query
>  # all-keys (no bound) latest-value query
>  # key-range query with timestamp (upper) bound
>  # key-range with lower bound with timestamp (upper) bound 
>  # key-range with upper bound with timestamp (upper) bound
>  # all-keys (no bound) with timestamp (upper) bound
>  # key-range query with timestamp range
>  # key-range query with lower bound with timestamp range
>  # key-range query with upper bound with timestamp range
>  # all-keys (no bound) with timestamp range
>  # key-range query all-versions
>  # key-range query with lower bound all-versions
>  # key-range query with upper bound all-versions
>  # all-keys query (no bound) all-versions (entire store)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15344) Kafka Streams should include the message leader epoch when committing offsets

2023-08-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15344:

Component/s: streams

> Kafka Streams should include the message leader epoch when committing offsets
> -
>
> Key: KAFKA-15344
> URL: https://issues.apache.org/jira/browse/KAFKA-15344
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: David Mao
>Priority: Major
>
> We noticed an application received an OFFSET_OUT_OF_RANGE error following a 
> network partition and streams task rebalance and subsequently reset its 
> offsets to the beginning.
> Inspecting the logs, we saw multiple consumer log messages like: 
> {code:java}
> Setting offset for partition tp to the committed offset 
> FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
> {code}
> Inspecting the streams code, it looks like kafka streams calls `commitSync` 
> passing through an explicit OffsetAndMetadata object but does not populate 
> the offset leader epoch.
> The offset leader epoch is required in the offset commit to ensure that all 
> consumers in the consumer group have coherent metadata before fetching. 
> Otherwise after a consumer group rebalance, a consumer may fetch with a stale 
> leader epoch with respect to the committed offset and get an offset out of 
> range error from a zombie partition leader.
> The low-hanging fruit fix would be to have streams pass in the message epoch 
> for each commit. Another fix discussed with [~hachikuji] is to have the 
> consumer cache leader epoch ranges, similar to how the broker maintains a 
> leader epoch cache.
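
For illustration, a hedged sketch of what the "low-hanging fruit" fix amounts to at 
the plain consumer API level (topic, group, and types are placeholders; this is not 
the actual Streams patch):

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("example-topic"));
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // ... process the record ...
        consumer.commitSync(Map.of(
            new TopicPartition(record.topic(), record.partition()),
            // pass the record's leader epoch along with the offset instead of leaving it empty
            new OffsetAndMetadata(record.offset() + 1, record.leaderEpoch(), "")));
    }
}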



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-08-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15319:
---

Assignee: Lucas Brutschy

> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Assignee: Lucas Brutschy
>Priority: Critical
>
> Rocksdbjni < 7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12.
> Upgrade to zlib 1.2.13 to fix: 
> https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-08-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15319:

Fix Version/s: 3.6.0

> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Assignee: Lucas Brutschy
>Priority: Critical
> Fix For: 3.6.0
>
>
> Rocksdbjni < 7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12.
> Upgrade to zlib 1.2.13 to fix: 
> https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15338) The metric group documentation for metrics added in KAFKA-13945 is incorrect

2023-08-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15338:

Labels: beginner newbie  (was: )

> The metric group documentation for metrics added in KAFKA-13945 is incorrect
> 
>
> Key: KAFKA-15338
> URL: https://issues.apache.org/jira/browse/KAFKA-15338
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1, 3.5.1
>Reporter: Neil Buesing
>Priority: Trivial
>  Labels: beginner, newbie
>
> ops.html (docs/streams/ops.html) incorrectly states that the metrics type is 
> "stream-processor-node-metrics", but in looking at the metrics and inspecting 
> the code in TopicMetrics, these metrics have a type of "stream-topic-metrics".
> 4 metrics are in error "bytes-consumed-total", "bytes-produced-total", 
> "records-consumed-total", and "records-produced-total".
> Looks like the type was changed from the KIP, and the documentation still 
> reflects the KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15337) Disable *-subscription-store-changelog topic creation for foreign key join in Kafka Streams

2023-08-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15337:

Labels: need-kip  (was: )

> Disable *-subscription-store-changelog topic creation for foreign key join in 
> Kafka Streams
> ---
>
> Key: KAFKA-15337
> URL: https://issues.apache.org/jira/browse/KAFKA-15337
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Rohan
>Priority: Minor
>  Labels: need-kip
>
> I recently asked a question on 
> [stackoverflow|https://stackoverflow.com/questions/76801628/disable-subscription-store-changelog-topic-creation-for-foreign-key-join-in-ka]
>  where [~mjsax] commented to raise a ticket for this.
>  
> *Requirement:*
> When we do foreign key join in Kafka streams, it creates 3 state stores (as 
> given on the [confluent 
> page|https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/])
>  :
> {quote}The three state stores involved are:
>  # left-STATE-STORE-00: The left-hand-side KTable store.
>  # right-STATE-STORE-03: The right-hand-side KTable store.
>  # myjoin-subscription-store: The subscription store for the join. This is 
> the new data structure discussed above. Regardless of the operation name, 
> this store is always suffixed -subscription-store. {_}Note for your capacity 
> planning that this store will also have a corresponding changelog, called 
> *myjoin-subscription-store-changelog*{_}.{quote}
> Can we have an additional option to disable this 
> *myjoin-subscription-store-changelog* topic creation?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15337) Disable *-subscription-store-changelog topic creation for foreign key join in Kafka Streams

2023-08-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15337:

Component/s: streams

> Disable *-subscription-store-changelog topic creation for foreign key join in 
> Kafka Streams
> ---
>
> Key: KAFKA-15337
> URL: https://issues.apache.org/jira/browse/KAFKA-15337
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Rohan
>Priority: Minor
>
> I recently asked a question on 
> [stackoverflow|https://stackoverflow.com/questions/76801628/disable-subscription-store-changelog-topic-creation-for-foreign-key-join-in-ka]
>  where [~mjsax] commented to raise a ticket for this.
>  
> *Requirement:*
> When we do foreign key join in Kafka streams, it creates 3 state stores (as 
> given on the [confluent 
> page|https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/])
>  :
> {quote}The three state stores involved are:
>  # left-STATE-STORE-00: The left-hand-side KTable store.
>  # right-STATE-STORE-03: The right-hand-side KTable store.
>  # myjoin-subscription-store: The subscription store for the join. This is 
> the new data structure discussed above. Regardless of the operation name, 
> this store is always suffixed -subscription-store. {_}Note for your capacity 
> planning that this store will also have a corresponding changelog, called 
> *myjoin-subscription-store-changelog*{_}.{quote}
> Can we have an additional option to disable this 
> *myjoin-subscription-store-changelog* topic creation?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13197) KStream-GlobalKTable join semantics don't match documentation

2023-08-11 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-13197.
-
Fix Version/s: 3.6.0
   3.5.2
   Resolution: Fixed

> KStream-GlobalKTable join semantics don't match documentation
> -
>
> Key: KAFKA-13197
> URL: https://issues.apache.org/jira/browse/KAFKA-13197
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 2.7.0
>Reporter: Tommy Becker
>Assignee: Florin Akermann
>Priority: Major
> Fix For: 3.6.0, 3.5.2
>
>
> As part of KAFKA-10277, the behavior of KStream-GlobalKTable joins was 
> changed. It appears the change was intended to merely relax a requirement but 
> it actually broke backwards compatibility. Although it does allow {{null}} 
> keys and values in the KStream to be joined, it now excludes {{null}} results 
> of the {{KeyValueMapper}}. We have an application which can return {{null}} 
> from the {{KeyValueMapper}} for non-null keys in the KStream, and relies on 
> these nulls being passed to the {{ValueJoiner}}. Indeed the javadoc still 
> explicitly says this is done:
> {quote}If a KStream input record key or value is null the record will not be 
> included in the join operation and thus no output record will be added to the 
> resulting KStream.
>  If keyValueMapper returns null implying no match exists, a null value will 
> be provided to ValueJoiner.
> {quote}
> Both these statements are incorrect.
> I think the new behavior is worse than the previous/documented behavior. It 
> feels more reasonable to have a non-null stream record map to a null join key 
> (our use-case is event-enhancement where the incoming record doesn't have the 
> join field), than the reverse.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-955: Add stream-table join on foreign key

2023-08-10 Thread Matthias J. Sax
d in the table if we cannot get rid of the fat record in 
any case?)





-Matthias



On 8/10/23 12:09 PM, Igor Fomenko wrote:

I don't mind you being a bit picky. I think it is a great discussion and it
helps me too. For example I clearly see now that the problem of aggregation
still needs to be solved for this use case.
Please see my answers below.

I used an example of OrderEvents to OrderItems relationship as 1:1 just to
demonstrate that even in that simple case the existing table-table join on
FK will not work. However the use case I have in general may have 1:1, 1:0,
or 1:n relations. One complex business entity I had to deal with called
"transportation waybill" that has 25 child tables. Number of child records
in each child table could be 0:n for each record in the main waybill table.
When an event is generated for a certain waybill then a "complete" waybill
needs to be assembled from the subset of child tables. The subset of child
tables for waybill data assembly depends on the event type (any event type
has waybillId). There is also some additional filtering, mapping, and
enrichment that needs to be done in a real use case that is not relevant to
what we discuss. As you can see the use case is very complex and this is
why I wanted to distill it to very simple terms that are relevant to this
discussion.

Now I am switching back to the simple example of OrderEvent with OrderItems.
Please note that OrderEvent is a derived message. It is derived by joining
the actual event message that has orderId as its key with the Order message
that also has OrderId as its key. Because joining these two messages is
trivial I did not include this part and stated that we are starting from
the Order Event message right away.
So to summarize: We need to join each OrderEvent message (OrderId is key)
with 0 or 1 or many orderItems messages (OrderItem is the key and orderID
is one of the message fields).

Now, let's consider your solution:
1. We do not need to aggregate orderEvents around the key since we need to
provide an output for each orderEvent (each orderEvent needs to be joined
with an aggregate of OrderItems related to this orderEvent). So we can skip
this step.
2. Because OrderItems are multiple distinct records for each OrderId we can
not rekey them with OrderId PK to the table, unless we do some sort of
aggregation for them. So let's say we rekey orderItems with orderId and
aggregate each record field into an array. I think we also need to
co-partition with OrderEvents.
3. Now we can do stream-table join the orderEvents stream with the
OrderItemsAggregated table using the OrderId key that is common for both.
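
For illustration, a rough DSL sketch of steps 2 and 3 (topic names, the "orderId:item"
value encoding, and the serdes are made-up simplifications; order-events and the
repartitioned order-items must be co-partitioned on orderId):

import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();

// key = orderItemId, value = "orderId:itemName" (simplified encoding)
KTable<String, String> orderItems =
    builder.table("order-items", Consumed.with(Serdes.String(), Serdes.String()));

// step 2: rekey by orderId and aggregate all item names of an order into one value
KTable<String, String> itemsByOrder = orderItems
    .groupBy((itemId, v) -> KeyValue.pair(v.split(":")[0], v.split(":")[1]),
             Grouped.with(Serdes.String(), Serdes.String()))
    .aggregate(
        () -> "",
        (orderId, item, agg) -> agg.isEmpty() ? item : agg + "," + item,  // adder
        (orderId, item, agg) -> String.join(",",                          // subtractor
            Arrays.stream(agg.split(",")).filter(i -> !i.equals(item)).toArray(String[]::new)),
        Materialized.with(Serdes.String(), Serdes.String()));

// step 3: orderEvents are keyed by orderId, so a regular stream-table left join works
KStream<String, String> orderEvents =
    builder.stream("order-events", Consumed.with(Serdes.String(), Serdes.String()));

orderEvents
    .leftJoin(itemsByOrder, (event, items) -> event + " -> [" + (items == null ? "" : items) + "]")
    .to("enriched-orders", Produced.with(Serdes.String(), Serdes.String()));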

So the conclusion is that your solution will work with some tweaking
(basically aggregating on OrderItems instead of on events).
While this solution will work it has several issues as follows:

- This solution was considered when in KIP-213 for the existing
table-table FK join. There is a discussion on disadvantages of using this
approach in the article related to KIP-213 and I think the same
disadvantages will apply to this KIP. Please see here:

https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/#workaround

- I see FK join as the common operation in data manipulation so it would
be nice to have a shortcut for it and not to try to design it from existing
functionality all the time. Consider the real use case I discussed at the
beginning when a business entity has 25 children
   - This solution, similarly to mine, is "muddying the water" by providing a
hybrid join + aggregate outcome. At least with my proposal we could
potentially control it with a flag, or maybe create some special
aggregate that could be chained after (don't know how to do it yet :-))

Any thoughts?

Regards,

Igor

On Wed, Aug 9, 2023 at 7:19 PM Matthias J. Sax  wrote:


Thanks for the details. And sorry for being a little bit picky. My goal
is to really understand the use-case and the need for this KIP. It's a
massive change and I just want to ensure we don't add (complex) things
unnecessarily.


So you have a streams of "orderEvents" with key=orderId. You cannot
represent them as a KTable, because `orderId` is not a PK, but just an
identify that a message belongs to a certain order. This part I understand.

You also have a KTable "orderItems", with orderId as a value-field.




  Relationship between parent and child messages is 1:1


If I understand correctly, you want to join on orderId. If the join is
1:1, it means that there is only a single table-record for each unique
orderId. Thus, orderId could be the PK of the table. If that's correct,
you could use orderId as the key of "orderItems" and do a regular
stream-table join. -- Or do I miss something?




and to send it only once to the target system as one ‘complete order >

message for each new ‘order event’ message.

This sound like an agg

Re: [VOTE] KIP-962 Relax non-null key requirement in Kafka Streams

2023-08-10 Thread Matthias J. Sax

+1 (binding)

On 8/10/23 12:31 PM, Florin Akermann wrote:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams



[jira] [Updated] (KAFKA-13197) KStream-GlobalKTable join semantics don't match documentation

2023-08-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13197:

Component/s: documentation
 streams

> KStream-GlobalKTable join semantics don't match documentation
> -
>
> Key: KAFKA-13197
> URL: https://issues.apache.org/jira/browse/KAFKA-13197
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 2.7.0
>Reporter: Tommy Becker
>Assignee: Florin Akermann
>Priority: Major
>
> As part of KAFKA-10277, the behavior of KStream-GlobalKTable joins was 
> changed. It appears the change was intended to merely relax a requirement but 
> it actually broke backwards compatibility. Although it does allow {{null}} 
> keys and values in the KStream to be joined, it now excludes {{null}} results 
> of the {{KeyValueMapper}}. We have an application which can return {{null}} 
> from the {{KeyValueMapper}} for non-null keys in the KStream, and relies on 
> these nulls being passed to the {{ValueJoiner}}. Indeed the javadoc still 
> explicitly says this is done:
> {quote}If a KStream input record key or value is null the record will not be 
> included in the join operation and thus no output record will be added to the 
> resulting KStream.
>  If keyValueMapper returns null implying no match exists, a null value will 
> be provided to ValueJoiner.
> {quote}
> Both these statements are incorrect.
> I think the new behavior is worse than the previous/documented behavior. It 
> feels more reasonable to have a non-null stream record map to a null join key 
> (our use-case is event-enhancement where the incoming record doesn't have the 
> join field), than the reverse.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-962 Relax non-null key requirement in Kafka Streams

2023-08-10 Thread Matthias J. Sax

Good catch about the JavaDocs.

Seems we missed updating them when we did K10277. Would you like to do 
a PR to fix them right away for the upcoming 3.6 release?


If there are no other comments, I think you can start a VOTE thread.


-Matthias

On 8/10/23 4:22 AM, Florin Akermann wrote:

Thank you for the feedback.


Not sure if this is the right phrasing?


You are right. I adjusted the phrasing accordingly.
Given your description of the current behavior, do I understand correctly
that the current documentation for the left join KStream-GlobalKtable is
out of date?
https://github.com/apache/kafka/blob/9318b591d7a57b9db1e7519986d78f0402cd5b5e/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java#L2948C7-L2948C7


I would remove this from the KIP


I agree,Removed.
Plus, relevant doc links added.


I think the way it's phrased is good. [...] We can cover details on the

PR.

Great. Yes, in general, I hope everybody agrees that we shouldn't add more
details to this KIP


On Thu, 10 Aug 2023 at 00:16, Matthias J. Sax  wrote:


Thanks for the KIP.


left join KStream-GlobalTable: no longer drop left records with null-key

and call KeyValueMapper with 'null' for left  key. The case where
KeyValueMapper returns null is already handled in the current
implementation.

Not sure if this is the right phrasing? In the end, even now, the stream
input record key can be null (cf
https://issues.apache.org/jira/browse/KAFKA-10277) -- a stream record is
only dropped if the `KeyValueMapper` returns `null` (note that the
key-extractor has no default implementation but is a required argument)
-- this KIP would relax this case for left-join.
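
For illustration, a rough sketch of the join shape being discussed (topic names,
types, and the mapper/joiner are made up):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> stream =
    builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));
GlobalKTable<String, String> table =
    builder.globalTable("lookup", Consumed.with(Serdes.String(), Serdes.String()));

KStream<String, String> joined = stream.leftJoin(
    table,
    // the required key extractor; today the record is dropped if this returns null --
    // the KIP would relax exactly that case for the left join
    (key, value) -> key,
    // for a left join the ValueJoiner is called with a null table value if there is no match
    (value, tableValue) -> value + "->" + tableValue);

joined.to("joined-events");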



In the pull request all relevant Javadocs will be updated with the

information on how to keep the old behavior for a given operator / method.

I would remove this from the KIP -- I am also not sure if we should put
it into the JavaDoc? -- I agree that it should go into the upgrade docs
as well as "join section" in the docs:

https://kafka.apache.org/35/documentation/streams/developer-guide/dsl-api.html#joining

We also have

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics
that we should update.



I added a remark about the repartition of null-key records.


I think the way it's phrased is good. In the end, it's an optimization to
drop records upstream (instead of piping them through the topic and dropping
them downstream), and thus we don't have to cover it in the KIP in
detail. In general, for aggregations we can still apply the
optimization, however, we need to be careful as we could also have two
downstream operators with a shared repartition topic: for this case, we
can only drop upstream if all downstream operators would drop null-key
records anyway. We can cover details on the PR.



-Matthias



On 8/9/23 5:39 AM, Florin Akermann wrote:

Hi All,

I added a remark about the repartition of null-key records.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams#KIP962:RelaxnonnullkeyrequirementinKafkaStreams-Repartitionofnull-keyrecords


Can we relax this constraint for any kind of repartitioning or should it
only be relaxed in the context of left stream-table and left/outer
stream-stream joins?

Florin

On Mon, 7 Aug 2023 at 13:23, Florin Akermann 
wrote:


Hi Lucas,

Thanks. I added the point about the upgrade guide as well.

Florin

On Mon, 7 Aug 2023 at 11:06, Lucas Brutschy 
.invalid>

wrote:


Hi Florin,

thanks for the KIP! This looks good to me. I agree that the precise
Java doc wording doesn't have to be discussed as part of the KIP.

I would also suggest to include an update to
https://kafka.apache.org/documentation/streams/upgrade-guide

Cheers,
Lucas

On Mon, Aug 7, 2023 at 10:51 AM Florin Akermann
 wrote:


Hi Both,

Thanks.
I added remarks to account for this.




https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams#KIP962:RelaxnonnullkeyrequirementinKafkaStreams-Remarks


In short, let's add a note in the Java docs? The exact wording of the

note

can be scrutinized in the pull request?

What do you think?


On Sun, 6 Aug 2023 at 19:41, Guozhang Wang <

guozhang.wang...@gmail.com>

wrote:


I'm just thinking we can try to encourage users to migrate from XX to
XXWithKey in the docs, giving this as one good example that the

latter

can help you distinguish different scenarios whereas the former
cannot.

On Fri, Aug 4, 2023 at 6:32 PM Matthias J. Sax 

wrote:


Guozhang,

thanks for pointing out ValueJoinerWithKey. In the end, it's just a
documentation change, ie, point out that the passed in key could be
`null` and similar?

-Matthias


On 8/2/23 3:20 PM, Guozhang Wang wrote:

Thanks Florin for the writeup,

One quick thing I'd like to bring up is that in KIP-149
(





https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and

Re: [DISCUSS] KIP-960: Support interactive queries (IQv2) for versioned state stores

2023-08-09 Thread Matthias J. Sax
cas

On Thu, Jul 27, 2023 at 3:07 PM Alieh Saeedi
 wrote:


Thanks, Bruno, for the feedback.


 - I agree with both points 2 and 3. About 3: Having "VersionsQualifier"
 reduces the number of methods and makes everything less confusing. At the
 end, that will be easier to use for the developers.
 - About point 4: I renamed all the properties and parameters from
 "asOfTimestamp" to "fromTimestamp". That was my misunderstanding. So Now we
 have these two timestamp bounds: "fromTimestamp" and "untilTimestamp".
 - About point 5: Do we need system tests here? I assumed just
 integration tests were enough.
 - Regarding long vs timestamp instance: I think yes, that's why I used
 Long as timestamp.

Bests,
Alieh






On Thu, Jul 27, 2023 at 2:28 PM Bruno Cadonna  wrote:


Hi Alieh,

Thanks for the KIP!


Here is my feedback.

1.
You can remove the private fields and constructors from the KIP. Those
are implementation details.


2.
Some proposals for renamings

in VersionedKeyQuery

withKeyWithTimestampBound()
 -> withKeyAndAsOf()

withKeyWithTimestampRange()
 -> withKeyAndTimeRange()

in VersionedRangeQuery

KeyRangeWithTimestampBound()
 -> withKeyRangeAndAsOf()

withLowerBoundWithTimestampBound()
 -> withLowerBoundAndAsOf()

withUpperBoundWithTimestampBound()
 -> withUpperBoundAndAsOf()

withNoBoundWithTimestampBound()
 -> withNoBoundsAndAsOf

keyRangeWithTimestampRange()
 -> withKeyRangeAndTimeRange()

withLowerBoundWithTimestampRange()
 -> withLowerBoundAndTimeRange()

withUpperBoundWithTimestampRange()
 -> withUpperBounfAndTimeRange()

withNoBoundWithTimestampRange()
 -> withNoBoundsAndTimeRange()


3.
Would it make sense to merge
withKeyLatestValue(final K key)
and
withKeyAllVersions(final K key)
into
withKey(final K key, final VersionsQualifier versionsQualifier)
where VersionsQualifier is an enum with values (ALL, LATEST). We could
also add EARLIEST if we feel it might be useful.
The same applies to all methods that end in LatestValue or AllVersions
(see the sketch after this list).


4.
I think getAsOfTimestamp() should not return the lower bound. If I query
a version as of a timestamp then the query should return the latest
version less than the timestamp.
I propose to rename the getters to getTimeFrom() and getTimeTo() as in
WindowRangeQuery.


5.
Please add the Test Plan section.
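(A purely hypothetical sketch of the shape point 3 above suggests, just to make the idea concrete — class, enum, and method names are placeholders for discussion, not an existing or agreed-upon API.)

public final class VersionedKeyQuerySketch<K, V> {

    // placeholder enum, as suggested in point 3
    public enum VersionsQualifier { ALL, LATEST, EARLIEST }

    private final K key;
    private final VersionsQualifier versionsQualifier;

    private VersionedKeyQuerySketch(final K key, final VersionsQualifier versionsQualifier) {
        this.key = key;
        this.versionsQualifier = versionsQualifier;
    }

    // one factory replaces withKeyLatestValue(key) and withKeyAllVersions(key)
    public static <K, V> VersionedKeyQuerySketch<K, V> withKey(final K key,
                                                               final VersionsQualifier qualifier) {
        return new VersionedKeyQuerySketch<>(key, qualifier);
    }

    public K key() {
        return key;
    }

    public VersionsQualifier versionsQualifier() {
        return versionsQualifier;
    }
}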


Regarding long vs Instant: Did we miss to use Instant instead of long
for all interfaces of the versioned state stores?


Best,
Bruno








On 7/26/23 11:40 PM, Matthias J. Sax wrote:

Thanks for the KIP Alieh. Glad to see that we can add IQ to the new
versioned stores!



Couple of questions:


single-key lookup with timestamp (upper) bound


Not sure if "bound" is the right term? In the end, it's a point lookup
for a key plus timestamps, so it's an as-of timestamp (not a bound)? Of
course, the returned record would most likely have a different (smaller)
timestamp, but that's expected but does not make the passed in timestamp
a "bound" IMHO?


single-key query with timestamp range
single-key all versions query


Should we also add `withLowerTimeBound` and `withUpperTimeBound`
(similar to what `RangeQuery` has)?

Btw: I think we should not pass `long` for timestamps, but `Instant`
types.

For time-range queries, do we iterate over the values in timestamp
ascending order? If yes, the interface should specify it? Also, would it
make sense to add reverse order (also ok to exclude and only do if there
is demand in a follow up KIP; if not, please add to "Rejected
alternatives" section).

Also, for time-range queries, what are the exact bounds for what we
include? In the end, a value has a "valid range" (conceptually), so do
we include a record if its valid range overlaps the search time-range,
or must it be fully included? Or would we only say that the `validFrom`
timestamp that is stored must be in the search range (which implies that
the lower end would be a non-overlapping but "fully included" bound,
while the upper end would be an overlapping bound)?

For key-range / time-range queries: do we return the result in key order
or in timestamp order? Also, what about reverse iterators?

About `ValueIterator` -- I think the JavaDocs have a copy-paste error in it for
`peekNextRecord` (also, should it be called `peekNextValue`?). (Also, some
other JavaDocs seem to be incomplete and do not describe all parameters?)


Thanks.



-Matthias



On 7/26/23 7:24 AM, Alieh Saeedi wrote:

Hi all,

I would like to propose a KIP to support IQv2 for versioned state

stores.




https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+interactive+queries+%28IQv2%29+for+versioned+state+stores


Looking forward to your feedback!

Cheers,
Alieh





Re: [DISCUSS] KIP-955: Add stream-table join on foreign key

2023-08-09 Thread Matthias J. Sax
 the “order items” table on the right
side of the join to generate an output since only events should be the
trigger for output messages in our scenario. This is aligned with the
stream-table join behavior rather than the table-table join, where updates are
coming from both sides

3. Stream-table join will give us a resulting stream, which is more aligned
with our output requirements than the table that would be the result of a
table-table join



Requirement #2 above is the most important one and it cannot be achieved
with the existing table-table join on foreign key.



I also stated that the foreign key table in the table-table join is on the
‘wrong’ side for our order management use case. By this I just meant that
in the stream-table join I am proposing, the foreign key table needs to be on
the right side, while in the existing table-table join it is on the left. This
however is irrelevant, since we cannot use the table-table join anyway for
reason #2 above.



You made a good point about aggregation of child messages for the more
complex use case of a 1:n relation between parent and children. Initially I
was thinking that aggregation would be just a separate operation that could
be added after we performed a foreign key join. Now I realize that it will
not be possible to do it afterwards.

Maybe there could be a flag to the stream-table foreign key join that would
indicate whether we want this join to aggregate children or not?



What do you think?

Regards,



Igor


On Fri, Aug 4, 2023 at 10:01 PM Matthias J. Sax  wrote:


Thanks a lot for providing more background. It's getting much clearer to
me now.

Couple of follow up questions:



It is not possible to use table-table join in this case because

triggering

events are supplied separately from the actual data entity that needs to

be

"assembled" and these events could only be presented as KStream due to
their nature.


Not sure if I understand this part? Why can those events not be
represented as a KTable? You say "could only be presented as KStream due
to their nature" -- what do you mean by this?

In the end, my understanding is the following (using the example for the
KIP):

For the shipments <-> orders and order-details <-> orders join, shipment
and order-details are the fact tables, which is "reverse" to what you
want? Using the existing FK join, it would mean you get two enriched tables
that you cannot join to each other any further (because we don't support
an n:m join): in the end, shipmentId+orderDetailId would be the PK of such
an n:m join?

If that's correct (just to make sure I understand
correctly): if we would add an n:m join, you could join shipment <->
order-details first, and use a FK join to enrich the result with orders.
-- In addition, you could also do a FK join to events if you represent
events as a table (this relates to my question from above, why events
cannot be represented as a KTable).


As for the KIP itself, I am still wondering about details: if we get an event
in, and we do a lookup into the "FK table" and find multiple matches,
would we emit multiple results? This would kinda defeat the purpose of
re-assembling everything into a single entity? (And it might require an
additional aggregation downstream to put the entity together.) -- Or
would we join the single event with all found table rows, and emit a
single "enriched" event?


Thus, I am actually wondering if you could not pre-process both the
shipment and order-details tables via `groupBy(orderId)` and assemble a
list (or similar) of all shipments (or order-details) per order? If you
do this pre-processing, you can do a PK-PK (1:1) join with the orders
table, and also do a stream-table join to enrich your events with the
full order information?
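(Roughly, and only as a sketch: all topic names and value types below are placeholders, a serde for the aggregated list is omitted, and how the orderId is derived from a shipment is application-specific.)

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public final class OrderAssemblySketch {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // default serdes assumed to be configured; a serde for List<String> would be
        // needed for the aggregate below and is omitted here
        final KTable<String, String> orders = builder.table("orders");          // key: orderId
        final KTable<String, String> shipments = builder.table("shipments");    // key: shipmentId, value carries orderId
        final KStream<String, String> events = builder.stream("order-events");  // key: orderId

        // 1) re-key shipments by orderId and collect all shipments per order
        final KTable<String, List<String>> shipmentsPerOrder = shipments
            .groupBy((shipmentId, shipment) -> KeyValue.pair(orderIdOf(shipment), shipment))
            .aggregate(
                ArrayList::new,
                (orderId, shipment, list) -> { list.add(shipment); return list; },
                (orderId, shipment, list) -> { list.remove(shipment); return list; });

        // 2) PK-PK (1:1) join with the orders table
        final KTable<String, String> ordersWithShipments =
            orders.join(shipmentsPerOrder, (order, list) -> order + " " + list);

        // 3) stream-table join to enrich each event with the assembled order
        events.join(ordersWithShipments, (event, order) -> event + " -> " + order)
              .to("enriched-order-events");

        builder.build();
    }

    // placeholder: how the orderId is derived from a shipment value is application-specific
    private static String orderIdOf(final String shipment) {
        return shipment.split(":")[0];
    }
}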



-Matthias

On 7/26/23 7:13 AM, Igor Fomenko wrote:

Hello Matthias,

Thank you for this response. It provides the context for a good
discussion related to the need for this new interface.

The use case I have in mind is not really a stream enrichment, which usually
implies that the event has a primary key to some external info and that
external info could be just looked up in some other data source.

The pattern this KIP proposes is more akin to the data entity assembly
pattern from the persistence layer, so it is not purely an integration
pattern but rather a pattern that enables an event stream from the
persistence layer of a data source application. The main driver here is the
ability to stream a data entity of any complexity (complexity in terms of
the relational model) from an application database to some data consumers.
The technical precondition here is of course that data is already extracted
from the relational database with something like Change Data Capture (CDC)
and placed into Kafka topics. Also, due to CDC limitations, each database
table that is related to the entity relational data model is extracted to a
separate Kafka topic.

So to answer your first question, the entity that n

Re: [DISCUSS] KIP-962 Relax non-null key requirement in Kafka Streams

2023-08-09 Thread Matthias J. Sax

Thanks for the KIP.


left join KStream-GlobalTable: no longer drop left records with null-key and 
call KeyValueMapper with 'null' for left  key. The case where KeyValueMapper 
returns null is already handled in the current implementation.


Not sure if this is the right phrasing? In the end, even now, the stream 
input record key can be null (cf 
https://issues.apache.org/jira/browse/KAFKA-10277) -- a stream record is 
only dropped if the `KeyValueMapper` returns `null` (note that the 
key-extractor has no default implementation but is a required argument) 
-- this KIP would relax this case for left-join.
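(For reference, a sketch of the operation in question — topic names and the extraction logic are made up. Today a `null` return from the key-extractor drops the stream record; under this KIP the left join would instead call the joiner with a `null` table-side value.)

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public final class StreamGlobalTableLeftJoinSketch {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, String> orders = builder.stream("orders");
        final GlobalKTable<String, String> customers = builder.globalTable("customers");

        orders
            .leftJoin(
                customers,
                // key extractor: may return null if the order has no customer reference;
                // under the proposed relaxation such records are kept and joined with null
                (orderId, order) -> extractCustomerId(order),
                (order, customer) -> order + " @ " + customer)
            .to("orders-with-customer");

        builder.build();
    }

    // placeholder for application-specific extraction logic
    private static String extractCustomerId(final String order) {
        return order.contains("|") ? order.split("\\|")[0] : null;
    }
}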




In the pull request all relevant Javadocs will be updated with the information 
on how to keep the old behavior for a given operator / method.


I would remove this from the KIP -- I am also not sure if we should put 
it into the JavaDoc? -- I agree that it should go into the upgrade docs 
as well as "join section" in the docs: 
https://kafka.apache.org/35/documentation/streams/developer-guide/dsl-api.html#joining


We also have 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics 
that we should update.




I added a remark about the repartition of null-key records.


I think the way it's phrased is good. In the end, it's an optimization to 
drop records upstream (instead of piping them through the topic and 
dropping them downstream), and thus we don't have to cover it in the KIP in 
detail. In general, for aggregations we can still apply the 
optimization; however, we need to be careful as we could also have two 
downstream operators with a shared repartition topic: for this case, we 
can only drop upstream if all downstream operators would drop null-key 
records anyway. We can cover details on the PR.




-Matthias



On 8/9/23 5:39 AM, Florin Akermann wrote:

Hi All,

I added a remark about the repartition of null-key records.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams#KIP962:RelaxnonnullkeyrequirementinKafkaStreams-Repartitionofnull-keyrecords

Can we relax this constraint for any kind of repartitioning or should it
only be relaxed in the context of left stream-table and left/outer
stream-stream joins?

Florin

On Mon, 7 Aug 2023 at 13:23, Florin Akermann 
wrote:


Hi Lucas,

Thanks. I added the point about the upgrade guide as well.

Florin

On Mon, 7 Aug 2023 at 11:06, Lucas Brutschy 
wrote:


Hi Florin,

thanks for the KIP! This looks good to me. I agree that the precise
Java doc wording doesn't have to be discussed as part of the KIP.

I would also suggest to include an update to
https://kafka.apache.org/documentation/streams/upgrade-guide

Cheers,
Lucas

On Mon, Aug 7, 2023 at 10:51 AM Florin Akermann
 wrote:


Hi Both,

Thanks.
I added remarks to account for this.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams#KIP962:RelaxnonnullkeyrequirementinKafkaStreams-Remarks


In short, let's add a note in the Java docs? The exact wording of the

note

can be scrutinized in the pull request?

What do you think?


On Sun, 6 Aug 2023 at 19:41, Guozhang Wang 
wrote:


I'm just thinking we can try to encourage users to migrate from XX to
XXWithKey in the docs, giving this as one good example that the latter
can help you distinguish different scenarios whereas the former
cannot.

On Fri, Aug 4, 2023 at 6:32 PM Matthias J. Sax 

wrote:


Guozhang,

thanks for pointing out ValueJoinerWithKey. In the end, it's just a
documentation change, ie, point out that the passed in key could be
`null` and similar?

-Matthias


On 8/2/23 3:20 PM, Guozhang Wang wrote:

Thanks Florin for the writeup,

One quick thing I'd like to bring up is that in KIP-149
(



https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner

)

we introduced ValueJoinerWithKey which is aimed to enhance
ValueJoiner. It would have a benefit for this KIP such that
implementers can distinguish "null-key" v.s. "not-null-key but
null-value" scenarios.

Hence I'd suggest we also include the semantic changes with
ValueJoinerWithKey, which can help distinguish these two

scenarios,

and also document that if users apply ValueJoiner only, they may

not

have this benefit, and hence we suggest users to use the former.


Guozhang

On Mon, Jul 31, 2023 at 12:11 PM Florin Akermann
 wrote:






https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams










[jira] [Commented] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join

2023-08-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17752522#comment-17752522
 ] 

Matthias J. Sax commented on KAFKA-14049:
-

I am still not sure if I understand this ticket? – Or is it a duplicate of 
https://issues.apache.org/jira/browse/KAFKA-12317 ?

> Relax Non Null Requirement for KStreamGlobalKTable Left Join
> 
>
> Key: KAFKA-14049
> URL: https://issues.apache.org/jira/browse/KAFKA-14049
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Saumya Gupta
>Assignee: Florin Akermann
>Priority: Major
>
> Null Values in the Stream for a Left Join would indicate a Tombstone Message 
> that needs to be propagated if not actually joined with the GlobalKTable 
> message, hence these messages should not be ignored.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14049:

Labels:   (was: kip)

> Relax Non Null Requirement for KStreamGlobalKTable Left Join
> 
>
> Key: KAFKA-14049
> URL: https://issues.apache.org/jira/browse/KAFKA-14049
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Saumya Gupta
>Assignee: Florin Akermann
>Priority: Major
>
> Null Values in the Stream for a Left Join would indicate a Tombstone Message 
> that needs to be propagated if not actually joined with the GlobalKTable 
> message, hence these messages should not be ignored.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14049:

Labels: kip  (was: beginner newbie)

> Relax Non Null Requirement for KStreamGlobalKTable Left Join
> 
>
> Key: KAFKA-14049
> URL: https://issues.apache.org/jira/browse/KAFKA-14049
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Saumya Gupta
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
>
> Null Values in the Stream for a Left Join would indicate a Tombstone Message 
> that needs to be propagated if not actually joined with the GlobalKTable 
> message, hence these messages should not be ignored.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14748) Relax non-null FK left-join requirement

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14748:

Description: 
Kafka Streams enforces a strict non-null-key policy in the DSL across all 
key-dependent operations (like aggregations and joins).

This also applies to FK-joins, in particular to the ForeignKeyExtractor. If it 
returns `null`, it's treated as invalid. For left-joins, it might make sense to 
still accept a `null`, and add the left-hand record with an empty 
right-hand-side to the result.

KIP-962: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams]
 

  was:
Kafka Streams enforces a strict non-null-key policy in the DSL across all 
key-dependent operations (like aggregations and joins).

This also applies to FK-joins, in particular to the ForeignKeyExtractor. If it 
returns `null`, it's treated as invalid. For left-joins, it might make sense to 
still accept a `null`, and add the left-hand record with an empty 
right-hand-side to the result.


> Relax non-null FK left-join requirement
> ---
>
> Key: KAFKA-14748
> URL: https://issues.apache.org/jira/browse/KAFKA-14748
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
>
> Kafka Streams enforces a strict non-null-key policy in the DSL across all 
> key-dependent operations (like aggregations and joins).
> This also applies to FK-joins, in particular to the ForeignKeyExtractor. If 
> it returns `null`, it's treated as invalid. For left-joins, it might make 
> sense to still accept a `null`, and add the left-hand record with an empty 
> right-hand-side to the result.
> KIP-962: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14748) Relax non-null FK left-join requirement

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14748:

Labels: kip  (was: )

> Relax non-null FK left-join requirement
> ---
>
> Key: KAFKA-14748
> URL: https://issues.apache.org/jira/browse/KAFKA-14748
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
>
> Kafka Streams enforces a strict non-null-key policy in the DSL across all 
> key-dependent operations (like aggregations and joins).
> This also applies to FK-joins, in particular to the ForeignKeyExtractor. If 
> it returns `null`, it's treated as invalid. For left-joins, it might make 
> sense to still accept a `null`, and add the left-hand record with an empty 
> right-hand-side to the result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12317:

Description: 
Currently, for a stream-stream and stream-table/globalTable join KafkaStreams 
drops all stream records with a `null`-key (`null`-join-key for 
stream-globalTable), because for a `null`-(join)key the join is undefined: 
ie, we don't have an attribute to do the table lookup (we consider the 
stream-record as malformed). Note that we define the semantics of _left/outer_ 
join as: keep the stream record if no matching join record was found.

We could relax the definition of _left_ stream-table/globalTable and 
_left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
records, and call the ValueJoiner with a `null` "other-side" value instead: if 
the stream record key (or join-key) is `null`, we could treat it as a "failed 
lookup" instead of treating the stream record as corrupted.

If we make this change, users that want to keep the current behavior, can add a 
`filter()` before the join to drop `null`-(join)key records from the stream 
explicitly.

Note that this change also requires changing the behavior if we insert a 
repartition topic before the join: currently, we drop `null`-key records before 
writing into the repartition topic (as we know they would be dropped later 
anyway). We need to relax this behavior for a left stream-table and left/outer 
stream-stream join. Users need to be aware (ie, we might need to put this into 
the docs and JavaDocs) that records with a `null`-key would be partitioned 
randomly.

KIP-962: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams]
 

  was:
Currently, for a stream-streams and stream-table/globalTable join KafkaStreams 
drops all stream records with a `null`-key (`null`-join-key for 
stream-globalTable), because for a `null`-(join)key the join is undefined: ie, 
we don't have an attribute the do the table lookup (we consider the 
stream-record as malformed). Note, that we define the semantics of _left/outer_ 
join as: keep the stream record if no matching join record was found.

We could relax the definition of _left_ stream-table/globalTable and 
_left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
records, and call the ValueJoiner with a `null` "other-side" value instead: if 
the stream record key (or join-key) is `null`, we could treat is as "failed 
lookup" instead of treating the stream record as corrupted.

If we make this change, users that want to keep the current behavior, can add a 
`filter()` before the join to drop `null`-(join)key records from the stream 
explicitly.

Note that this change also requires to change the behavior if we insert a 
repartition topic before the join: currently, we drop `null`-key record before 
writing into the repartition topic (as we know they would be dropped later 
anyway). We need to relax this behavior for a left stream-table and left/outer 
stream-stream join. User need to be aware (ie, we might need to put this into 
the docs and JavaDocs), that records with `null`-key would be partitioned 
randomly.


> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>      Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
>
> Currently, for a stream-stream and stream-table/globalTable join 
> KafkaStreams drops all stream records with a `null`-key (`null`-join-key 
> for stream-globalTable), because for a `null`-(join)key the join is 
> undefined: ie, we don't have an attribute to do the table lookup (we 
> consider the stream-record as malformed). Note that we define the semantics 
> of _left/outer_ join as: keep the stream record if no matching join record 
> was found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat it as a 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users that want to keep the current behavior, can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires changing the behavior if we insert a 
> repartiti

[jira] [Updated] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12317:

Labels: kip  (was: )

> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
>
> Currently, for a stream-stream and stream-table/globalTable join 
> KafkaStreams drops all stream records with a `null`-key (`null`-join-key for 
> stream-globalTable), because for a `null`-(join)key the join is undefined: 
> ie, we don't have an attribute to do the table lookup (we consider the 
> stream-record as malformed). Note that we define the semantics of 
> _left/outer_ join as: keep the stream record if no matching join record was 
> found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat it as a 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users that want to keep the current behavior, can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires changing the behavior if we insert a 
> repartition topic before the join: currently, we drop `null`-key records 
> before writing into the repartition topic (as we know they would be dropped 
> later anyway). We need to relax this behavior for a left stream-table and 
> left/outer stream-stream join. Users need to be aware (ie, we might need to 
> put this into the docs and JavaDocs) that records with a `null`-key would be 
> partitioned randomly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2023-08-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17752519#comment-17752519
 ] 

Matthias J. Sax commented on KAFKA-12317:
-

Just catching on of comments...
{quote}KAFKA-12845 is in status resolved so I assume this one is no longer 
relevant.
{quote}
Yes, it was closes as a duplicate of this ticket. Ie, we should make the change 
not just for left stream-table, but also for left stream-globalKTable.

Thanks [~guozhang]; I also prefer a KIP.

Thanks [~aki] for the KIP and PR.

> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>
> Currently, for a stream-stream and stream-table/globalTable join 
> KafkaStreams drops all stream records with a `null`-key (`null`-join-key for 
> stream-globalTable), because for a `null`-(join)key the join is undefined: 
> ie, we don't have an attribute to do the table lookup (we consider the 
> stream-record as malformed). Note that we define the semantics of 
> _left/outer_ join as: keep the stream record if no matching join record was 
> found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat it as a 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users that want to keep the current behavior, can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires changing the behavior if we insert a 
> repartition topic before the join: currently, we drop `null`-key records 
> before writing into the repartition topic (as we know they would be dropped 
> later anyway). We need to relax this behavior for a left stream-table and 
> left/outer stream-stream join. Users need to be aware (ie, we might need to 
> put this into the docs and JavaDocs) that records with a `null`-key would be 
> partitioned randomly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15319:

Priority: Critical  (was: Major)

> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Priority: Critical
>
> Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12
> Upgrade to 1.2.13 to fix 
> https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15319:

Component/s: streams

> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Priority: Major
>
> Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12
> Upgrade to 1.2.13 to fix 
> https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order

2023-08-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17752150#comment-17752150
 ] 

Matthias J. Sax commented on KAFKA-15297:
-

Seems we are on the same page :) 

> Cache flush order might not be topological order 
> -
>
> Key: KAFKA-15297
> URL: https://issues.apache.org/jira/browse/KAFKA-15297
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Bruno Cadonna
>Priority: Major
> Attachments: minimal_example.png
>
>
> The flush order of the state store caches in Kafka Streams might not 
> correspond to the topological order of the state stores in the topology. The 
> order depends on how the processors and state stores are added to the 
> topology. 
> In some cases downstream state stores might be flushed before upstream state 
> stores. That means that, during a commit, records in upstream caches might end 
> up in downstream caches that have already been flushed during the same 
> commit. If a crash happens at that point, those records in the downstream 
> caches are lost. Those records are lost for two reasons:
> 1. Records in caches are only changelogged after they are flushed from the 
> cache. However, the downstream caches have already been flushed and they will 
> not be flushed again during the same commit.
> 2. The offsets of the input records that caused the records that now are 
> blocked in the downstream caches are committed during the same commit and so 
> they will not be re-processed after the crash.
> An example for a topology where the flush order of the caches is wrong is the 
> following:
> {code:java}
> final String inputTopic1 = "inputTopic1";
> final String inputTopic2 = "inputTopic2";
> final String outputTopic1 = "outputTopic1";
> final String processorName = "processor1";
> final String stateStoreA = "stateStoreA";
> final String stateStoreB = "stateStoreB";
> final String stateStoreC = "stateStoreC";
> streamsBuilder.stream(inputTopic2, Consumed.with(Serdes.String(), 
> Serdes.String()))
> .process(
> () -> new Processor<String, String, String, String>() {
> private ProcessorContext<String, String> context;
> @Override
> public void init(ProcessorContext<String, String> context) {
> this.context = context;
> }
> @Override
> public void process(Record<String, String> record) {
> context.forward(record);
> }
> @Override
> public void close() {}
> },
> Named.as("processor1")
> )
> .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> streamsBuilder.stream(inputTopic1, Consumed.with(Serdes.String(), 
> Serdes.String()))
> .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreA).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
> .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreB).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
> .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreC).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
> .toStream()
> .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> final Topology topology = streamsBuilder.build(streamsConfiguration);
> topology.connectProcessorAndStateStores(processorName, stateStoreC);
> {code}
> This code results in the attached topology.
> In the topology {{processor1}} is connected to {{stateStoreC}}. If 
> {{processor1}} is added to the topology before the other processors, i.e., if 
> the right branch of the topology is added before the left branch as in the 
> code above, the cache of {{stateStoreC}} is flushed before the caches of 
> {{stateStoreA}} and {{stateStoreB}}.
> You can observe the flush order by feeding some records into the input topics 
> of the topology, waiting for a commit,  and looking for the following log 
> message:
> https://github.com/apache/kafka/blob/2e1947d240607d53f071f61c875cfffc3fec47fe/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L513
>  
> I changed the log message from trace to debug to avoid too much noise. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once

2023-08-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17752149#comment-17752149
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

The only thing I am wondering is if we should try to fix it right away in KS 
by adding an internal producer config for now until K15309 gets done? – In the 
end it seems that a KS app could get stuck without the ability to skip over a 
"poison pill write", so it might be worth fixing right away (even if the fix is 
more of a hack for now)?
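For reference, a minimal handler of the kind the reproducer configures (class name and wiring are illustrative); the report is that this CONTINUE is effectively ignored under exactly_once because the transaction is already aborted:
{code:java}
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Illustrative handler that asks Streams to skip the record that failed to be sent.
public class ContinueOnErrorHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // nothing to configure in this sketch
    }
}
{code}
It would be wired in via `StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG` (i.e. `default.production.exception.handler`).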

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using exactly_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using exactly_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> exactly_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using exactly_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with exactly_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using exactly_once when ProductionExceptionHandlerResponse.CONTINUE is 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of exactly_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but 
> written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576

[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751836#comment-17751836
 ] 

Matthias J. Sax commented on KAFKA-15302:
-

{quote}it's a combo of a cache snapshot and the underlying RocksDB's
{quote}
I think this is actually the bug: we actually don't return a snapshot over the 
cache. If it were a snapshot, flushing the cache would not modify it, but 
it does, as you pointed out:
{quote}that modifying the store for `keyA` might change the content of `KeyB` 
compared with the snapshot
{quote}
I am totally in favor of the "decouple flushing with forwarding" thought, 
independent of this ticket. But that's a larger piece of work anyway, and I am 
also not sure if we would want to make such a change in-place, or via DSL 2.0?
{quote}letting any range queries to flush cache first, and then only return 
from the underlying store.
{quote}
That's an interesting idea. – A naive fix would be to actually make a "shallow 
copy (with copy on write)" of the named cache when opening an iterator, to 
guard the shallow copy from eviction. But this is also hard to get right and 
could be very memory intensive...

In case we cannot provide a fix easily, updating the docs is for sure something 
we should do.
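For completeness, a minimal sketch of the workaround the reporter mentions — flushing the store before iterating — with placeholder store/key/value types; this only side-steps the issue until the behavior is fixed or documented:
{code:java}
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public final class FlushBeforeRangeScan {

    private FlushBeforeRangeScan() { }

    // Flush the (caching) store first so the iterator and subsequent get() calls
    // observe the same data.
    public static void forwardAll(final KeyValueStore<String, Integer> store) {
        store.flush();
        try (final KeyValueIterator<String, Integer> iterator = store.all()) {
            while (iterator.hasNext()) {
                iterator.next(); // process the entry here
            }
        }
    }
}
{code}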

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the Punctuation function of 
> this.context.schedule, the previous value is returned. In other words, even 
> though the value has been updated from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while 
> this doesn't occur 100% of the time, by adding logs, it's evident that during 
> the while loop after all() is called, the cache is flushed. As a result, the 
> named cache holds a null value, causing the return of a value from RocksDB. 
> This is observed as the value after the .get() call is different from the 
> expected value. This is possibly due to the consistent read functionality of 
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any errors.
>  
>  * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
> //
>     System.err.println("forwardAll Start");
>     KeyValueIterator<String, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg = new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);
>         if (entry.value != storeValue) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>  * log file (add log in stream source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
> dirtyKeys.size():7873 entries:7873
> 01:05:00.434 
> [95108c48-7c69-4eeb-adbd-9d091bd84

[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751835#comment-17751835
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

Ah. Sorry. Meant https://issues.apache.org/jira/browse/KAFKA-15309 (sorry for 
the c&p error – fixed in the above two comments).
{quote}KS could tell KP to not transit to errors for any `abortable` error type 
but just ignore and continue?
{quote}
That's not so easy. Currently, the producer goes into ERROR state, so we need 
to actually close the producer and would need to create a new one, including 
falling back to the last committed offset and retrying the failed transactions. If the 
error is "deterministic" (like a `RecordTooLargeException`) we would just hit 
it again on retry, and would need to do more bookkeeping to actually avoid 
hitting the error again. That's why it seems simpler to actually skip the error 
inside the producer (-> https://issues.apache.org/jira/browse/KAFKA-15309) to 
avoid that the producer goes into ERROR state to begin with.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
&g

[jira] [Comment Edited] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751759#comment-17751759
 ] 

Matthias J. Sax edited comment on KAFKA-15259 at 8/7/23 11:59 PM:
--

{quote}Should we create a new ticket for Kafka Producer?
{quote}
As mentioned above, I created https://issues.apache.org/jira/browse/KAFKA-15309 
for the producer already. If it's not covering what you think we need, just 
leave a comment on the ticket.


was (Author: mjsax):
{quote}Should we create a new ticket for Kafka Producer?
{quote}
As mentioned above, I created 
-https://issues.apache.org/jira/browse/KAFKA-15259- 
https://issues.apache.org/jira/browse/KAFKA-15309 for the producer already. If 
it's not covering what you think we need, just leave a comment on the ticket.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception 

[jira] [Comment Edited] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751759#comment-17751759
 ] 

Matthias J. Sax edited comment on KAFKA-15259 at 8/7/23 11:58 PM:
--

{quote}Should we create a new ticket for Kafka Producer?
{quote}
As mentioned above, I created 
-https://issues.apache.org/jira/browse/KAFKA-15259- 
https://issues.apache.org/jira/browse/KAFKA-15309 for the producer already. If 
it's not covering what you think we need, just leave a comment on the ticket.


was (Author: mjsax):
{quote}Should we create a new ticket for Kafka Producer?
{quote}
As mentioned above, I created https://issues.apache.org/jira/browse/KAFKA-15259 
for the producer already. If it's not covering what you think we need, just 
leave a comment on the ticket.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception 

[jira] [Comment Edited] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751267#comment-17751267
 ] 

Matthias J. Sax edited comment on KAFKA-15259 at 8/7/23 11:58 PM:
--

I did sync up with [~cegerton] who worked on 
https://issues.apache.org/jira/browse/KAFKA-9279, and we came up with this 
idea: adding a "production exception handler" to the producer that would allow 
KS to tell the producer to not fail the TX but to skip the record: 
https://issues.apache.org/jira/browse/KAFKA-15309.

If we cannot do K15259, an alternative might be to add an internal producer 
config that allows Kafka Streams to disable the pro-active abort of a TX. This 
would be safe, because Kafka Streams is actually a good citizen and calls 
`producer.flush()` and evaluates all callbacks before trying to commit – the 
issue K9279 addresses is actually bad user behavior of not checking for async 
errors before committing.


was (Author: mjsax):
I did sync up with [~cegerton] who worked on 
https://issues.apache.org/jira/browse/KAFKA-9279, and we come up with this 
idea: adding a "production exception handler" to the producer that would allow 
KS to tell the producer to not fail the TX but skip the record: 
-https://issues.apache.org/jira/browse/KAFKA-15259- 
https://issues.apache.org/jira/browse/KAFKA-15309.

If we cannot do K15259, an alternative might be, to add an internal producer 
config that allow Kafka Streams to disable the pro-active abort of a TX. This 
would be safe, because Kafka Streams is actually a good citizen and calls 
`producer.flush()` and evaluates all callbacks before trying to commit – the 
issue K9279 addresses is actually bad user behavior to not check for async 
errors before committing.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My excepted behavior is that Kafka Streams should continue processing 
> even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and 

[jira] [Comment Edited] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751267#comment-17751267
 ] 

Matthias J. Sax edited comment on KAFKA-15259 at 8/7/23 11:58 PM:
--

I did sync up with [~cegerton] who worked on 
https://issues.apache.org/jira/browse/KAFKA-9279, and we come up with this 
idea: adding a "production exception handler" to the producer that would allow 
KS to tell the producer to not fail the TX but skip the record: 
-https://issues.apache.org/jira/browse/KAFKA-15259- 
https://issues.apache.org/jira/browse/KAFKA-15309.

If we cannot do K15259, an alternative might be, to add an internal producer 
config that allow Kafka Streams to disable the pro-active abort of a TX. This 
would be safe, because Kafka Streams is actually a good citizen and calls 
`producer.flush()` and evaluates all callbacks before trying to commit – the 
issue K9279 addresses is actually bad user behavior to not check for async 
errors before committing.


was (Author: mjsax):
I did sync up with [~cegerton] who worked on 
https://issues.apache.org/jira/browse/KAFKA-9279, and we come up with this 
idea: adding a "production exception handler" to the producer that would allow 
KS to tell the producer to not fail the TX but skip the record: 
https://issues.apache.org/jira/browse/KAFKA-15259 

If we cannot do K15259, an alternative might be, to add an internal producer 
config that allow Kafka Streams to disable the pro-active abort of a TX. This 
would be safe, because Kafka Streams is actually a good citizen and calls 
`producer.flush()` and evaluates all callbacks before trying to commit – the 
issue K9279 addresses is actually bad user behavior to not check for async 
errors before committing.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My excepted behavior is that Kafka Streams should continue processing 
> even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and 

[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751761#comment-17751761
 ] 

Matthias J. Sax commented on KAFKA-15302:
-

Thanks for providing more details; the `delete()` call that is interleaved with 
calls to the all-iterator should be the root cause; the delete call was missing 
in the original description. So we now know where the flush/evict is coming 
from. Thanks for confirming!

Still not sure how to fix it right now, though. [~guozhang] any ideas?
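
For illustration only (not part of the original comment), a minimal processor sketch of the workaround mentioned in the ticket description – flushing the store before opening the iterator inside the punctuation. The store name "Counts", the String/Integer types, and the schedule interval are assumptions:

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch: count-style processor that periodically forwards the full store content.
// Calling flush() before all() is the workaround described in the ticket.
public class ForwardAllProcessor implements Processor<String, Integer, String, Integer> {
    private KeyValueStore<String, Integer> kvStore;

    @Override
    public void init(final ProcessorContext<String, Integer> context) {
        kvStore = context.getStateStore("Counts"); // store name is an assumption
        context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            kvStore.flush(); // workaround: write the cache down before opening the iterator
            try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Integer> entry = iter.next();
                    context.forward(new Record<>(entry.key, entry.value, timestamp));
                }
            }
        });
    }

    @Override
    public void process(final Record<String, Integer> record) {
        kvStore.put(record.key(), record.value());
    }
}
{code}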

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the Punctuation function of 
> this.context.schedule, the previous value is returned. In other words, even 
> though the value has been stored from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while 
> this doesn't occur 100% of the time, by adding logs, it's evident that during 
> the while loop after all() is called, the cache is flushed. As a result, the 
> named cache holds a null value, causing the return of a value from RocksDB. 
> This is observed as the value after the .get() call is different from the 
> expected value. This is possibly due to the consistent read functionality of 
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any errors.
>  
>  * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
>     System.err.println("forwardAll Start");
>     KeyValueIterator<String, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg = new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);
>         if (entry.value != storeValue) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
>                 + " Expected in stored(Cache or Store) value: " + storeValue
>                 + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>  * log file (add log in stream source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
> dirtyKeys.size():7873 entries:7873
> 01:05:00.434 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.s.i.CachingKeyValueStore --  KeyValueIterator 
> all()
> 01:05:00.588 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.RocksDBStore --  RocksDB KeyValueIterator all
> 01:05:00.590 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-S

[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751759#comment-17751759
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

{quote}Should we create a new ticket for Kafka Producer?
{quote}
As mentioned above, I created https://issues.apache.org/jira/browse/KAFKA-15259 
for the producer already. If it's not covering what you think we need, just 
leave a comment on the ticket.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My excepted behavior is that Kafka Streams should continue processing 
> even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but 
> written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 INFO  TransactionManager:393 - [Producer 
&

[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751756#comment-17751756
 ] 

Matthias J. Sax commented on KAFKA-14747:
-

We can discuss here on the ticket I guess. The class in question is 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java#L114]

We already added a `TRACE` log for this case recently.

For using the dropped record sensor, cf. 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java#L66-L70]
 that does the same thing.

Let me know if this helps to get you bootstrapped.

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Koma Zhang
>Priority: Minor
>  Labels: beginner, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response, that might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record if this happens. We should 
> consider to record this using the existing "dropped record" sensor or maybe 
> add a new sensor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [QUESTION] What is the difference between sequence and offset for a Record?

2023-08-07 Thread Matthias J. Sax

but the base offset may change during log normalizing.


Not sure what you mean by "normalization" but offsets are immutable, so 
they don't change. (To be fair, I am not an expert on brokers, so I am not 
sure how this works in detail when log compaction kicks in.)



This field is given by the producer and the broker should only read it.


Sounds right. The point is that the broker has an "expected" 
value for it, and if the provided value does not match the expected one, 
the write is rejected to begin with.



-Matthias
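
For readers following the thread, a tiny illustrative sketch (not broker code) of the arithmetic discussed here – the absolute offset and the sequence are both derived as a batch-level base value plus the per-record delta:

{code:java}
// Illustrative only: models the batch-level base values plus the per-record delta.
public final class RecordBatchMath {

    // Absolute offset = base offset assigned by the broker's log + per-record delta.
    static long absoluteOffset(final long baseOffset, final int offsetDelta) {
        return baseOffset + offsetDelta;
    }

    // Sequence = base sequence assigned by an idempotent producer + per-record delta;
    // -1 (no sequence) if the producer does not use idempotence.
    static int sequence(final int baseSequence, final int offsetDelta) {
        return baseSequence < 0 ? -1 : baseSequence + offsetDelta;
    }

    public static void main(final String[] args) {
        final long baseOffset = 100L;   // e.g., the batch was appended at log-end offset 100
        final int baseSequence = 42;    // e.g., the producer's next sequence for this partition
        for (int delta = 0; delta < 3; delta++) {
            System.out.printf("record %d -> offset=%d, sequence=%d%n",
                delta, absoluteOffset(baseOffset, delta), sequence(baseSequence, delta));
        }
    }
}
{code}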

On 8/7/23 6:35 AM, tison wrote:

Hi Matthias and Justine,

Thanks for your reply!

I can summarize the answer as -

Record offset = base offset + offset delta. This field is calculated by the
broker and the delta won't change but the base offset may change during log
normalizing.
Record sequence = base sequence + (offset) delta. This field is given by
the producer and the broker should only read it.

Is it correct?

I implement the manipulation part of base offset following this
understanding at [1].

Best,
tison.

[1]
https://github.com/tisonkun/kafka-api/blob/d080ab7e4b57c0ab0182e0b254333f400e616cd2/simplesrv/src/lib.rs#L391-L394


Justine Olshan  于2023年8月2日周三 04:19写道:


For what it's worth -- the sequence number is not calculated
"baseOffset/baseSequence + offset delta" but rather by monotonically
increasing for a given epoch. If the epoch is bumped, we reset back to
zero.
This may mean that the offset and sequence may match, but do not strictly
need to be the same. The sequence number will also always come from the
client and be in the produce records sent to the Kafka broker.

As for offsets, there is some code in the log layer that maintains the log
end offset and assigns offsets to the records. The produce handling on the
leader should typically assign the offset.
I believe you can find that code here:

https://github.com/apache/kafka/blob/b9a45546a7918799b6fb3c0fe63b56f47d8fcba9/core/src/main/scala/kafka/log/UnifiedLog.scala#L766

Justine

On Tue, Aug 1, 2023 at 11:38 AM Matthias J. Sax  wrote:


The _offset_ is the position of the record in the partition.

The _sequence number_ is a unique ID that allows broker to de-duplicate
messages. It requires the producer to implement the idempotency protocol
(part of Kafka transactions); thus, sequence numbers are optional and as
long as you don't want to support idempotent writes, you don't need to
worry about them. (If you want to dig into details, checkout KIP-98 that
is the original KIP about Kafka TX).

HTH,
-Matthias

On 8/1/23 2:19 AM, tison wrote:

Hi,

I'm writing a Kafka API Rust codec library[1] to understand how Kafka
models its concepts and how the core business logic works.

While implementing the codec for Records[2], I saw the twin fields
"sequence" and "offset". Both of them are calculated by
baseOffset/baseSequence + offset delta. So I'm a bit confused about how to
deal with them properly - what's the difference between these two concepts
logically?

Also, to understand how the core business logic works, I wrote a simple
server based on my codec library, and observed that the server may need to
update the offset for produced records. How does Kafka set the correct
offset for each produced record? And how does Kafka maintain the calculation
for offset and sequence during these modifications?

I'd appreciate it if anyone can answer the question or give some insights :D


Best,
tison.

[1] https://github.com/tisonkun/kafka-api
[2] https://kafka.apache.org/documentation/#messageformat









[jira] [Assigned] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema

2023-08-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15303:
---

Assignee: Matthias J. Sax

> Foreign key joins no longer triggered by events on the right side of the join 
> after deployment with a new compatible Avro schema
> 
>
> Key: KAFKA-15303
> URL: https://issues.apache.org/jira/browse/KAFKA-15303
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Charles-Eddy
>    Assignee: Matthias J. Sax
>Priority: Major
> Attachments: image (1).png
>
>
> Hello everyone, I am currently working on a project that uses Kafka Streams 
> (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
> Our goal is to join offer information from our sellers with additional data 
> from various input topics, and then feed the resulting joined information 
> into an output topic.
> Our application is deployed in Kubernetes using the StatefulSet feature, with 
> one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.
> We are using avro to serialize / deserialize input topics and storing in the 
> state stores of Kafka streams.
> We have encountered a bug in Kafka Streams that prevents us from deploying 
> new versions of Kafka Streams containing new compatible Avro schemas of our 
> input topics.
> The symptom is that after deploying our new version, which contains no 
> changes in topology but only changes to the Avro schema used, we discard 
> every event coming from the right part of the join concerned by these Avro 
> schema changes until we receive something from the left part of the join.
> As a result, we are losing events and corrupting our output topics and stores 
> with outdated information.
> After checking the local help for the priority to assign, I have assigned it 
> as *CRITICAL* because we are losing data (for example, tombstones are not 
> propagated to the following joins, so some products are still visible on our 
> website when they should not be).
> Please feel free to change the priority if you think it is not appropriate.
>  
> *The bug:*
> After multiple hours of investigation we found out that the bug is located in 
> the foreign key join feature and specifically in this class: 
> *SubscriptionResolverJoinProcessorSupplier* in the left part of a foreign key 
> join. 
> This class and his method process(...) is computing a hash from the local 
> store via a serialization of a deserialized value from the left state store 
> and comparing it with the hash of the original message from the 
> subscription-response-topic. 
>  
> It means that when we deploy a new version of our kafka streams instance with 
> a new compatible avro schema from the left side of a join, every join 
> triggered by the right part of the join are invalidated until we receive all 
> the events again on the left side. Every join triggered by the right part of 
> the join are discarded because all the hashes computed by kafka streams are 
> different now from the original messages.
>  
> *How to reproduce it:*
> If we take a working and a non-working workflow, it will do something like 
> this:
> +Normal non-breaking+ workflow from the left part of the FK join:
>  # A new offer event occurs. The offer is received and stored (v1).
>  # A subscription registration is sent with the offer-hash (v1).
>  # The subscription is saved to the store with the v1 offer-hash.
>  # Product data is searched for.
>  # If product data is found, a subscription response is sent back, including 
> the v1 offer-hash.
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
> New product event from the right part of the FK join:
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent for each offer, including their offer hash 
> (v1).
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
>  
> +Breaking workflow:+ 
> The offer serializer is changed to offer v2
> New product event from the right part of the FK join: 
>  # The product is received and stored.
>  # All related offers in the registration store are searched fo

[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15309:

Description: 
The producer collects multiple records into batches, and a single record 
specific error might fail the whole batch (eg, `RecordTooLargeException`).

This ticket suggests to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

Another example for which a production exception handler could be useful, if a 
user tries to write into a non-existing topic, which returns a retryable error 
code; with infinite retries the producer would hang retrying forever. A handler 
could help to break the infinite retry loop.

  was:
The producer collects multiple records into batches, and a single record 
specific error might fail the whole batch (eg, `RecordTooLargeException`).

This ticket suggests to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

Another example for which a production exception handler would be useful, if a 
user tries to write into a non-existing topic, which returns a retryable error 
code; with infinite retries the producer would hang retrying forever.


> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>            Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> The producer collects multiple records into batches, and a single record 
> specific error might fail the whole batch (eg, `RecordTooLargeException`).
> This ticket suggests to add a per-record error handler, that allows user to 
> opt into skipping bad records without failing the whole batch (similar to 
> Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.
> Another example for which a production exception handler could be useful, if 
> a user tries to write into a non-existing topic, which returns a retryable 
> error code; with infinite retries the producer would hang retrying forever. A 
> handler could help to break the infinite retry loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
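
To make the proposal concrete, a purely hypothetical sketch of what such a per-record handler could look like on the producer side – every name below is invented for illustration and the actual API would be defined by the KIP:

{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.errors.RecordTooLargeException;

// Hypothetical interface sketch for KAFKA-15309; names and shape are illustrative only.
public interface RecordProductionExceptionHandler extends Configurable {

    enum Response {
        FAIL,        // current behavior: fail the batch (and the transaction, if any)
        DROP_RECORD  // skip only the offending record and keep the batch/transaction alive
    }

    Response handle(ProducerRecord<byte[], byte[]> record, Exception exception);
}

// Hypothetical example implementation that skips oversized records:
class DropTooLargeRecordsHandler implements RecordProductionExceptionHandler {
    @Override
    public void configure(final Map<String, ?> configs) { }

    @Override
    public Response handle(final ProducerRecord<byte[], byte[]> record, final Exception exception) {
        return exception instanceof RecordTooLargeException ? Response.DROP_RECORD : Response.FAIL;
    }
}
{code}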


[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15309:

Description: 
The producer collects multiple records into batches, and a single record 
specific error might fail the whole batch (eg, `RecordTooLargeException`).

This ticket suggests to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

Another example for which a production exception handler would be useful, if a 
user tries to write into a non-existing topic, which returns a retryable error 
code; with infinite retries the producer would hang retrying forever.

  was:
The producer collects multiple records into batches, and a single record 
specific error might fail the whole batch (eg, `RecordTooLargeException`).

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.


> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>            Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> The producer collects multiple records into batches, and a single record 
> specific error might fail the whole batch (eg, `RecordTooLargeException`).
> This ticket suggests to add a per-record error handler, that allows user to 
> opt into skipping bad records without failing the whole batch (similar to 
> Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.
> Another example for which a production exception handler would be useful, if 
> a user tries to write into a non-existing topic, which returns a retryable 
> error code; with infinite retries the producer would hang retrying forever.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15309:

Description: 
The producer collects multiple records into batches, and a single record 
specific error might fail the whole batch (eg, `RecordTooLargeException`).

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

  was:
The producer batches up multiple records into batches, and a single record 
specific error might fail the whole batch.

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.


> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>            Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> The producer collects multiple records into batches, and a single record 
> specific error might fail the whole batch (eg, `RecordTooLargeException`).
> This ticket suggest to add a per-record error handler, that allows user to 
> opt into skipping bad records without failing the whole batch (similar to 
> Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15309:

Description: 
The producer batches up multiple records into batches, and a single record 
specific error might fail the whole batch.

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

  was:
The producer batches up multiple records into batches, and a single record 
specific error might fail the whole batch.

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`.


> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>            Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> The producer batches up multiple records into batches, and a single record 
> specific error might fail the whole batch.
> This ticket suggest to add a per-record error handler, that allows user to 
> opt into skipping bad records without failing the whole batch (similar to 
> Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751267#comment-17751267
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

I did sync up with [~cegerton] who worked on 
https://issues.apache.org/jira/browse/KAFKA-9279, and we come up with this 
idea: adding a "production exception handler" to the producer that would allow 
KS to tell the producer to not fail the TX but skip the record: 
https://issues.apache.org/jira/browse/KAFKA-15259 

If we cannot do K15259, an alternative might be, to add an internal producer 
config that allow Kafka Streams to disable the pro-active abort of a TX. This 
would be safe, because Kafka Streams is actually a good citizen and calls 
`producer.flush()` and evaluates all callbacks before trying to commit – the 
issue K9279 addresses is actually bad user behavior to not check for async 
errors before committing.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My excepted behavior is that Kafka Streams should continue processing 
> even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which i

[jira] [Created] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15309:
---

 Summary: Add custom error handler to Producer
 Key: KAFKA-15309
 URL: https://issues.apache.org/jira/browse/KAFKA-15309
 Project: Kafka
  Issue Type: New Feature
  Components: producer 
Reporter: Matthias J. Sax


The producer batches up multiple records into batches, and a single record 
specific error might fail the whole batch.

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Testing FixedKeyProcessor implementation using unit tests

2023-08-04 Thread Matthias J. Sax
Thanks for filing a ticket for it: 
https://issues.apache.org/jira/browse/KAFKA-15242




On 7/14/23 1:06 AM, EXT.Zlatibor.Veljkovic wrote:

Hi Matthias,

Here's the repro of the project that has these issues 
https://github.com/zveljkovic/kafka-repro.

Please look at the:
Topology definition: 
https://github.com/zveljkovic/kafka-repro/blob/master/src/main/java/com/example/demo/DemoApplication.java
FixedKeyProcessor: 
https://github.com/zveljkovic/kafka-repro/blob/master/src/main/java/com/example/demo/MyFixedKeyProcessor.java
Test of FixedKeyProcessor: 
https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java

Test is where I am having issues.

Thanks,
Zed


-Original Message-
From: Matthias J. Sax 
Sent: Tuesday, July 11, 2023 1:13 AM
To: dev@kafka.apache.org
Subject: Re: Testing FixedKeyProcessor implementation using unit tests

External email:Be careful with links and attachments


Not sure right now, but could be a bug.

Can you maybe share the full stack trace and the test program?

-Matthias

On 7/10/23 3:47 AM, EXT.Zlatibor.Veljkovic wrote:

Hi, I am using kafka-streams-test-utils and have problem with testing 
FixedKeyProcessor [KIP-820 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API#KIP820:ExtendKStreamprocesswithnewProcessorAPI-InfrastructureforFixedKeyRecords].

Using mock processor context to get the forwarded message doesn't work.

class org.apache.kafka.streams.processor.api.MockProcessorContext cannot be 
cast to class org.apache.kafka.streams.processor.api.FixedKeyProcessorContext

Anything I can do to get forwarded records?

Thanks,
Zed
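
Until KAFKA-15242 is addressed, one possible workaround is to test a FixedKeyProcessor through TopologyTestDriver instead of MockProcessorContext. A minimal sketch – the topic names, serdes, and MyFixedKeyProcessor are placeholders matching the repro project:

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

// Sketch: drive the FixedKeyProcessor through a real (test) topology so it gets a
// proper FixedKeyProcessorContext, and observe forwarded records on the output topic.
public class MyFixedKeyProcessorTestSketch {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .processValues(MyFixedKeyProcessor::new) // processor under test (placeholder)
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fixed-key-processor-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> input = driver.createInputTopic(
                "input-topic", Serdes.String().serializer(), Serdes.String().serializer());
            final TestOutputTopic<String, String> output = driver.createOutputTopic(
                "output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());

            input.pipeInput("key", "value");
            System.out.println(output.readKeyValue()); // the forwarded record shows up here
        }
    }
}
{code}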



[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751263#comment-17751263
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

Thanks for digging into it – that's a good find – I was already wondering, 
because I could not see any code changes between 3.1 and 3.2 in Kafka Streams 
that would explain it.

And yes, if the producer goes into error state, it is impossible for KS to 
"revert" it – thus, I am not sure right now how we could fix it... In the end, 
Kafka Streams does `producer.flush()` and evaluates if there are any errors, 
detects the `RecordTooLargeException`, and executes the handler, which returns 
`CONTINUE`, and that is respected. If it were not respected, the 
`RecordTooLargeException` would be re-thrown right away. But because Kafka 
Streams does `CONTINUE` it actually tries to commit, which fails because the 
producer is already in error state.

A high-level idea would be to remember the input record offset and, after we 
failed and the task is restarted, apply an implicit filter that drops the input 
message right away based on the remembered offset. But such a thing would need 
very careful design...
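
For context, the kind of handler the reporter describes – returning CONTINUE for RecordTooLargeException – looks roughly like the sketch below; the class name and wiring are illustrative, not the attached Reproducer.java:

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Illustrative handler: skip records that are too large, fail on anything else.
public class ContinueOnRecordTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public void configure(final Map<String, ?> configs) { }

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        return exception instanceof RecordTooLargeException
            ? ProductionExceptionHandlerResponse.CONTINUE
            : ProductionExceptionHandlerResponse.FAIL;
    }

    // Wiring it into a Streams config:
    public static void register(final Properties props) {
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  ContinueOnRecordTooLargeHandler.class);
    }
}
{code}

Under exactly-once this is where the behavior described above diverges: the handler's CONTINUE is honored by Streams, but the transactional producer has already transitioned to an error state, so the subsequent commit fails anyway.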

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My excepted behavior is that Kafka Streams should continue processing 
> even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [jav

[jira] [Updated] (KAFKA-15308) Wipe Stores upon OffsetOutOfRangeException in ALOS

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15308:

Affects Version/s: (was: 3.4.0)
   (was: 3.5.0)

> Wipe Stores upon OffsetOutOfRangeException in ALOS
> --
>
> Key: KAFKA-15308
> URL: https://issues.apache.org/jira/browse/KAFKA-15308
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0
>Reporter: Colt McNealy
>Priority: Minor
>
> As per this [Confluent Community Slack 
> Thread|https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1690843733272449?thread_ts=1690663361.858559=C48AHTCUQ],
>  Streams currently does not wipe away RocksDB state upon encountering an 
> `OffsetOutOfRangeException` in ALOS.
>  
> `OffsetOutOfRangeException` is a rare case that occurs when a standby task 
> requests offsets that no longer exist in the topic. We should wipe the store 
> for three reasons:
>  # Not wiping the store can be a violation of ALOS since some of the 
> now-missing offsets could have contained tombstone records.
>  # Wiping the store has no performance cost since we need to replay the 
> entirety of what's in the changelog topic anyways.
>  # I have heard (not yet confirmed myself) that we wipe the store in EOS 
> anyways, so fixing this bug could remove a bit of complexity from supporting 
> EOS and ALOS.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-955: Add stream-table join on foreign key

2023-08-04 Thread Matthias J. Sax
However, in many cases, for various architecture reasons, there is a desire to remove
JDBC queries from the data source and replace them with CDC streaming data to
Kafka. So in that case assembling data entities from Kafka topics instead
of JDBC would be beneficial.

Please let me know what you think.

Regards,

Igor

On Tue, Jul 25, 2023 at 5:53 PM Matthias J. Sax  wrote:


Igor,

thanks for the KIP. Interesting proposal. I am wondering a little bit
about the use-case and semantics, and if it's really required to add
what you propose? Please correct me if I am wrong.

In the end, a stream-table join is a "stream enrichment" (via a table
lookup). Thus, it's inherently a 1:1 join (in contrast to a FK
table-table join which is a n:1 join).

If this assumption is correct, and you have data for which the table
side join attribute is in the value, you could actually repartition the
table data using the join attribute as the PK of the table.

If my assumption is incorrect, and you say you want to have a 1:n join
(note that I intentionally reversed from n:1 to 1:n), I would rather
object, because it seems to violate the idea of "enriching" a stream,
which means that each input record produces one output record, not multiple.

Also note: for a FK table-table join, we use the foreignKeyExtractor to
get the join attribute from the left input table (which corresponds to
the KStream in your KIP; i.e., it's an n:1 join), while you propose that
the foreignKeyExtractor be applied to the KTable (which is the right
input, and thus it would be a 1:n join).

Maybe you can clarify the use case a little bit. For the current KIP
description I only see the 1:1 join case, what would mean we might not
need such a feature?


-Matthias
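
For readers following along, the two existing DSL shapes contrasted above look roughly like this – topic names and the value format are placeholders:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class JoinShapes {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, String> orders =
            builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));
        final KTable<String, String> customers =
            builder.table("customers", Consumed.with(Serdes.String(), Serdes.String()));

        // 1) Stream-table join: a 1:1 "stream enrichment" – each stream record is
        //    enriched via a table lookup on the stream record's key.
        final KStream<String, String> enriched =
            orders.join(customers, (order, customer) -> order + "/" + customer);

        // 2) FK table-table join (n:1): the foreignKeyExtractor is applied to the
        //    *left* table's value to derive the key of the right table.
        final KTable<String, String> orderTable =
            builder.table("orders-table", Consumed.with(Serdes.String(), Serdes.String()));
        final KTable<String, String> enrichedTable =
            orderTable.join(customers,
                            orderValue -> orderValue.split(":")[0], // extract the FK from the left value
                            (order, customer) -> order + "/" + customer);

        builder.build(); // in a real application, the topology would be passed to KafkaStreams
    }
}
{code}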


On 7/24/23 11:36 AM, Igor Fomenko wrote:

Hello developers of the Kafka Streams,

I would like to start discussion on KIP-955: Add stream-table join on
foreign key
<

https://cwiki.apache.org/confluence/display/KAFKA/KIP-955%3A+Add+stream-table+join+on+foreign+key


This KIP proposes a new API to join a KStream with a KTable based on a foreign
key relation.
This KIP was inspired by one of my former projects to integrate RDBMS
databases with data consumers using Change Data Capture and Kafka.
If we had the capability in Kafka Streams to join a KStream with a KTable on a
foreign key, this would simplify our implementation significantly.

Looking forward to your feedback and discussion.

Regards,

Igor







Re: [DISCUSS] KIP-962 Relax non-null key requirement in Kafka Streams

2023-08-04 Thread Matthias J. Sax

Guozhang,

thanks for pointing out ValueJoinerWithKey. In the end, it's just a 
documentation change, i.e., pointing out that the passed-in key could be 
`null`, and similar?


-Matthias
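
A small sketch of the point above – with ValueJoinerWithKey the joiner can distinguish a null key from a present key whose right-hand value is null. Topic names and types are placeholders, and the null-key branch assumes the relaxed semantics proposed by KIP-962:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;

public class RelaxedLeftJoinSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> left =
            builder.stream("left", Consumed.with(Serdes.String(), Serdes.String()));
        final KTable<String, String> right =
            builder.table("right", Consumed.with(Serdes.String(), Serdes.String()));

        final ValueJoinerWithKey<String, String, String, String> joiner = (key, leftValue, rightValue) -> {
            if (key == null) {
                return leftValue + "/no-key";      // null-key record (joinable only under KIP-962)
            }
            return rightValue == null
                ? leftValue + "/no-match"          // key present, but no right-hand value
                : leftValue + "/" + rightValue;
        };

        left.leftJoin(right, joiner, Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));
        builder.build();
    }
}
{code}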


On 8/2/23 3:20 PM, Guozhang Wang wrote:

Thanks Florin for the writeup,

One quick thing I'd like to bring up is that in KIP-149
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner)
we introduced ValueJoinerWithKey which is aimed to enhance
ValueJoiner. It would have a benefit for this KIP such that
implementers can distinguish "null-key" v.s. "not-null-key but
null-value" scenarios.

Hence I'd suggest we also include the semantic changes with
ValueJoinerWithKey, which can help distinguish these two scenarios,
and also document that if users apply ValueJoiner only, they may not
have this benefit, and hence we suggest users to use the former.


Guozhang

On Mon, Jul 31, 2023 at 12:11 PM Florin Akermann
 wrote:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams


[jira] [Created] (KAFKA-15307) Kafka Streams configuration docs outdated

2023-08-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15307:
---

 Summary: Kafka Streams configuration docs outdated
 Key: KAFKA-15307
 URL: https://issues.apache.org/jira/browse/KAFKA-15307
 Project: Kafka
  Issue Type: Task
  Components: docs, streams
Reporter: Matthias J. Sax


[https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html]
 needs to be updated.

It's missing a lot of newly added configs, and still lists already removed 
configs.

For deprecated configs, we could consider removing them as well, or add a 
"deprecated configs" section and keep them for the time being.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751236#comment-17751236
 ] 

Matthias J. Sax commented on KAFKA-15302:
-

Thanks for reporting this issue. – When you call `store.all()` you get an 
iterator back that is built over the cache as well as RocksDB. For the 
underlying RocksDB iterator, it provides an immutable snapshot, thus any later 
writes into RocksDB are not visible to the iterator. Thus, if the cache is 
flushed, and we try to read the key from the cache and cannot find it, we go to 
the underlying RocksDB iterator which cannot see the write. This should explain 
it.

What I am wondering though right now is, why the cache would get flushed to 
begin with? – There should not be an explicit `store.flush()` call because we 
only flush before a `commit()`, which happens on the same thread; we might also 
`evict()` during a `put()` if the cache overflows, but there is no `put()` call 
in between; the third case I could find is when a new `StreamThread` is added 
and we need to resize the cache (this would indeed be a concurrent operation; 
could adding/removing a thread explain what you observe?)

Otherwise we would need to do more digging into why the cache is flushed to begin 
with. If we flush incorrectly and can avoid the flush, we should be able to fix 
it. If we flush correctly, we might need to have a guard inside the caching 
layer itself and suppress the flush if there is an open iterator (which actually 
does not sound like a great solution, but maybe it would be the correct way 
forward.)
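
For completeness, a sketch of the workaround mentioned in the description (explicitly 
flushing before iterating; this only hides the symptom and is not a fix of the 
underlying behavior). It is meant to live inside the processor's init(), with `kvStore` 
being the store from the reproducer:

{code:java}
context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
    kvStore.flush(); // push the cache into the underlying store before taking the iterator
    try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
        while (iter.hasNext()) {
            final KeyValue<String, Integer> entry = iter.next();
            context.forward(new Record<>(entry.key, entry.value, context.currentSystemTimeMs()));
        }
    }
});
{code}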

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the Punctuation function of 
> this.context.schedule, the previous value is returned. In other words, even 
> though the value has been stored from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while 
> this doesn't occur 100% of the time, by adding logs, it's evident that during 
> the while loop after all() is called, the cache is flushed. As a result, the 
> named cache holds a null value, causing the return of a value from RocksDB. 
> This is observed as the value after the .get() call is different from the 
> expected value. This is possibly due to the consistent read functionality of 
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any errors.
>  
>  * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
> //
>     System.err.println("forwardAll Start");    KeyValueIterator<String, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg = new Record<>(entry.key, 
> entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);        if 
> (entry.value != storeValue) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: 
> " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue + 
> " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }        this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>  * log file (add log in stream source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5628524, #misses=5636397, #overwrites=63

[jira] [Commented] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema

2023-08-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751212#comment-17751212
 ] 

Matthias J. Sax commented on KAFKA-15303:
-

Thanks for filing this ticket.

Kafka Streams has only limited support for schema evolution, because Kafka 
Streams is schema agnostic – the runtime does not even know what format is 
used, and thus cannot reason about it. I updated the ticket as "improvement" 
because it's not a bug: the system works as designed.

In the end, Kafka Streams uses whatever Serde you provide, so it's not clear if 
we even could fix it on our end? Maybe you could put some hack into the Serde 
you provide to fix it? It's unfortunately not possible atm to get the original 
raw bytes (that would allow us to avoid the re-serialization to begin 
with).
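
Just to sketch the shape of such a Serde-level hack (the actual logic for keeping 
the bytes stable, e.g. pinning the old writer schema, is application and 
Schema-Registry specific and not shown; this is only an illustration, not a 
recommendation):

{code:java}
public class ByteStableSerde<T> implements Serde<T> {
    private final Serde<T> inner;

    public ByteStableSerde(final Serde<T> inner) {
        this.inner = inner;
    }

    @Override
    public Serializer<T> serializer() {
        // wrap the serializer so that serialization stays byte-identical across
        // application versions (e.g. always encode with the pinned old schema)
        return (topic, data) -> inner.serializer().serialize(topic, data);
    }

    @Override
    public Deserializer<T> deserializer() {
        return inner.deserializer();
    }
}
{code}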

> Foreign key joins no longer triggered by events on the right side of the join 
> after deployment with a new compatible Avro schema
> 
>
> Key: KAFKA-15303
> URL: https://issues.apache.org/jira/browse/KAFKA-15303
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Charles-Eddy
>Priority: Major
> Attachments: image (1).png
>
>
> Hello everyone, I am currently working on a project that uses Kafka Streams 
> (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
> Our goal is to join offer information from our sellers with additional data 
> from various input topics, and then feed the resulting joined information 
> into an output topic.
> Our application is deployed in Kubernetes using the StatefulSet feature, with 
> one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.
> We are using avro to serialize / deserialize input topics and storing in the 
> state stores of Kafka streams.
> We have encountered a bug in Kafka Streams that prevents us from deploying 
> new versions of Kafka Streams containing new compatible Avro schemas of our 
> input topics.
> The symptom is that after deploying our new version, which contains no 
> changes in topology but only changes to the Avro schema used, we discard 
> every event coming from the right part of the join concerned by these Avro 
> schema changes until we receive something from the left part of the join.
> As a result, we are losing events and corrupting our output topics and stores 
> with outdated information.
> After checking the local help for the priority to assign, I have assigned it 
> as *CRITICAL* because we are losing data (for example, tombstones are not 
> propagated to the following joins, so some products are still visible on our 
> website when they should not be).
> Please feel free to change the priority if you think it is not appropriate.
>  
> *The bug:*
> After multiple hours of investigation we found out that the bug is located in 
> the foreign key join feature and specifically in this class: 
> *SubscriptionResolverJoinProcessorSupplier* in the left part of a foreign key 
> join. 
> This class and its method process(...) computes a hash from the local 
> store via a serialization of a deserialized value from the left state store 
> and comparing it with the hash of the original message from the 
> subscription-response-topic. 
>  
> It means that when we deploy a new version of our kafka streams instance with 
> a new compatible avro schema from the left side of a join, every join 
> triggered by the right part of the join are invalidated until we receive all 
> the events again on the left side. Every join triggered by the right part of 
> the join are discarded because all the hashes computed by kafka streams are 
> different now from the original messages.
>  
> *How to reproduce it:*
> If we take a working and a non-working workflow, it will do something like 
> this:
> +Normal non-breaking+ workflow from the left part of the FK join:
>  # A new offer event occurs. The offer is received and stored (v1).
>  # A subscription registration is sent with the offer-hash (v1).
>  # The subscription is saved to the store with the v1 offer-hash.
>  # Product data is searched for.
>  # If product data is found, a subscription response is sent back, including 
> the v1 offer-hash.
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
> New product event from the right part of the FK join:
>  # The product is received and stor

[jira] [Updated] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15303:

Issue Type: Improvement  (was: Bug)

> Foreign key joins no longer triggered by events on the right side of the join 
> after deployment with a new compatible Avro schema
> 
>
> Key: KAFKA-15303
> URL: https://issues.apache.org/jira/browse/KAFKA-15303
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Charles-Eddy
>Priority: Critical
> Attachments: image (1).png
>
>
> Hello everyone, I am currently working on a project that uses Kafka Streams 
> (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
> Our goal is to join offer information from our sellers with additional data 
> from various input topics, and then feed the resulting joined information 
> into an output topic.
> Our application is deployed in Kubernetes using the StatefulSet feature, with 
> one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.
> We are using avro to serialize / deserialize input topics and storing in the 
> state stores of Kafka streams.
> We have encountered a bug in Kafka Streams that prevents us from deploying 
> new versions of Kafka Streams containing new compatible Avro schemas of our 
> input topics.
> The symptom is that after deploying our new version, which contains no 
> changes in topology but only changes to the Avro schema used, we discard 
> every event coming from the right part of the join concerned by these Avro 
> schema changes until we receive something from the left part of the join.
> As a result, we are losing events and corrupting our output topics and stores 
> with outdated information.
> After checking the local help for the priority to assign, I have assigned it 
> as *CRITICAL* because we are losing data (for example, tombstones are not 
> propagated to the following joins, so some products are still visible on our 
> website when they should not be).
> Please feel free to change the priority if you think it is not appropriate.
>  
> *The bug:*
> After multiple hours of investigation we found out that the bug is located in 
> the foreign key join feature and specifically in this class: 
> *SubscriptionResolverJoinProcessorSupplier* in the left part of a foreign key 
> join. 
> This class and its method process(...) computes a hash from the local 
> store via a serialization of a deserialized value from the left state store 
> and comparing it with the hash of the original message from the 
> subscription-response-topic. 
>  
> It means that when we deploy a new version of our kafka streams instance with 
> a new compatible avro schema from the left side of a join, every join 
> triggered by the right part of the join are invalidated until we receive all 
> the events again on the left side. Every join triggered by the right part of 
> the join are discarded because all the hashes computed by kafka streams are 
> different now from the original messages.
>  
> *How to reproduce it:*
> If we take a working and a non-working workflow, it will do something like 
> this:
> +Normal non-breaking+ workflow from the left part of the FK join:
>  # A new offer event occurs. The offer is received and stored (v1).
>  # A subscription registration is sent with the offer-hash (v1).
>  # The subscription is saved to the store with the v1 offer-hash.
>  # Product data is searched for.
>  # If product data is found, a subscription response is sent back, including 
> the v1 offer-hash.
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
> New product event from the right part of the FK join:
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent for each offer, including their offer hash 
> (v1).
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
>  
> +Breaking workflow:+ 
> The offer serializer is changed to offer v2
> New product event from the right part of the FK join: 
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription resp

[jira] [Updated] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15303:

Priority: Major  (was: Critical)

> Foreign key joins no longer triggered by events on the right side of the join 
> after deployment with a new compatible Avro schema
> 
>
> Key: KAFKA-15303
> URL: https://issues.apache.org/jira/browse/KAFKA-15303
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Charles-Eddy
>Priority: Major
> Attachments: image (1).png
>
>
> Hello everyone, I am currently working on a project that uses Kafka Streams 
> (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
> Our goal is to join offer information from our sellers with additional data 
> from various input topics, and then feed the resulting joined information 
> into an output topic.
> Our application is deployed in Kubernetes using the StatefulSet feature, with 
> one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.
> We are using avro to serialize / deserialize input topics and storing in the 
> state stores of Kafka streams.
> We have encountered a bug in Kafka Streams that prevents us from deploying 
> new versions of Kafka Streams containing new compatible Avro schemas of our 
> input topics.
> The symptom is that after deploying our new version, which contains no 
> changes in topology but only changes to the Avro schema used, we discard 
> every event coming from the right part of the join concerned by these Avro 
> schema changes until we receive something from the left part of the join.
> As a result, we are losing events and corrupting our output topics and stores 
> with outdated information.
> After checking the local help for the priority to assign, I have assigned it 
> as *CRITICAL* because we are losing data (for example, tombstones are not 
> propagated to the following joins, so some products are still visible on our 
> website when they should not be).
> Please feel free to change the priority if you think it is not appropriate.
>  
> *The bug:*
> After multiple hours of investigation we found out that the bug is located in 
> the foreign key join feature and specifically in this class: 
> *SubscriptionResolverJoinProcessorSupplier* in the left part of a foreign key 
> join. 
> This class and its method process(...) computes a hash from the local 
> store via a serialization of a deserialized value from the left state store 
> and comparing it with the hash of the original message from the 
> subscription-response-topic. 
>  
> It means that when we deploy a new version of our kafka streams instance with 
> a new compatible avro schema from the left side of a join, every join 
> triggered by the right part of the join are invalidated until we receive all 
> the events again on the left side. Every join triggered by the right part of 
> the join are discarded because all the hashes computed by kafka streams are 
> different now from the original messages.
>  
> *How to reproduce it:*
> If we take a working and a non-working workflow, it will do something like 
> this:
> +Normal non-breaking+ workflow from the left part of the FK join:
>  # A new offer event occurs. The offer is received and stored (v1).
>  # A subscription registration is sent with the offer-hash (v1).
>  # The subscription is saved to the store with the v1 offer-hash.
>  # Product data is searched for.
>  # If product data is found, a subscription response is sent back, including 
> the v1 offer-hash.
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
> New product event from the right part of the FK join:
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent for each offer, including their offer hash 
> (v1).
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
>  
> +Breaking workflow:+ 
> The offer serializer is changed to offer v2
> New product event from the right part of the FK join: 
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent 

[jira] [Commented] (KAFKA-15116) Kafka Streams processing blocked during rebalance

2023-08-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750506#comment-17750506
 ] 

Matthias J. Sax commented on KAFKA-15116:
-

Are you saying you are using a custom store? For this case, it's your 
responsibility to make sure it works with Kafka Streams. If you violate 
assumptions Kafka Streams makes (one of them is, that a task can access a store 
independently of all other tasks), all bets are off unfortunately. Thus, you 
would need to change your store so that one task cannot block any other task 
from making progress.
{quote}It is the behaviour we have observed and validated with a fix to the 
core streams code. 
{quote}
Are you referring to 
{quote}The fix is to add a check for rebalancing in the while loop in runOnce. 
This checks if a rebalancing is in progress and sets the numIterations to 0 to 
stop processing of messages. When it has rebalanced it sets numIterations back 
to 1.
{quote}
from further above? It's not clear to me why this would help? In the end, when 
a rebalance starts, we might continue processing until we need to hand off a 
partition. For this case, we need to commit pending transactions first, and 
would start a new transaction for the remaining partitions we have afterwards.
{quote}Committing the open transactions is fine (if possible). The problem is 
the un committed transactions due to the rebalancing.
{quote}
Not sure if I can follow. An open and uncommitted transaction is the same 
thing... When a rebalance starts, Kafka Streams would commit all pending 
transactions first, and thus there should not be any pending transactions. Of 
course, as said above, a new TX might get started right away for all partitions 
we did not need to hand off and processing would continue right away.
{quote}If we have two un committed transactions for the same partition key we 
end up in the blocking state because the second message cannot be processed 
because the first message hasn't been committed. 
{quote}
What do you mean by "two un-committed transaction for the same partition key" – 
if there are two messages with the same key, they should be in the same input 
topic partition (the only exception would be some stateless processing but in 
your case state is involved), which ensures that a single task (and thus a single 
thread) processes all records with the same key, and thus there is only one 
transaction for this key. If you use a custom state store and violate this 
assumption, and put two records into different partitions that are potentially 
processed by two threads, creating a deadlock on the state store when both 
threads try to access the same row for this key, that is an incorrect usage of 
Kafka Streams. 
{quote}The old behaviour sounds like it would solve our problem. Is there a 
configuration option to switch this back on?
{quote}
[~ableegoldman] might know if it's possible to switch off cooperative 
rebalancing, but again, it seems the issue is how you use Kafka Streams (maybe I 
am wrong) – you should never block in a Processor (and for your case maybe even 
end up in a deadlock until some timeout hits, if I understood what you are 
saying correctly). – Also, even if it's possible to disable cooperative 
rebalancing, the old behavior is effectively deprecated and eager rebalancing 
will be completely removed in a future release.
{quote}To answer your question "How should the system know if there is a 
dependency?": Through configuration. I don't think anything that we are trying 
to do is going against how Kafka is designed. It might be non optimal and 
legacy but it does feel like something that streams should be flexible enough 
to handle. Why can't we chose to "stop the world"?
{quote}
That is conceptually possible – and even for cooperative rebalancing we could 
`pause()` all partitions and not process anything. But again, from what I 
understand so far, the issue is blocking in user-code, not how Kafka Streams 
works.

> Kafka Streams processing blocked during rebalance
> -
>
> Key: KAFKA-15116
> URL: https://issues.apache.org/jira/browse/KAFKA-15116
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0
>Reporter: David Gammon
>Priority: Major
>
> We have a Kafka Streams application that simply takes a message, processes 
> it and then produces an event out the other side. The complexity is that 
> there is a requirement that all events with the same partition key must be 
> committed before the next message  is processed.
> This works most of the time flawlessly but we have started to see problems 
> during deployments where the first message blocks the second message during a 
> rebala

[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order

2023-08-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750504#comment-17750504
 ] 

Matthias J. Sax commented on KAFKA-15297:
-

The ticket description contains an example to reproduce it (and there is also a 
png attachment visualizing the topology). 
{quote}which in turn *should* reflect the topological order of the attached 
processor nodes.
{quote}
That's not always the case unfortunately.

> Cache flush order might not be topological order 
> -
>
> Key: KAFKA-15297
> URL: https://issues.apache.org/jira/browse/KAFKA-15297
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Bruno Cadonna
>Priority: Major
> Attachments: minimal_example.png
>
>
> The flush order of the state store caches in Kafka Streams might not 
> correspond to the topological order of the state stores in the topology. The 
> order depends on how the processors and state stores are added to the 
> topology. 
> In some cases downstream state stores might be flushed before upstream state 
> stores. That means, that during a commit records in upstream caches might end 
> up in downstream caches that have already been flushed during the same 
> commit. If a crash happens at that point, those records in the downstream 
> caches are lost. Those records are lost for two reasons:
> 1. Records in caches are only changelogged after they are flushed from the 
> cache. However, the downstream caches have already been flushed and they will 
> not be flushed again during the same commit.
> 2. The offsets of the input records that caused the records that now are 
> blocked in the downstream caches are committed during the same commit and so 
> they will not be re-processed after the crash.
> An example for a topology where the flush order of the caches is wrong is the 
> following:
> {code:java}
> final String inputTopic1 = "inputTopic1";
> final String inputTopic2 = "inputTopic2";
> final String outputTopic1 = "outputTopic1";
> final String processorName = "processor1";
> final String stateStoreA = "stateStoreA";
> final String stateStoreB = "stateStoreB";
> final String stateStoreC = "stateStoreC";
> streamsBuilder.stream(inputTopic2, Consumed.with(Serdes.String(), 
> Serdes.String()))
> .process(
> () -> new Processor<String, String, String, String>() {
> private ProcessorContext<String, String> context;
> @Override
> public void init(ProcessorContext<String, String> context) {
> this.context = context;
> }
> @Override
> public void process(Record<String, String> record) {
> context.forward(record);
> }
> @Override
> public void close() {}
> },
> Named.as("processor1")
> )
> .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> streamsBuilder.stream(inputTopic1, Consumed.with(Serdes.String(), 
> Serdes.String()))
> .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreA).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
> .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreB).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
> .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreC).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
> .toStream()
> .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> final Topology topology = streamsBuilder.build(streamsConfiguration);
> topology.connectProcessorAndStateStores(processorName, stateStoreC);
> {code}
> This code results in the attached topology.
> In the topology {{processor1}} is connected to {{stateStoreC}}. If 
> {{processor1}} is added to the topology before the other processors, i.e., if 
> the right branch of the topology is added before the left branch as in the 
> code above, the cache of {{stateStoreC}} is flushed before the caches of 
> {{stateStoreA}} and {{stateStoreB}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12829) Remove Deprecated methods under Topology

2023-08-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750503#comment-17750503
 ] 

Matthias J. Sax commented on KAFKA-12829:
-

It's actually unclear when 4.0 will come along, so it seems premature to work 
on a PR now, as it would become outdated over time. – I am sure there will be at 
least a 3.7 after the next release, which is 3.6, and to me it seems not unlikely 
that there might even be a 3.8 one.

> Remove Deprecated methods under Topology
> 
>
> Key: KAFKA-12829
> URL: https://issues.apache.org/jira/browse/KAFKA-12829
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Josep Prat
>Priority: Blocker
> Fix For: 4.0.0
>
>
> The following methods were deprecated in version 2.7:
>  * org.apache.kafka.streams.Topology#addProcessor(java.lang.String, 
> org.apache.kafka.streams.processor.ProcessorSupplier, java.lang.String...) 
>  * 
> org.apache.kafka.streams.Topology#addGlobalStore(org.apache.kafka.streams.state.StoreBuilder,
>  java.lang.String, org.apache.kafka.common.serialization.Deserializer, 
> org.apache.kafka.common.serialization.Deserializer, java.lang.String, 
> java.lang.String, org.apache.kafka.streams.processor.ProcessorSupplier)
>  * 
> org.apache.kafka.streams.Topology#addGlobalStore(org.apache.kafka.streams.state.StoreBuilder,
>  java.lang.String, org.apache.kafka.streams.processor.TimestampExtractor, 
> org.apache.kafka.common.serialization.Deserializer, 
> org.apache.kafka.common.serialization.Deserializer, java.lang.String, 
> java.lang.String, org.apache.kafka.streams.processor.ProcessorSupplier) 
>  
> See KAFKA-10605 and KIP-478.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15299) Support left stream-table join on foreign key

2023-08-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15299:

Labels: kip  (was: )

> Support left stream-table join on foreign key
> -
>
> Key: KAFKA-15299
> URL: https://issues.apache.org/jira/browse/KAFKA-15299
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Igor Fomenko
>Assignee: Igor Fomenko
>Priority: Major
>  Labels: kip
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> KIP-955: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-955%3A+Add+stream-table+join+on+foreign+key
> Currently in the Kafka Streams DSL, KStream to KTable joins can only be 
> performed on the keys. However, in practice it is often required to join the 
> messages in Kafka topics using a message field as a "foreign key" with the 
> following pattern:  
>  
> streamX.leftJoin(tableY, RecordTableY::getForeignKey, 
> joiner).to("output-topic-name")
>  
> The left join on foreign key operation will result in a stream of messages 
> from two topics joined on foreign key where each output message is produced 
> for each event on the input stream.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [QUESTION] What is the difference between sequence and offset for a Record?

2023-08-01 Thread Matthias J. Sax

The _offset_ is the position of the record in the partition.

The _sequence number_ is a unique ID that allows the broker to de-duplicate 
messages. It requires the producer to implement the idempotency protocol 
(part of Kafka transactions); thus, sequence numbers are optional and as 
long as you don't want to support idempotent writes, you don't need to 
worry about them. (If you want to dig into details, checkout KIP-98 that 
is the original KIP about Kafka TX).
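
As a small illustration (standard producer configs; the broker assigns the offset 
on append, while the idempotent producer assigns per-partition sequence numbers 
before sending; in recent client versions idempotence is enabled by default):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // turns on sequence numbers
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        // the broker-assigned offset comes back in the RecordMetadata of the send
        producer.send(new ProducerRecord<>("my-topic", "key", "value"));
    }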


HTH,
  -Matthias

On 8/1/23 2:19 AM, tison wrote:

Hi,

I'm writing a Kafka API Rust codec library[1] to understand how Kafka
models its concepts and how the core business logic works.

While implementing the codec for Records[2], I saw a pair of fields
"sequence" and "offset". Both of them are calculated by
baseOffset/baseSequence + offset delta. Then I'm a bit confused how to deal
with them properly - what's the difference between these two concepts
logically?

Also, to understand how the core business logic works, I wrote a simple
server based on my codec library, and observed that the server may need to
update offsets for produced records. How does Kafka set the correct offset
for each produced record? And how does Kafka maintain the calculation for
offset and sequence during these modifications?

I'll appreciate if anyone can answer the question or give some insights :D

Best,
tison.

[1] https://github.com/tisonkun/kafka-api
[2] https://kafka.apache.org/documentation/#messageformat



Re: [VOTE] KIP-759: Unneeded repartition canceling

2023-07-31 Thread Matthias J. Sax

+1 (binding)

On 7/11/23 11:16 AM, Shay Lin wrote:

Hi all,

I'd like to call a vote on KIP-759: Unneeded repartition canceling
The KIP has been under discussion for quite some time(two years). This is a
valuable optimization for advanced users. I hope we can push this toward
the finish line this time.

Link to the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling

Best,
Shay



[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-07-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748367#comment-17748367
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

Thanks for getting back – I was just asking a general question :) 

The DLQ feature seems to be independent though, and I agree that it could be 
useful to add to Kafka Streams.

About the handler: looking into the stacktrace, it seems that the issue is 
actually happening during commit, in particular when offsets are written:
sendOffsetsToTransaction(KafkaProducer.java:757)
This is a totally different code path. The `ProductionExceptionHandler` is 
covering the `producer.send()` code path only. – Looking into the code of both 
3.1 and 3.2, the behavior should be the same: for the call to 
`sendOffsetsToTransaction` the handler won't be triggered.
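
For reference, a minimal handler of the kind this refers to (only exercised on the 
send() path):

{code:java}
public class ContinueOnTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE; // drop the record and keep going
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
// registered via StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
{code}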

And for this case, we also cannot trigger the handler, because there is nothing 
to be dropped on the floor – Kafka Streams tries to write offsets to commit a 
TX and we cannot skip writing offsets.
{quote}Our additional testing revealed that "continue" worked in Kafka Streams 
3.1.2, but no longer works since Kafka Streams 3.2.0, as rollback occurs.
{quote}
Did you test this for `send()` or the commit case? For the `send()` case it 
should work for both versions; for the commit-case it should not work for 
either version (and is something that cannot be fixed).

Curious to hear about your findings.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooL

Re: Consuming an entire partition with control messages

2023-07-27 Thread Matthias J. Sax
Well, `kafka-consumer-groups.sh` can only display the difference between 
"committed offset" and "end offset". It cannot know what the "right" 
offset to be committed is. It's really the responsibility of the 
consumers to commit correctly.
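
For reference, a rough sketch of the end-of-restore check against the Java consumer 
(conceptually what the StoreChangelogReader does; `applyToState` is a made-up 
callback, and the consumer is assumed to run with isolation.level=read_committed):

    final List<TopicPartition> assignment = new ArrayList<>(consumer.assignment());
    final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);

    boolean restored = false;
    while (!restored) {
        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
        records.forEach(r -> applyToState(r));
        // position() steps over a trailing transaction marker, so it reaches the
        // end offset even when the last entry in the partition is a control record
        restored = assignment.stream()
            .allMatch(tp -> consumer.position(tp) >= endOffsets.get(tp));
    }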


-Matthias

On 7/27/23 1:03 AM, Vincent Maurin wrote:
Thank you Matthias for your answer. I opened an issue on the aiokafka 
project as a follow-up; let's see how we can resolve it there: 
https://github.com/aio-libs/aiokafka/issues/911


As mentioned in the issue, some tools like kafka-consumer-groups.sh also 
display a lag of "1" in this kind of situation


Best regards,

Vincent

On 13/06/2023 17:27, Matthias J. Sax wrote:

Sounds like a bug in the aiokafka library to me.

If the last message in a topic partition is a tx-marker, the consumer 
should step over it, and report the correct position after the marker.


The official KafkaConsumer (ie, the Java one) does the exact same thing.


-Matthias

On 5/30/23 8:41 AM, Vincent Maurin wrote:

Hello !

I am working on an exactly-once stream processor in Python, using the
aiokafka client library. My program stores a state in memory that is
recovered from a changelog topic, like in kafka streams.

On each processing loop, I am consuming messages, producing messages
to an output topics and to my changelog topic, within a transaction.

When I need to restart a runner, to restore the state in memory, I
have a routine consuming the changelog topic from the beginning to the
"end" with a read_commited isolation level. Here I am struggling to
define when to stop my recovery :
* my current (maybe) working solution is to loop over "poll" until
poll is not returning any messages anymore
* I tried to do something based on the end offsets, then checking
the consumer position, but with control messages at the end of the
partition, I am running into an issue where position is one below end
offsets, and doesn't go further

I had a quick look to
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
but it is a bit hard to figure out what is going on here

Best regards,
Vincent


Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-26 Thread Matthias J. Sax

One last question. What should happen for the following case:

KStream myStream = builder.stream(...).map(...);
myStream.markAsPartitioned().groupByKey().aggregate(...);
myStream.join(...)

The question is about the "fan-out" pattern. `myStream`, which is marked 
as partitioned, is fed into two downstream operations. Thus, it's 
clear that the aggregation won't trigger a repartition. However, the 
fan-out happens before `markAsPartitioned` and thus I would assume 
that the join would trigger a repartitioning?


This question is important, because if we follow what I said above, 
`markAsPartitioned` returns a new KStream object but does not mutate the 
upstream KStream object, and those are semantically two different things. It 
also has an impact on how we need to implement the feature. The KIP 
should explicitly explain this case.



-Matthias

On 7/26/23 4:58 PM, Shay Lin wrote:

Hi John,

Thanks for your reply. I updated the KIP to reflect the changes we
discussed in the thread today.
#1 is duly noted, I learned from the examples Sophie sent earlier! =)

In the new version, I also talked about why IQ and joins will not work with
the interface and talked about the mitigation. The proposal
now specifically states we are solving the unneeded partition problem when
IQ or join does not coexist in the kafka streams. In the concerns section,
the proposal talks about having a reverse mapping would make this new
interface compatible with IQ and join again but is subject to demand.

Let me know what you think. Thanks!
Shay



On Wed, Jul 26, 2023 at 2:35 PM John Roesler  wrote:


Hello Shay,

Thanks for the KIP!

I just took a look in preparation to vote, and there are two small-ish
things that I'd like to fix first. Apologies if this stuff has already come
up in the discussion thread; I only skimmed it.

1. The KIP only mentions the name of the method instead of providing a
code snippet showing exactly what the method signature will be in the
interface. Normally, KIPs do the latter because it removes all ambiguity
from the proposal. It also gives you an opportunity to write down the
Javadoc you would add to the method instead of just mentioning the points
that you plan to document.

2. The KIP lists some concerns, but not what you will do to mitigate them.
For example, the concern about IQ not behaving correctly. Will you disable
the use of the implicit partitioner downstream of one of these
cancellations? Or provide a new interface to supply the "reverse mapping"
you mentioned? Or include documentation in the Javadoc for how to deal with
the situation? I think there are a range of options for each of those
concerns, and we should state up front what we plan to do.

Thanks again!
-John

On 2023/07/24 20:33:05 Sophie Blee-Goldman wrote:

Thanks Shay! You and Matthias have convinced me, I'm happy with the current
proposal. I think once you make the minor updates to the KIP document this
will be ready for voting again.

Cheers,
Sophie

On Mon, Jul 24, 2023 at 8:26 AM Shay Lin  wrote:


Hi Sophie and Matthias, thanks for your comments and replies.

1. Scope of change: KStreams only or KStreams/KTable
I took some time to digest your points, looking through how KStreams triggers
repartitions today. I noticed that `repartitionRequired` is a flag in KStreamImpl
etc. and not in KTableImpl etc. When I looked further, in the case of KTable,
instead of passing in a boolean flag, a repartition node `TableRepartitionMapNode`
is directly created. I went back and referenced the two issue tickets KAFKA-10844
and KAFKA-4835; both requests were focused on KStreams, i.e. not to change the
partitioning when the input streams are already correctly keyed. Is it possible
that in the case of KTable, users always intend to repartition (change key) when
they call aggregate? -- (this was written before I saw Matthias's comment)

Overall, based on the tickets, I see the benefit of doing a contained change
focusing on KStreams, i.e. repartitionRequired, which would solve the pain points
nicely. If we ran into similar complaints/optimization requests for KTable down
the line, we can address them on top of this (let me know if we have these
requests already, I might just be negligent).

2. API: markAsPartitioned() vs config
If we go with the KStreams-only scope, markAsPartitioned() is more adequate,
i.e. it maps nicely to repartitionRequired. There is a list of NamedOperations
that may or may not trigger a repartition based on their context (KStreams or
KTable), which would make the implementation more confusing.

3. KIP documentation: Thanks for providing the links to previous KIPs. I will
be adding the three use cases and javadoc. I will also document the risks when
it relates to IQ and Join.

Best,
Shay

On Fri, Jul 21, 2023 at 5:55 PM Matthias J. Sax wrote:



I agree that it could easily be misused. There are a few Jira tickets for
cases when people want to "cancel" a repartition step. I would hope those

Re: [DISCUSS] KIP-960: Support interactive queries (IQv2) for versioned state stores

2023-07-26 Thread Matthias J. Sax
Thanks for the KIP Alieh. Glad to see that we can add IQ to the new 
versioned stores!




Couple of questions:


single-key lookup with timestamp (upper) bound


Not sure if "bound" is the right term? In the end, it's a point lookup 
for a key plus a timestamp, so it's an as-of timestamp (not a bound)? Of 
course, the returned record would most likely have a different (smaller) 
timestamp, but that's expected and does not make the passed-in timestamp 
a "bound" IMHO?



single-key query with timestamp range
single-key all versions query


Should we also add `withLowerTimeBound` and `withUpperTimeBound` 
(similar to what `RangeQuery` has)?


Btw: I think we should not pass `long` for timestamps, but `Instant` types.

For time-range queries, do we iterate over the values in timestamp 
ascending order? If yes, the interface should specify it? Also, would it 
make sense to add reverse order (also ok to exclude and only do if there 
is demand in a follow up KIP; if not, please add to "Rejected 
alternatives" section).


Also, for time-range queries, what are the exact bounds for what we 
include? In the end, a value has a "validity range" (conceptually), so do 
we include a record if its validity range overlaps the search time-range, 
or must it be fully included? Or would we only say that the `validFrom` 
timestamp that is stored must be in the search range (which implies that 
the lower end would be a non-overlapping but "fully included" bound, 
while the upper end would be an overlapping bound).


For key-range / time-range queries: do we return the result in `<key, timestamp>` 
order or `<timestamp, key>` order? Also, what about reverse iterators?


About `ValueIterator` -- I think the JavaDocs have an error in it for 
`peekNextRecord` (also, should it be called `peekNextValue`?). Also, some 
other JavaDocs seem to be incomplete and do not describe all parameters.



Thanks.



-Matthias



On 7/26/23 7:24 AM, Alieh Saeedi wrote:

Hi all,

I would like to propose a KIP to support IQv2 for versioned state stores.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+interactive+queries+%28IQv2%29+for+versioned+state+stores

Looking forward to your feedback!

Cheers,
Alieh



Re: Streams: Global Store Processors

2023-07-26 Thread Matthias J. Sax

That's correct.

And yes, it's a public contract and thus a KIP would be needed to change 
(or remove) it. Deprecation implies that the API is kept for at least 3 
releases (ie, one year), plus it can only be removed in a major release.


For example, if we deprecated something in 2.1 release, we could have 
removed it in 3.0. If we deprecated something in the 2.7 release (there was 
only 2.8 and then 3.0), we can only remove it in 4.0 (only 2.6 or earlier 
deprecated things could be removed in 3.0 to meet the 3 releases / one 
year requirement).


But I am not sure if we would really remove the processor, or actually 
change the restore path instead. Last but not least, if there would be a 
KIP about removing it, the main goal would be to hide the store from 
user code, so we could still allow registering some "call-back 
processor" that has no access to the state store itself.



-Matthias

On 7/26/23 10:39 AM, Colt McNealy wrote:

Hi all,

In this JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-7663 it's
documented that Global Stores are bypassed on restoration.

Consequently, the input topic to a Global Store needs to essentially be a
changelog topic as the keys and values are copied directly into the store.

I heard (perhaps in the Slack) a while ago that there was some conversation
about removing the ability to supply a Processor to the Global Store to
prevent users from tripping over that behavior. However, we currently rely
on the Processor to notify other parts of our application that things have
changed in the store (eg. for cache invalidation, metrics, etc). Obviously,
we make sure to respect the semantics of how the processor+global store
works for restoration etc...

It seems to me like the fact that we can pass in a Processor is a public
API contract, so it should be safe to rely on that...? Would it require a
KIP to change the fact that we can pass in a Processor? How much
deprecation notice would we have before we need to find a new solution?

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*



[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-07-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17747630#comment-17747630
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

Thanks for opening this ticket. Given that you are using "exactly-once", does 
it actually make sense to configure the handler with "continue" – using 
"continue" implies data loss and thus contradicts the usage of "exactly-once".

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but 
> written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 INFO  TransactionManager:393 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactio

[jira] [Updated] (KAFKA-15257) Support interactive queries (IQv2) with versioned state store

2023-07-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15257:

Labels: kip  (was: )

> Support interactive queries (IQv2) with versioned state store
> -
>
> Key: KAFKA-15257
> URL: https://issues.apache.org/jira/browse/KAFKA-15257
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>  Labels: kip
>
> Query types to consider include:
>  * single-key latest-value lookup
>  * single-key lookup with timestamp bound
>  * single-key query with timestamp range
>  * single-key all versions query
>  * key-range latest-value query
>  * key-range query with timestamp bound
>  * key-range query with timestamp range
>  * key-range all versions query
>  * all-keys latest-value query
>  * all-keys all versions (i.e., entire store) query
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15257) Support interactive queries (IQv2) with versioned state store

2023-07-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15257:

Description: 
KIP-960: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+interactive+queries+%28IQv2%29+for+versioned+state+stores]
 

Query types to consider include:
 * single-key latest-value lookup
 * single-key lookup with timestamp bound
 * single-key query with timestamp range
 * single-key all versions query

 * key-range latest-value query
 * key-range query with timestamp bound
 * key-range query with timestamp range
 * key-range all versions query

 * all-keys latest-value query
 * all-keys all versions (i.e., entire store) query

 

 

 

 

  was:
Query types to consider include:
 * single-key latest-value lookup
 * single-key lookup with timestamp bound
 * single-key query with timestamp range
 * single-key all versions query

 * key-range latest-value query
 * key-range query with timestamp bound
 * key-range query with timestamp range
 * key-range all versions query

 * all-keys latest-value query
 * all-keys all versions (i.e., entire store) query

 

 

 

 


> Support interactive queries (IQv2) with versioned state store
> -
>
> Key: KAFKA-15257
> URL: https://issues.apache.org/jira/browse/KAFKA-15257
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>  Labels: kip
>
> KIP-960: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+interactive+queries+%28IQv2%29+for+versioned+state+stores]
>  
> Query types to consider include:
>  * single-key latest-value lookup
>  * single-key lookup with timestamp bound
>  * single-key query with timestamp range
>  * single-key all versions query
>  * key-range latest-value query
>  * key-range query with timestamp bound
>  * key-range query with timestamp range
>  * key-range all versions query
>  * all-keys latest-value query
>  * all-keys all versions (i.e., entire store) query
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15257) Support interactive queries (IQv2) with versioned state store

2023-07-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15257:

Component/s: streams

> Support interactive queries (IQv2) with versioned state store
> -
>
> Key: KAFKA-15257
> URL: https://issues.apache.org/jira/browse/KAFKA-15257
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>
> Query types to consider include:
>  * single-key latest-value lookup
>  * single-key lookup with timestamp bound
>  * single-key query with timestamp range
>  * single-key all versions query
>  * key-range latest-value query
>  * key-range query with timestamp bound
>  * key-range query with timestamp range
>  * key-range all versions query
>  * all-keys latest-value query
>  * all-keys all versions (i.e., entire store) query
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15257) Support interactive queries (IQv2) with versioned state store

2023-07-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15257:

Issue Type: New Feature  (was: Task)

> Support interactive queries (IQv2) with versioned state store
> -
>
> Key: KAFKA-15257
> URL: https://issues.apache.org/jira/browse/KAFKA-15257
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>
> Query types to consider include:
>  * single-key latest-value lookup
>  * single-key lookup with timestamp bound
>  * single-key query with timestamp range
>  * single-key all versions query
>  * key-range latest-value query
>  * key-range query with timestamp bound
>  * key-range query with timestamp range
>  * key-range all versions query
>  * all-keys latest-value query
>  * all-keys all versions (i.e., entire store) query
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2023-07-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17747210#comment-17747210
 ] 

Matthias J. Sax commented on KAFKA-12317:
-

Thanks for picking it up – I think it would be good to tackle all 4 related 
tickets at once, to make sure we don't get a zoo of different behavior for 
different operators.

[~ableegoldman] [~vvcephei] [~guozhang] [~cadonna] – do you think we need a KIP 
for this change?

> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>
> Currently, for a stream-stream and stream-table/globalTable join, 
> KafkaStreams drops all stream records with a `null`-key (`null`-join-key for 
> stream-globalTable), because for a `null`-(join)key the join is undefined: 
> ie, we don't have an attribute to do the table lookup (we consider the 
> stream-record as malformed). Note that we define the semantics of 
> _left/outer_ join as: keep the stream record if no matching join record was 
> found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat it as a 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users who want to keep the current behavior can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires changing the behavior if we insert a 
> repartition topic before the join: currently, we drop `null`-key records 
> before writing into the repartition topic (as we know they would be dropped 
> later anyway). We need to relax this behavior for a left stream-table and 
> left/outer stream-stream join. Users need to be aware (ie, we might need to 
> put this into the docs and JavaDocs) that records with a `null`-key would be 
> partitioned randomly.
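
For illustration, a minimal sketch of the `filter()` workaround mentioned above; 
the topic names and String types are made up:

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> stream = builder.stream("stream-input");   // hypothetical topic
final KTable<String, String> table = builder.table("table-input");       // hypothetical topic

stream
    // keep today's behavior explicitly: drop records with a null key before the join
    .filter((key, value) -> key != null)
    .leftJoin(table, (streamValue, tableValue) -> streamValue + "," + tableValue)
    .to("join-output");
{code}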



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15251) Upgrade system test to use 3.5.1

2023-07-25 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15251:
---

 Summary: Upgrade system test to use 3.5.1
 Key: KAFKA-15251
 URL: https://issues.apache.org/jira/browse/KAFKA-15251
 Project: Kafka
  Issue Type: Test
  Components: streams, system tests
Reporter: Matthias J. Sax


3.5.1 was released and we should update the upgrade system tests accordingly to 
use the new version.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-955: Add stream-table join on foreign key

2023-07-25 Thread Matthias J. Sax

Igor,

thanks for the KIP. Interesting proposal. I am wondering a little bit 
about the use-case and semantics, and if it's really required to add 
what you propose? Please correct me if I am wrong.


In the end, a stream-table join is a "stream enrichment" (via a table 
lookup). Thus, it's inherently a 1:1 join (in contrast to a FK 
table-table join which is a n:1 join).


If this assumption is correct, and you have data for which the table 
side join attribute is in the value, you could actually repartition the 
table data using the join attribute as the PK of the table.
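
For example, something like the following rough sketch is already possible 
today (the topic names, the TableValue type, and the serdes are made up; 
imports are omitted):

StreamsBuilder builder = new StreamsBuilder();

// Re-key the table-side data by the attribute you want to join on,
// so that a plain 1:1 stream-table join applies afterwards.
KTable<String, TableValue> rekeyedTable = builder
    .stream("table-topic", Consumed.with(Serdes.String(), tableValueSerde))
    .selectKey((oldKey, value) -> value.getJoinAttribute())
    .toTable(Materialized.with(Serdes.String(), tableValueSerde));

KStream<String, String> enriched = builder
    .stream("stream-topic", Consumed.with(Serdes.String(), streamValueSerde))
    .join(rekeyedTable, (streamValue, tableValue) -> streamValue + "-" + tableValue);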


If my assumption is incorrect, and you say you want to have a 1:n join 
(note that I intentionally reversed from n:1 to 1:n), I would rather 
object, because it seems to violate the idea of "enriching" a stream, which 
means that each input record produces one output record, not multiple.


Also note: for a FK table-table join, we use the foreignKeyExtractor to 
get the join attribute from the left input table (which corresponds to 
the KStream in your KIP; ie, it's a n:1 join), while you propose to use 
the foreignKeyExtractor to be applied to the KTable (which is the right 
input, and thus it would be a 1:n join).


Maybe you can clarify the use case a little bit. For the current KIP 
description I only see the 1:1 join case, which would mean we might not 
need such a feature?



-Matthias


On 7/24/23 11:36 AM, Igor Fomenko wrote:

Hello developers of the Kafka Streams,

I would like to start a discussion on KIP-955: Add stream-table join on
foreign key

This KIP proposes a new API to join a KStream with a KTable based on a
foreign key relation.
This KIP was inspired by one of my former projects to integrate RDBMS
databases with data consumers using Change Data Capture and Kafka.
If we had the capability in Kafka Streams to join a KStream with a KTable on
a foreign key, this would have simplified our implementation significantly.

Looking forward to your feedback and discussion.

Regards,

Igor



[jira] [Updated] (KAFKA-15242) FixedKeyProcessor testing is unusable

2023-07-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15242:

Component/s: streams

> FixedKeyProcessor testing is unusable
> -
>
> Key: KAFKA-15242
> URL: https://issues.apache.org/jira/browse/KAFKA-15242
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Zlstibor Veljkovic
>Priority: Major
>
> Using mock processor context to get the forwarded message doesn't work.
> Also, there is no well-documented way to test FixedKeyProcessors.
> Please see the repo at [https://github.com/zveljkovic/kafka-repro]
> but most important piece is test file with runtime and compile time errors:
> [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-23 Thread Matthias J. Sax
the discussion
around CUSTOM is in the rejected alternatives but I'm happy to defer to
whatever the project conventions are :)


If it matches existing `ROCKS_DB` or `IN_MEMORY` we just process it as we
do now, and if not, we assume it's a fully qualified class name and try to
instantiate it?

Note that there is no functionality for this kind of thing in
AbstractConfig (it's either a String validated enum or a class) so this
would be a departure from convention. Again, I'm happy to implement that if
it's preferred.


Also wondering how it would relate to the existing `Stores` factory?


StoreTypeSpec will depend on Stores factory - they're one layer removed.
You can imagine that StoreTypeSpec is just a grouping of methods from the
Stores factory into a convenient package for default configuration
purposes.

Thanks again for all the detailed thoughts Matthias!

On Fri, Jul 21, 2023 at 11:50 AM Matthias J. Sax  wrote:


Thanks for the KIP. Overall I like the idea to close this gap.

However, I am wondering if we should close other gaps first? In
particular, IIRC, we have a few cases for which we only have a RocksDB
implementation for a store, and thus, adding an in-memory version for
these stores first, to make the current `IN_MEMORY` parameter work,
might be the first step?

In particular, this holds for the new versioned-store (but I actually
believe there is some other internal store with no in-memory
implementation). -- For `suppress()` it's actually the other way around:
we only have an in-memory implementation. Do you aim to allow custom
stores for `suppress()`, too?

Btw: Should versioned stores also be covered by the KIP (ie,
`StoreTypeSpec`)? We did consider to add a new option `VERSIONED` to the
existing `default.dsl.store` config, but opted out for various reasons.

Last, I am not sure if the new parameter replacing the existing one is
the best way to go? Did you put the idea to add `CUSTOM` to the existing
config into the rejected alternatives? Personally, I would prefer to add
`CUSTOM` as I would like to optimize for ease of use for the majority of
users (which don't implement a custom store), but only switch to
in-memory sometimes. -- As an alternative, you would also just extend
`dsl.default.store` (it's just a String) and allow to pass in anything.
If it matches existing `ROCKS_DB` or `IN_MEMORY` we just process it as
we do now, and if not, we assume it's a fully qualified class name and
try to instantiate it? -- Given that we plan to keep the store-enum, it
seems cleaner to keep the existing config and keep both the config and
enum aligned to each other?


It's just a preliminary thought. I will need to go back and take a more
detailed look into the code to grok how the proposed `StoreTypeSpec`
falls into place. Also wondering how it would relate to the existing
`Stores` factory?

-Matthias


On 7/21/23 6:45 AM, Colt McNealy wrote:

Sophie—

Thanks for chiming in here. +1 to the idea of specifying the ordering
guarantees that we make in the StorageTypeSpec javadocs.

Quick question then. Is the javadoc that says:


Order is not guaranteed as bytes lexicographical ordering might not

represent key order.

no longer correct, and should say:


Order guarantees depend on the underlying implementation of the

ReadOnlyKeyValueStore. For more information, please consult the
[StorageTypeSpec javadocs]()

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


On Thu, Jul 20, 2023 at 9:28 PM Sophie Blee-Goldman <ableegold...@gmail.com>
wrote:

Hey Almog, first off, thanks for the KIP! I (and others) raised concerns
over how restrictive the default.dsl.store config would be if not
extendable to custom store types, especially given that this seems to be
the primary userbase of such a feature. At the time we didn't really have
any better ideas for a clean way to achieve that, but what you proposed
makes a lot of sense to me. Happy to see a good solution to this, and
hopefully others will share my satisfaction :P

I did have one quick piece of feedback which arose from an unrelated
question posed to the dev mailing list w/ subject line
"ReadOnlyKeyValueStore#range() Semantics"
<https://lists.apache.org/thread/jbckmth8d3mcgg0rd670cpvsgwzqlwrm>. I
recommend checking out the full thread for context, but it made me think
about how we can leverage the new StoreTypeSpec concept as an answer to the
long-standing question in Streams: where can we put guarantees of the
public contract for RocksDB (or other store implementations) when all the
RocksDB stuff is technically internal.

Basically, I'm suggesting two things: first, call out in some way (perhaps
the StoreTypeSpec javadocs) that each StoreTypeSpec is considered a public
contract in itself and should outline any semantic guarantees it does, or
does not, make. Second, we should add a note on ordering guarantees in the
two OOTB specs: for RocksDB we assert that range queries will honor
serialized byte ordering,

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-21 Thread Matthias J. Sax
I agree that it could easily be misused. There are a few Jira tickets for 
cases when people want to "cancel" a repartition step. I would hope 
those tickets are linked to the KIP (if not, we should do this, and 
maybe even copy those cases as motivation into the KIP itself)?


It's always a tricky question to what extent we want to guide users, and 
to what extent we need to give levers for advanced cases (and how to 
design those levers...) It's for sure a good idea to call out "use with 
care" in the JavaDocs for the new method.



-Matthias

On 7/21/23 3:34 PM, Sophie Blee-Goldman wrote:

I guess I felt a bit uneasy about how this could be used/abused while
reading the KIP, but if we truly believe this is an advanced feature, I'm
fine with the way things currently are. It doesn't feel like the best API,
but it does seem to be the best *possible* API given the way things are.

W.r.t the KTable notes, that all makes sense to me. I just wanted to lay
out all the potential cases to make sure we had our bases covered.

I still think an example or two would help, but the only thing I will
actually wait on before feeling comfortable enough to vote on this would be
a clear method signature (and maybe sample javadocs) in the "Public
Interfaces" section.

Thanks again for the KIP Shay! Hope I haven't dragged it out too much

On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax  wrote:


Some thought about the API question.



A. kstream.groupBy(...).aggregate(...)


This can be re-writtten as

kstream.selectKey(...)
 .markAsRepartitioned()
 .groupByKey()
 .aggregate()

Given that `markAsRepartitioned` is an advanced feature, I think it would
be ok?



B. ktable.groupBy(...).aggregate(...)


For KTable aggregation, not sure how useful it would be? In the end, a
table aggregation only makes sense if we pick something from the
value, ie, we indeed change the key?



C. kstream.selectKey(...).join(ktable)


We can just insert a `markAsRepartitioned()` after `selectKey` to avoid
repartitioning of the left input KStream.



KStream.selectKey(...).toTable().join(...)


Not sure if I understand what you try to say with this example? In the
end, `selectKey(...).toTable()` would repartition. If one knows that one can
upsert directly, one inserts a `markAsRepartitioned()` in between.


In general, the use case seems to be that the key is not in the right
"format", or there is no key, but data was partitioned by a
value-attribute upstream and we just want to extract this
value-attribute into the key. Both seem to be KStream cases?
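
To make case C concrete, a rough sketch using the operator proposed in this
KIP (markAsRepartitioned() does not exist yet, and the topics and the
Order/Customer types are made up):

KStream<String, Order> orders = builder.stream("orders");        // data already partitioned by customerId upstream
KTable<String, Customer> customers = builder.table("customers");

orders
    .selectKey((key, order) -> order.getCustomerId())  // key-changing operation
    .markAsRepartitioned()                             // proposed operator: skip the automatic repartition
    .join(customers, (order, customer) -> order.withCustomer(customer));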


-Matthias



On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:

Hey Shay, while I don't have any specific concerns about the new public API
in this KIP, I'd like to better understand how this feature will work
before I vote. We should document the behavior of this new operator clearly
in the KIP as well -- you don't necessarily need to write the complete
javadocs up front, but it should be possible for a user to read the KIP and
then understand how this feature will work and how they would need to apply
it.

To that end, I recommend framing this proposal with a few examples to help
clarify the semantics. When and where can you apply the markAsPartitioned()
operator? Some suggestions below.

Specific notes:

1. The KIP opens with "Each key changing operation in Kafka Streams
(selectKey, map, transform, etc.) now leads to automatic repartition before
an aggregation." We should change "aggregation" to "stateful operation" as
this is true for things like joins as well as aggregations
2. The callout on IQ makes me a bit uncomfortable -- basically it says this
should not be a concern "if we use markAsPartitioned correctly". Does this
mean if we, the devs implementing this, write the feature correctly? Or is
it saying that this won't be a problem as long as "we", the users of this
feature, use it correctly"? Just wondering if you've put any thought into
how this would work yet (I personally have not)
3. The KIP should lay out the proposed API exactly, even if there's only
one new method. Check out this KIP
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
(or this KIP
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808>)
for a good reference on what the Public Interfaces section should include
4. Regarding the proposed API itself, I wonder if KStream is really the
most appropriate interface for the new operator. A repartition can be
triggered on just a KTable. Here's where some examples would help. Perhaps
we could focus on these three cases:

A. kstream.groupBy(...).aggregate(...)
B. ktable.groupBy(...).aggregate(...)
C. kstream.selectKey(...).join(ktable)

I'm sure someone will correct me if I'm missing any additional vital
examples, but at the very least, these are the three to con

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-21 Thread Matthias J. Sax
g
partitioned" is conceptually a property of other operations applied to a
KStream/KTable, rather than an operation itself. So rather than making this
a DSL operator itself, what if we added it to the Grouped and various
Joined configuration classes? It would allow us to more carefully hit only
the relevant parts of the DSL, so there are no questions about whether/when
to throw errors when the operator is incorrectly applied -- there would be
no way to apply it incorrectly. The main drawback I can think of is simply
that this touches on a larger surface area of the API. I personally don't
believe this is a good enough reason to make it a DSL operator as one could
make that argument for nearly any kind of KStream or KTable operator
configuration going forward, and would explode the KStream/KTable API
surface area instead. Perhaps this was discussed during the previous
iteration of this KIP, or I'm missing something here, so I just wanted to
put this out there and see what people think

Either way, thanks for picking up this KIP. It's been a long time coming :)

-Sophie





On Mon, Jul 10, 2023 at 2:05 PM Shay Lin  wrote:


Hi all,

It's been a few days so I went ahead with editing the KIP, the main change
is on the method name

https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
.
I will follow up with a VOTE separately.

Best,
Shay

On Thu, Jun 29, 2023 at 4:52 PM Matthias J. Sax  wrote:


Shay,

thanks for picking up this KIP. It's a pity that the discussion stalled
for such a long time.

As expressed previously, I am happy with the name `markAsPartitioned()`
and also believe it's ok to just document the impact and leave it to the
user to do the right thing.

If we really get a lot of users that ask about it, because they did not
do the right thing, we could still add something (eg, a reverse-mapper
function) in a follow-up KIP. But we don't know if it's necessary; thus,
making a small incremental step sounds like a good approach to me.

Let's see if others agree or not.


-Matthias

On 6/28/23 5:29 PM, Shay Lin wrote:

Hi all,

Great discussion thread. May I take this KIP up? If it’s alright my plan is
to update the KIP with the operator `markAsPartitioned()`.

As you have discussed and pointed out, there are implications to downstream
joins or aggregation operations. Still, the operator is intended for
advanced users so my two cents is it would be a valuable addition
nonetheless. We could add this as a caution/consideration as part of the
java doc.

Let me know, thanks.
Shay









Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!

2023-07-21 Thread Matthias J. Sax
I am not a clients (or threading) expert, but I tend to agree with Colin's 
concerns.


In particular, it would be nice to see an example of how you intend to use 
the API (I am not familiar with Kotlin or its coroutines), to better 
understand what this change helps to solve to begin with.


Opening up the consumer sounds potentially dangerous and we should 
weigh opportunity and risk before making a decision. So far, I see 
risks but do not understand the opportunity you are after.



-Matthias

On 7/14/23 11:43 AM, Kirk True wrote:

Hi Erik,

Thanks for the KIP!

I empathize with your frustration over the radio silence. It gets like that 
sometimes, and I apologize for my lack of feedback.

I’d personally like to see this lively exchange move over to the DISCUSS thread 
you’d created before.

Thanks,
Kirk


On Jul 14, 2023, at 1:33 AM, Erik van Oosten  
wrote:

Hi Colin,

The way I understood Philp's message is that KIP-944 also plays nice with 
KIP-945. But I might be mistaken.

Regardless, KIP-945 does /not/ resolve the underlying problem (the need for 
nested consumer invocations) because it has the explicit goal of not changing 
the user facing API.


... KIP-945 but haven't posted a DISCUSS thread yet


There is a thread called 'KafkaConsumer refactor proposal', but indeed no 
official discussion yet.


I really don't want to be debugging complex interactions between Java 
thread-local variables and green threads.


In that email thread, I proposed an API change in which callbacks are no longer 
needed. The proposal completely removes the need for such complex interactions. 
In addition, it gives clients the ability to process at full speed even while a 
cooperative rebalance is ongoing.

Regards,
 Erik.

Op 14-07-2023 om 00:36 schreef Colin McCabe:

HI Philip & Erik,

Hmm... if we agree that KIP-945 addresses this use case, I think it would be 
better to just focus on that KIP. Fundamentally it's a better and cleaner model 
than a complex scheme involving thread-local variables. I really don't want to 
be debugging complex interactions between Java thread-local variables and green 
threads.

It also generally helps to have some use-cases in mind when writing these 
things. If we get feedback about what would be useful for async runtimes, that 
would probably help improve and focus KIP-945. By the way, I can see you have a 
draft on the wiki for KIP-945 but haven't posted a DISCUSS thread yet, so I 
assume it's not ready for review yet ;)

best,
Colin


On Tue, Jul 11, 2023, at 12:24, Philip Nee wrote:

Hey Erik - Another thing I want to add to my comment: we are in the process
of re-writing the KafkaConsumer, and I think your proposal would work in
the new consumer because we are going to separate the user thread and the
background thread.  Here is the 1-pager, and we are in the process of
converting this into KIP-945.

Thanks,
P

On Tue, Jul 11, 2023 at 10:33 AM Philip Nee  wrote:


Hey Erik,

Sorry for holding up this email for a few days since Colin's response
includes some of my concerns.  I'm in favor of this KIP, and I think your
approach seems safe.  Of course, I probably missed something therefore I
think this KIP needs to cover different use cases to demonstrate it doesn't
cause any unsafe access. I think this can be demonstrated via diagrams and
some code in the KIP.

Thanks,
P

On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten
 wrote:


Hello Colin,

  >> In KIP-944, the callback thread can only delegate to another thread
after reading from and writing to a threadlocal variable, providing the
barriers right there.

  > I don't see any documentation that accessing thread local variables
provides a total store or load barrier. Do you have such documentation?
It seems like if this were the case, we could eliminate volatile
variables from most of the code base.

Now I was imprecise. The thread-locals are only somewhat involved. In
the KIP proposal the callback thread reads an access key from a
thread-local variable. It then needs to pass that access key to another
thread, which then can set it on its own thread-local variable. The act
of passing a value from one thread to another implies that a memory
barrier needs to be passed. However, this is all not so relevant since
there is no need to pass the access key back when the other thread is
done.

But now I think about it a bit more, the locking mechanism runs in a
synchronized block. If I remember correctly this should be enough to
pass read and write barriers.

  >> In the current implementation the consumer is also invoked from
random threads. If it works now, it should continue to work.
  > I'm not sure what you're referring to. Can you expand on this?

Any invocation of the consumer (e.g. method poll) is not from a thread
managed by the consumer. This is what I was assuming you meant with the
term 'random thread'.

  > Hmm, not sure what you mean by "cooperate with blocking code." If you
have 10 green threads you're multiplexing on to one 

Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-21 Thread Matthias J. Sax

Thanks for the KIP. Overall I like the idea to close this gap.

However, I am wondering if we should close other gaps first? In 
particular, IIRC, we have a few cases for which we only have a RocksDB 
implementation for a store, and thus, adding an in-memory version for 
these stores first, to make the current `IN_MEMORY` parameter work, 
might be the first step?


In particular, this holds for the new versioned-store (but I actually 
believe there is some other internal store with no in-memory 
implementation). -- For `suppress()` it's actually the other way around: 
we only have an in-memory implementation. Do you aim to allow custom 
stores for `suppress()`, too?


Btw: Should versioned stores also be covered by the KIP (ie, 
`StoreTypeSpec`)? We did consider to add a new option `VERSIONED` to the 
existing `default.dsl.store` config, but opted out for various reasons.


Last, I am not sure if the new parameter replacing the existing one is 
the best way to go? Did you put the idea to add `CUSTOM` to the existing 
config into the rejected alternatives? Personally, I would prefer to add 
`CUSTOM` as I would like to optimize for ease of use for the majority of 
users (which don't implement a custom store), but only switch to 
in-memory sometimes. -- As an alternative, you would also just extend 
`dsl.default.store` (it's just a String) and allow to pass in anything. 
If it matches existing `ROCKS_DB` or `IN_MEMORY` we just process it as 
we do now, and if not, we assume it's a fully qualified class name and 
try to instantiate it? -- Given that we plan to keep the store-enum, it 
seems cleaner to keep the existing config and keep both the config and 
enum aligned to each other?
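
For reference, a rough sketch of how the existing KIP-591 config is set today 
(only the two built-in values exist at this point; constant names as added by 
KIP-591):

final Properties props = new Properties();
// "rocksDB" is the default; "in_memory" is the only other accepted value right now
props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY);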



It's just a preliminary thought. I will need to go back and take a more 
detailed look into the code to grok how the proposed `StoreTypeSpec` 
falls into place. Also wondering how it would relate to the existing 
`Stores` factory?


-Matthias


On 7/21/23 6:45 AM, Colt McNealy wrote:

Sophie—

Thanks for chiming in here. +1 to the idea of specifying the ordering
guarantees that we make in the StorageTypeSpec javadocs.

Quick question then. Is the javadoc that says:


Order is not guaranteed as bytes lexicographical ordering might not

represent key order.

no longer correct, and should say:


Order guarantees depend on the underlying implementation of the

ReadOnlyKeyValueStore. For more information, please consult the
[StorageTypeSpec javadocs]()

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


On Thu, Jul 20, 2023 at 9:28 PM Sophie Blee-Goldman 
wrote:


Hey Almog, first off, thanks for the KIP! I (and others) raised concerns
over how restrictive the default.dsl.store config would be if not
extendable to custom store types, especially given that this seems to be
the primary userbase of such a feature. At the time we didn't really have
any better ideas for a clean way to achieve that, but what you proposed
makes a lot of sense to me. Happy to see a good solution to this, and
hopefully others will share my satisfaction :P

I did have one quick piece of feedback which arose from an unrelated
question posed to the dev mailing list w/ subject line
"ReadOnlyKeyValueStore#range()
Semantics"
. I
recommend checking out the full thread for context, but it made me think
about how we can leverage the new StoreTypeSpec concept as an answer to the
long-standing question in Streams: where can we put guarantees of the
public contract for RocksDB (or other store implementations) when all the
RocksDB stuff is technically internal.

Basically, I'm suggesting two things: first, call out in some way (perhaps
the StoreTypeSpec javadocs) that each StoreTypeSpec is considered a public
contract in itself and should outline any semantic guarantees it does, or
does not, make. Second, we should add a note on ordering guarantees in the
two OOTB specs: for RocksDB we assert that range queries will honor
serialized byte ordering, whereas the InMemory flavor gives no ordering
guarantee whatsoever at this time.

Thoughts?

-Sophie

On Thu, Jul 20, 2023 at 4:28 PM Almog Gavra  wrote:


Hi All,

I would like to propose a KIP to expand support for default store types
(KIP-591) to encompass custom store implementations:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-954%3A+expand+default+DSL+store+configuration+to+custom+types


Looking forward to your feedback!

Cheers,
Almog







[jira] [Comment Edited] (KAFKA-15116) Kafka Streams processing blocked during rebalance

2023-07-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17745690#comment-17745690
 ] 

Matthias J. Sax edited comment on KAFKA-15116 at 7/21/23 6:18 PM:
--

{quote}The internal store is shared across stream threads.
{quote}
That is not how Kafka Streams works. If you have a store, the store is sharded 
and each StreamThread has its own shard. A single key goes into a single 
shard (ie, must go into a single shard – otherwise you break the system) by 
partitioning the data by key.
{quote}There is a consumer outside of kafka streams that is reading 
"read_committed" messages that populates the store and unblocks the processor.
{quote}
Are you saying you are reading the corresponding changelog topic? That is not 
recommended in general, as it's considered an implementation detail. It's still 
not clear why anything would be "blocked" or how this external consumer would 
do the unblocking (blocking to me really means "to wait / block the thread").
{quote}In this context I'm talking about eos and the transaction being 
committed and therefore the consumer being able to read the "read_committed" 
message.
{quote}
Well yes, if Kafka Streams commits, any pending transactions are committed. So 
if you are saying, you want to accumulate 3 messages for a key, but so far only 
2 messages got processed, 2 messages would be written into the state store and 
changelog topic on commit. But that is by design and correct. As said above, 
you should not read from the changelog topic. The right thing to do would be 
to change your processor and let it write into an output topic if all 3 
messages are there (and never write a partial result into this topic), and read 
from this output topic instead of the changelog (in case I understood the 
scenario you describe correctly).
{quote}I think ultimately our problem is that the stream thread carries on 
processing messages during a rebalance but does not complete them (transaction 
commit)
{quote}
I think you make incorrect assumptions about how processing works (and what a 
transaction in Kafka is). A transaction is really just to guard against 
failures – it has no _semantic_ meaning in Kafka that would align to your 
business logic (there are no "begin TX" or "commit TX" calls exposed in Kafka 
Streams that you could use to align TX to your business logic – and you don't 
have to).
{quote}Even though pausing processing during a rebalance probably shouldn't be 
default behaviour it would be ideal for us if it were configurable.
{quote}
This was the old "eager rebalancing" and it was changed because there is 
actually no reason to "stop the world" during a rebalance. Also I am not sure 
how it would help your case? Even if we stop processing during a rebalance, we 
would need to commit the open TX when rebalancing starts. So nothing really 
changes.
{quote}Pausing consumption feels valid especially when there is a dependency 
between messages with the same partion key?
{quote}
How should the system know if there is a dependency? It seems you are not 
writing your app in the proper way and may have incorrect assumptions about how Kafka is 
designed?


was (Author: mjsax):
{quote}The internal store is shared across stream threads.
{quote}
That is not how Kafka Streams works. If you have a store, the store is sharded 
and each StreamsThreads has it's own shard.
{quote}There is a consumer outside of kafka streams that is reading 
"read_committed" messages that populates the store and unblocks the processor.
{quote}
Are you saying you are reading the corresponding changelog topic? That is not 
recommended in general, as it's considered an implementation detail. It's still 
not clear why anything would be "blocked" or how this external consumer would 
do the unblocking (blocking to me really mean "to wait / block the thread").
{quote}In this context I'm talking about eos and the transaction being 
committed and therefore the consumer being able to read the "read_committed" 
message.
{quote}
Well yes, if Kafka Streams commit, app pending transactions are committed. So 
if you are saying, you want to accumulate 3 message for a key, but so far only 
2 message got processed, 2 messages would be written into the state store and 
changelog topic on commit. But that is by design and correct. As said above, 
you should not read from the changelog topic. The right thing to do would be, 
to change your processor and let it write into an output topic if all 3 
messages are there (and never write a partial result into this topic), and read 
from this output topic instead of the changelog (in case I did understand the 
scenario you describe correctly).
{quote}I think ultimately our problem is that the stream thread carries on 
processing messages during a rebal

[jira] [Commented] (KAFKA-15116) Kafka Streams processing blocked during rebalance

2023-07-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17745690#comment-17745690
 ] 

Matthias J. Sax commented on KAFKA-15116:
-

{quote}The internal store is shared across stream threads.
{quote}
That is not how Kafka Streams works. If you have a store, the store is sharded 
and each StreamThread has its own shard.
{quote}There is a consumer outside of kafka streams that is reading 
"read_committed" messages that populates the store and unblocks the processor.
{quote}
Are you saying you are reading the corresponding changelog topic? That is not 
recommended in general, as it's considered an implementation detail. It's still 
not clear why anything would be "blocked" or how this external consumer would 
do the unblocking (blocking to me really means "to wait / block the thread").
{quote}In this context I'm talking about eos and the transaction being 
committed and therefore the consumer being able to read the "read_committed" 
message.
{quote}
Well yes, if Kafka Streams commits, any pending transactions are committed. So 
if you are saying, you want to accumulate 3 messages for a key, but so far only 
2 messages got processed, 2 messages would be written into the state store and 
changelog topic on commit. But that is by design and correct. As said above, 
you should not read from the changelog topic. The right thing to do would be 
to change your processor and let it write into an output topic if all 3 
messages are there (and never write a partial result into this topic), and read 
from this output topic instead of the changelog (in case I understood the 
scenario you describe correctly).
{quote}I think ultimately our problem is that the stream thread carries on 
processing messages during a rebalance but does not complete them (transaction 
commit)
{quote}
I think you make incorrect assumptions about how processing works (and what a 
transaction in Kafka is). A transaction is really just to guard against 
failures – it has no _semantic_ meaning in Kafka that would align to your 
business logic (there are no "begin TX" or "commit TX" calls exposed in Kafka 
Streams that you could use to align TX to your business logic – and you don't 
have to).
{quote}Even though pausing processing during a rebalance probably shouldn't be 
default behaviour it would be ideal for us if it were configurable.
{quote}
This was the old "eager rebalancing" and it was changed because there is 
actually no reason to "stop the world" during a rebalance. Also I am not sure 
how it would help your case? Even if we stop processing during a rebalance, we 
would need to commit the open TX when rebalancing starts. So nothing really 
changes.
{quote}Pausing consumption feels valid especially when there is a dependency 
between messages with the same partion key?
{quote}
How should the system know if there is a dependency? It seems you are not 
writing your app in the proper way and may have incorrect assumptions about how Kafka is 
designed?
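
To illustrate the suggested approach, a rough sketch of a processor that only 
forwards once all three messages for a key have arrived (the store name, types, 
and the count of three are made up; the store has to be registered and connected 
with a suitable serde, and downstream would read the processor's output topic, 
not the changelog):

{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// Buffers values per key in a (changelogged) store and forwards only complete results.
public class ThreeMessageAggregator implements Processor<String, String, String, List<String>> {

    private ProcessorContext<String, List<String>> context;
    private KeyValueStore<String, List<String>> buffer;

    @Override
    public void init(final ProcessorContext<String, List<String>> context) {
        this.context = context;
        this.buffer = context.getStateStore("buffer-store"); // hypothetical store name
    }

    @Override
    public void process(final Record<String, String> record) {
        List<String> seen = buffer.get(record.key());
        if (seen == null) {
            seen = new ArrayList<>();
        }
        seen.add(record.value());
        if (seen.size() < 3) {
            buffer.put(record.key(), seen);          // partial result stays internal
        } else {
            buffer.delete(record.key());
            context.forward(record.withValue(seen)); // only complete results reach the output topic
        }
    }
}
{code}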

> Kafka Streams processing blocked during rebalance
> -
>
> Key: KAFKA-15116
> URL: https://issues.apache.org/jira/browse/KAFKA-15116
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0
>Reporter: David Gammon
>Priority: Major
>
> We have a Kafka Streams application that simply takes a message, processes 
> it and then produces an event out the other side. The complexity is that 
> there is a requirement that all events with the same partition key must be 
> committed before the next message is processed.
> This works most of the time flawlessly but we have started to see problems 
> during deployments where the first message blocks the second message during a 
> rebalance because the first message isn’t committed before the second message 
> is processed. This ultimately results in transactions timing out and more 
> rebalancing.
> We’ve tried lots of configuration to get the behaviour we require with no 
> luck. We’ve now put in a temporary fix so that Kafka Streams works with our 
> framework but it feels like this might be a missing feature or potentially a 
> bug.
> +Example+
> Given:
>  * We have two messages (InA and InB).
>  * Both messages have the same partition key.
>  * A rebalance is in progress so streams is no longer able to commit.
> When:
>  # Message InA -> processor -> OutA (not committed)
>  # Message InB -> processor -> blocked because #1 has not been committed



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744726#comment-17744726
 ] 

Matthias J. Sax commented on KAFKA-15190:
-

{quote}but although {{StreamsPartitionAssignor}} sometimes calls it a client ID 
and sometimes a process ID it's a {{UUID}} so I assume it really is the process 
ID.
{quote}
Thanks for calling this out. You are right; I missed this point.

As you did mention "max recovery lag", I assume you have a stateful app that 
uses in-memory stores only?

Another thing coming to my mind: the `client.id` actually has a different purpose 
and should not be unique per `KafkaStreams` instance, but should be the _same_ 
for all instances (the name is a little bit misleading). For example, if you 
configure quotas, it's based on `client.id` and you usually want quotas to be 
set per application, not per instance.

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>  Labels: needs-kip
>
> We run our Kafka Streams applications in containers with no persistent 
> storage, and therefore the mitigation of persisting the process ID in the state 
> directory in KAFKA-10716 does not help us avoid shuffling lots of tasks during 
> restarts.
> However, we do have a persistent container ID (from a Kubernetes 
> StatefulSet). Would it be possible to expose a configuration option to let us 
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense 
> to have the process ID be automatically derived from this (plus 
> application/client IDs) if it's set? The two IDs seem to have overlapping 
> goals of identifying "this consumer" across restarts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15190) Allow configuring a streams process ID

2023-07-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744288#comment-17744288
 ] 

Matthias J. Sax edited comment on KAFKA-15190 at 7/18/23 6:22 PM:
--

One more thing: the `process.id` is actually only used as part of the 
`client.id` iff no `client.id` config is set. – Hence, setting the `client.id` 
should avoid the issue of task shuffling (and the rebalance in itself should 
not be an issue, as it's cheap)?


was (Author: mjsax):
One more thing: the `process.id` is actually only used as part of the 
`client.id` iff not `client.id` config is set. – Hence, setting the `client.id` 
should avoid the issue of rebalancing (and task shuffling)?

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>  Labels: needs-kip
>
> We run our Kafka Streams applications in containers with no persistent 
> storage, and therefore the mitigation of persisting the process ID in the state 
> directory in KAFKA-10716 does not help us avoid shuffling lots of tasks during 
> restarts.
> However, we do have a persistent container ID (from a Kubernetes 
> StatefulSet). Would it be possible to expose a configuration option to let us 
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense 
> to have the process ID be automatically derived from this (plus 
> application/client IDs) if it's set? The two IDs seem to have overlapping 
> goals of identifying "this consumer" across restarts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


<    2   3   4   5   6   7   8   9   10   11   >