Hi Claudia,

Looking at your code, you have a cyclic relationship. In other words, you are
using the same topic for your input and output. If your goal is to always
get the most up-to-date BoxInfo, it will not work correctly. The KStream to
KTable join matches each stream record with whatever the table holds when
that record is processed, so it can see an older BoxInfo than the one most
recently written to the topic. There are some ways to work around this using
the Processor API. Let me know if you want more information about that.
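
A small plain-Java model can make the ordering problem concrete. It is not
Kafka code: `CyclicJoinModel`, `Change` and `run` are hypothetical names, and
the model assumes the worst case, where two buffered change events for the
same box are both processed before either written record is read back into
the table. Each event is joined with the table as it currently is, so the
second event sees the same stale value as the first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of a KStream-KTable join whose output is written
// back to the table's own topic. Plain Java, no Kafka classes involved.
class CyclicJoinModel {

    // A change event: which box changed and a delta to add to its value.
    record Change(String boxId, int delta) {}

    record Result(List<Integer> seenByJoiner, Map<String, Integer> finalTable) {}

    static Result run(Map<String, Integer> initialTable, List<Change> events) {
        Map<String, Integer> table = new HashMap<>(initialTable);    // KTable state
        Queue<Map.Entry<String, Integer>> topic = new ArrayDeque<>(); // written, not yet re-read
        List<Integer> seen = new ArrayList<>();

        // Worst case (assumption): every buffered change event is processed
        // before any record it produced is read back from the shared topic.
        for (Change event : events) {
            Integer current = table.get(event.boxId());             // join: current table value
            if (current == null) {
                continue;                                           // like filter(nonNull)
            }
            seen.add(current);
            topic.add(Map.entry(event.boxId(), current + event.delta())); // to(sameTopic)
        }

        // The table only catches up once the written records are consumed.
        while (!topic.isEmpty()) {
            Map.Entry<String, Integer> record = topic.poll();
            table.put(record.getKey(), record.getValue());
        }
        return new Result(seen, table);
    }

    public static void main(String[] args) {
        Result r = run(Map.of("box-1", 0),
                List.of(new Change("box-1", 1), new Change("box-1", 1)));
        System.out.println(r.seenByJoiner()); // [0, 0]
        System.out.println(r.finalTable());   // {box-1=1}
    }
}
```

With `setInvalid(true)` the effect is less visible than with a counter, but
the same ordering means a newer BoxInfo that reaches the topic between the
lookup and the write-back can be overwritten by an older copy that has the
invalid flag set.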

You can do a KTable to KTable join on a foreign key, but that triggers from
both sides, so it won't work for this situation.
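
The sketch below is a simplified plain-Java model of why triggering from both
sides is a problem here. Class and method names are hypothetical, it models
inner-join semantics only, and it uses one portal per box, since a foreign-key
join extracts a single foreign key from each left-side value. If invalidations
were kept in a table keyed by portal ID, every later update of a box would be
joined again with the last invalidation stored for its portal.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a KTable-KTable foreign-key inner join:
// left  = BoxInfo keyed by box ID, carrying one portal ID as the foreign key,
// right = invalidations keyed by portal ID. Not Kafka's implementation.
class ForeignKeyJoinModel {
    record Box(String id, int portalId) {}

    private final Map<String, Box> boxes = new HashMap<>();
    private final Map<Integer, String> invalidations = new HashMap<>();
    final List<String> emitted = new ArrayList<>(); // box IDs that produced a join result

    // Left-side update: look up the right side by the foreign key.
    void upsertBox(Box box) {
        boxes.put(box.id(), box);
        if (invalidations.containsKey(box.portalId())) {
            emitted.add(box.id());
        }
    }

    // Right-side update: every left row referencing this key is joined again.
    void upsertInvalidation(int portalId, String reason) {
        invalidations.put(portalId, reason);
        for (Box box : boxes.values()) {
            if (box.portalId() == portalId) {
                emitted.add(box.id());
            }
        }
    }

    public static void main(String[] args) {
        ForeignKeyJoinModel join = new ForeignKeyJoinModel();
        join.upsertBox(new Box("box-1", 7));
        join.upsertInvalidation(7, "portal changed"); // right side triggers
        join.upsertBox(new Box("box-1", 7));          // left side triggers again
        System.out.println(join.emitted);             // [box-1, box-1]
    }
}
```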

When my team needs to do this, we sometimes write the same record to two
topics: one with the "primary key" as the topic key and one with the
"foreign key" as the topic key. That only works in very specific
circumstances.
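
As a rough illustration of the dual write, the hypothetical model below
appends each BoxInfo once to a topic keyed by box ID and once per portal to a
topic keyed by portal ID. In Kafka Streams this could be done with two `to()`
calls, one of them after a `flatMap` that re-keys by portal ID; the class,
record and field names here are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of writing each BoxInfo to two topics with different keys.
class DualWriteModel {
    record BoxInfo(String id, List<Integer> portals) {}
    record Rec<K>(K key, BoxInfo value) {}

    final List<Rec<String>> byIdTopic = new ArrayList<>();      // topic key = primary key
    final List<Rec<Integer>> byPortalTopic = new ArrayList<>(); // topic key = foreign key

    void write(BoxInfo info) {
        byIdTopic.add(new Rec<>(info.id(), info));
        // One record per foreign key, so a consumer keyed by portal ID
        // receives the box for every portal it belongs to.
        for (Integer portalId : info.portals()) {
            byPortalTopic.add(new Rec<>(portalId, info));
        }
    }

    public static void main(String[] args) {
        DualWriteModel model = new DualWriteModel();
        model.write(new BoxInfo("box-1", List.of(1, 2)));
        System.out.println(model.byIdTopic.size());     // 1
        System.out.println(model.byPortalTopic.size()); // 2
    }
}
```

Note that reading the portal-keyed topic back as a KTable would keep only the
latest box per portal ID.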

We've also created topics that just hold the key values (foreign key as the
topic key and primary key as the value), then did a KStream to KTable join ->
map -> through. The through() method has been deprecated and replaced by
repartition().
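
Here is a plain-Java model of that index pipeline. `IndexJoinModel` and its
types are hypothetical and no Kafka classes are used. The second join, against
the BoxInfo table keyed by box ID, is assumed as the step that follows the
repartition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the "index topic" pipeline:
//   invalidations (keyed by portal ID)
//   -> join with index table (portal ID -> box ID)
//   -> map: re-key by box ID
//   -> repartition (a no-op here; in Kafka it routes by the new key)
//   -> join with BoxInfo table (box ID -> BoxInfo)   [assumed final step]
class IndexJoinModel {
    record BoxInfo(String id, boolean invalid) {
        BoxInfo withInvalid(boolean flag) { return new BoxInfo(id, flag); }
    }

    static List<BoxInfo> invalidate(List<Integer> invalidatedPortals,
                                    Map<Integer, String> indexTable,
                                    Map<String, BoxInfo> boxInfoTable) {
        List<BoxInfo> updates = new ArrayList<>();
        for (Integer portalId : invalidatedPortals) {
            String boxId = indexTable.get(portalId); // KStream-KTable join on the index
            if (boxId == null) {
                continue;                            // inner join drops unmatched events
            }
            // map(): the new key is boxId; repartition() would move the record
            // to the partition that owns boxId in the BoxInfo table.
            BoxInfo info = boxInfoTable.get(boxId);  // second KStream-KTable join
            if (info == null) {
                continue;
            }
            updates.add(info.withInvalid(true));     // value written back to the BoxInfo topic
        }
        return updates;
    }

    public static void main(String[] args) {
        Map<Integer, String> index = Map.of(7, "box-1");
        Map<String, BoxInfo> boxInfos = Map.of("box-1", new BoxInfo("box-1", false));
        System.out.println(invalidate(List.of(7, 8), index, boxInfos));
        // [BoxInfo[id=box-1, invalid=true]]
    }
}
```

Because a table holds only the latest value per key, an index keyed by portal
ID with a single box ID as the value covers one box per portal. For several
boxes per portal, the index value would have to be a set of box IDs, which is
still much smaller than a list of full BoxInfo objects.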

I'm confident there is a way to do this using some "index" topics and
the Streams API.

Hope this helps.
Chad

On Mon, Nov 15, 2021 at 7:06 AM Claudia Kesslau <c.kess...@kasasi.de> wrote:

> Hi Kafka community,
>
> I'm new to using KTable functionality in KafkaStreams and I'm struggling
> with updating KTable values by a non-key field.
>
> I've got a stream of BoxData that I want to enrich with BoxInfo objects.
> For those BoxInfo objects I created a compacted topic `boxInfo` and read
> this topic as a KTable. As both the stream of BoxData and the table of
> BoxInfo are already keyed on the same ID, joining them works fine.
>
> Updating BoxInfo objects based on their ID also works fine. I write events
> changing BoxInfo objects by their key to a topic, read this topic as a
> stream, join this stream with the boxInfo KTable and write the adapted
> BoxInfo back to the compacted topic. See this in Code Block 1.
> From those change events I can only extract the ID of the changed BoxInfo
> object, not the changes themselves. Therefore, I just set an invalid flag
> instead of writing the changes directly. I know this is not ideal.
>
> The actual question now is: how can I do such an invalidation if I have to
> set all BoxInfo objects to invalid that match a field other than the key
> itself?
>
> I came up with 3 possible solutions, but none of them seems ideal:
>
> Possibility 1: primitive solution
> For every event indicating changed BoxData by some non-key field, just read
> the whole BoxInfo KTable and mark the matching entries. This does not seem
> very smart or scalable.
>
> Possibility 2: KTable solution based on the ideas from above
> This is the same solution as above but with re-keying the BoxInfo KTable
> first. See Code Block 2.
> Storing a list as a store value seems to be quite resource-intensive and
> therefore not ideal.
>
> Possibility 3: KStreams solution
> To avoid having a store with key and a list of objects like in solution 2,
> I could read the boxInfo topic as a stream and join with the invalidation
> stream. See Code Block 3.
> This solution won't work this way, because with a stream-stream join both
> sides of the join trigger processing. Is there a way to prevent updates
> from one side from triggering a join?
>
> Are there any other possibilities that I did not come up with? This seems
> like quite a common scenario, but I could not find any solutions on the
> internet, and none of the solutions listed here seem like good ones.
>
> Thanks for your feedback.
>
> Best,
> Claudia Kesslau
>
>
>
> Code Block 1:
>
> _______________________________________________________________________________________________________
>
> KTable<String, BoxInfo> boxInfos = builder.table(boxInfoTableName, ...);
> KStream<String, String> invalidationStream = builder.stream(...);
>
> // Look up the current BoxInfo for each invalidated box ID, flag it as
> // invalid, and write it back to the same compacted topic.
> invalidationStream
>             .leftJoin(boxInfos, (aVoid, boxInfo) -> boxInfo)
>             .filter((boxId, boxInfo) -> nonNull(boxInfo))
>             .mapValues(boxInfo -> boxInfo.setInvalid(true))
>             .to(boxInfoTableName, ...);
>
> _______________________________________________________________________________________________________
>
>
> Code Block 2:
>
> _______________________________________________________________________________________________________
>
> KTable<String, BoxInfo> boxInfos = builder.table(boxInfoTableName, ...);
> KStream<Integer, String> invalidationStream = builder.stream(...);
>
> // Re-key every BoxInfo by each of its portal IDs and collect them per portal.
> KTable<Integer, List<BoxInfo>> boxInfosByPortalId = boxInfos
>     .toStream()
>     .flatMap(this::flatMapByPortalId, Named.as("Stream-BoxInfosByPortalId"))
>     .groupBy((portalId, boxInfo) -> portalId)
>     .aggregate(ArrayList::new, (portalId, boxInfo, boxInfoList) -> {
>         boxInfoList.add(boxInfo);
>         return boxInfoList;
>     }, Materialized.with(Serdes.Integer(), new BoxInfoListSerde()));
>
> invalidationStream
>             .leftJoin(boxInfosByPortalId, (aVoid, boxInfoList) -> boxInfoList)
>             .flatMap(this::flatMapByBoxId, Named.as("Stream-BoxInfoById"))
>             .mapValues(boxInfo -> boxInfo.setInvalid(true))
>             .to(boxInfoTableName, ...);
>
>
> List<KeyValue<Integer, BoxInfo>> flatMapByPortalId(String boxId, BoxInfo boxInfo) {
>     if (null == boxInfo) {
>         return List.of();
>     }
>
>     return boxInfo.getPortals()
>         .stream()
>         .map(portalId -> KeyValue.pair(portalId, boxInfo))
>         .toList();
> }
>
>
> List<KeyValue<String, BoxInfo>> flatMapByBoxId(Integer portalId, List<BoxInfo> boxInfoList) {
>     // leftJoin passes null when no list exists for this portal yet
>     if (null == boxInfoList) {
>         return List.of();
>     }
>
>     return boxInfoList.stream()
>         .filter(Objects::nonNull)
>         .map(boxInfo -> KeyValue.pair(boxInfo.getId(), boxInfo))
>         .toList();
> }
>
> _______________________________________________________________________________________________________
>
>
> Code Block 3:
>
> _______________________________________________________________________________________________________
>
> KTable<String, BoxInfo> boxInfos = builder.table(boxInfoTableName, ...);
> KStream<Integer, String> invalidationStream = builder.stream(...);
>
> KStream<Integer, BoxInfo> boxInfosByPortalId = boxInfos
>     .toStream()
>     .flatMap(this::flatMapByPortalId, Named.as("Stream-BoxInfosByPortalId"));
>
> invalidationStream
>             .leftJoin(boxInfosByPortalId, (aVoid, boxInfo) -> boxInfo,
>                 JoinWindows.of(ZERO),
>                 StreamJoined.with(Serdes.Integer(), Serdes.String(), new BoxInfoSerde()))
>             .filter((portalId, boxInfo) -> nonNull(boxInfo))
>             .selectKey((portalId, boxInfo) -> boxInfo.getId())
>             .mapValues(boxInfo -> boxInfo.setInvalid(true))
>             .to(boxInfoTableName, ...);
>
> List<KeyValue<Integer, BoxInfo>> flatMapByPortalId(String boxId, BoxInfo boxInfo) {
>     if (null == boxInfo) {
>         return List.of();
>     }
>
>     return boxInfo.getPortals()
>         .stream()
>         .map(portalId -> KeyValue.pair(portalId, boxInfo))
>         .toList();
> }
>
> _______________________________________________________________________________________________________
>
