Your update to the KTable is async when you send data back to the KTable
input topic, so your program is subject to race conditions.
Switching to the PAPI was the right move: it makes the update to the
state store synchronous and thus fixes the issue.
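To illustrate the lost-update pattern in plain Java (a toy model, no Kafka: the "async" version forces every event to read the table before any write-back lands, which is the worst-case interleaving of the race):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class LostUpdateDemo {

    // Async model: every event reads the table, computes the new array,
    // and enqueues the write; writes land only after all reads happened
    // (like routing updates back through the KTable's input topic).
    static List<String> asyncUpdates(List<String> events) {
        List<String> table = new ArrayList<>();
        Queue<List<String>> pending = new ArrayDeque<>();
        for (String value : events) {
            List<String> read = new ArrayList<>(table); // stale read
            read.add(value);
            pending.add(read);
        }
        List<String> result = table;
        while (!pending.isEmpty()) {
            result = pending.poll(); // a later write overwrites the earlier one
        }
        return result;
    }

    // Sync model: each read-modify-write is applied before the next read
    // (like updating a state store directly in a Processor).
    static List<String> syncUpdates(List<String> events) {
        List<String> table = new ArrayList<>();
        for (String value : events) {
            table.add(value);
        }
        return table;
    }

    public static void main(String[] args) {
        System.out.println(asyncUpdates(List.of("bar", "baz"))); // [baz]
        System.out.println(syncUpdates(List.of("bar", "baz")));  // [bar, baz]
    }
}
```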
-Matthias
On 9/4/23 5:53 AM, Mauricio Lopez wrote:
Hello,
They were getting processed by the same consumer as we only had a single
machine running this.
What we ended up doing is basically building the same topology but interacting
directly with the state store using the Processor API instead of the DSL. That
seems to have fixed everything (and made it way quicker).
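For anyone hitting the same issue, a minimal sketch of that approach (the store name, topics, String serdes, and comma-separated array encoding here are illustrative placeholders, not our actual code):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class PapiAggregation {
    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("agg-store"),
                Serdes.String(), Serdes.String()));

        builder.stream("events-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .process(() -> new Processor<String, String, String, String>() {
                   private KeyValueStore<String, String> store;
                   private ProcessorContext<String, String> context;

                   @Override
                   public void init(ProcessorContext<String, String> context) {
                       this.context = context;
                       this.store = context.getStateStore("agg-store");
                   }

                   @Override
                   public void process(Record<String, String> record) {
                       // Synchronous read-modify-write on the local store:
                       // the next record for this key sees this write immediately.
                       String current = store.get(record.key());
                       String updated = current == null
                               ? record.value()
                               : current + "," + record.value();
                       store.put(record.key(), updated);
                       context.forward(record.withValue(updated));
                   }
               }, "agg-store");
        return builder;
    }
}
```

Because the store is read and written inside the same process() call, there is no round trip through a topic between the read and the write.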
Best,
Mauricio
On 2023/08/28 12:04:12 Claudia Kesslau wrote:
Hi,
I'm definitely no expert, but to me it sounds as if not all your messages are
getting processed by the same consumer. Are you using the key `foo` for
partitioning? Is `baz` actually another key, or is this a mix-up in your example
and `baz` is another value with key `foo`?
Hope you find a solution to your problem.
Best,
Claudia
________________________________
From: Mauricio Lopez <ml...@silversky.com>
Sent: Thursday, August 17, 2023 10:57 PM
To: users@kafka.apache.org <us...@kafka.apache.org>
Subject: Table updates are not consistent when doing a join with a Stream
Hello Folks,
We are having an issue with a Kafka Streams Java application.
We have a KStream and a KTable which are joined using a left join. The entries
in the KTable are constantly updated by new information that comes from the
KStream: each KStream message appends an entry to an array that the KTable
holds for its key. The updated value is sent back to the KTable's input topic,
so the array grows every time a new message comes from the KStream.
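In code, the topology is roughly the following (a sketch; topic names, serdes, and the comma-separated array encoding are placeholders for what we actually use):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class JoinTopology {
    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();

        // KTable materialized from its input topic; the value is the encoded array.
        KTable<String, String> table =
                builder.table("ktable-topic", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> stream =
                builder.stream("events-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Left join: append the event value to the table's array, then write
        // the result back to the KTable's own input topic.
        stream.leftJoin(table, (eventValue, arrayValue) ->
                        arrayValue == null ? eventValue : arrayValue + "," + eventValue)
              .to("ktable-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder;
    }
}
```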
As an example, what should be happening (and what happens in our unit tests) is:
* KTable has an empty array for key “foo”: []
* Event 1 comes with key “foo” and value “bar”
* KTable gets updated to “foo”: [“bar”], sending this update to the same
topic that the KTable is plugged into.
* Event 2 comes with key “baz”
* Update is pulled into memory by the KTable, and the KTable gets updated to “foo”:
[“bar”, “baz”], sending this change to the same topic that the KTable is
plugged into. “baz” was appended to the array for key “foo”.
But what is happening is the following:
* KTable has an empty array for key “foo”: []
* Event 1 comes with key “foo” and value “bar”
* KTable gets updated to “foo”: [“bar”] in the joiner, sending an event
to the same topic that the KTable is plugged into.
* Event 2 comes with key “baz”
* KTable gets updated to “foo”: [“baz”] in the joiner, sending an event
to the same topic that the KTable is plugged into afterwards.
This happens multiple times, and after a couple of seconds one of the incoming
messages is finally appended, but many of them are lost. As you can see, we
suspect that when Event 2 is received, the KTable has somehow not yet received
the first update ([“bar”]), so the new value overwrites it instead of being appended.
This means that many events are missed, and we cannot successfully get the
KTable to save all the data for all the events. In turn, it sometimes
overwrites the updates from some events.
So far, we have tried:
* Setting STATESTORE_CACHE_MAX_BYTES_CONFIG to 0, to attempt to force the
app not to cache any changes and send to the output topic instantly.
* Setting COMMIT_INTERVAL_MS_CONFIG to 0, to attempt to force the app to
send all updates instantly
* Setting TOPOLOGY_OPTIMIZATION_CONFIG to “reuse.ktable.source.topics” and
“all” in case there is some optimization pattern that could help us.
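For reference, a sketch of how we applied those settings (using the StreamsConfig constants; STATESTORE_CACHE_MAX_BYTES_CONFIG requires Kafka 3.4+):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Disable record caching so state-store updates are forwarded immediately.
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
// Commit (and flush) as often as possible.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
// OPTIMIZE == "all"; enables all topology optimizations,
// including reuse.ktable.source.topics.
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
```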
None of these have given us a fully consistent update of the KTable each time a
new event comes in; it still overwrites or misses updates made by incoming
events. Can someone advise whether there's a way to make the KTable get
successfully updated by each one of the events, as the first example shows?
Thanks,
Mauricio L
Mauricio López S.
Software Engineer