Hi Kafka community,
I'm new to the KTable functionality in Kafka Streams and I'm struggling with
updating KTable values by a non-key field.
I've got a stream of BoxData that I want to enrich with BoxInfo objects. For
those BoxInfo objects I created a compacted topic `boxInfo` and read this topic
as a KTable. As both the stream of BoxData and the table of BoxInfo are already
keyed on the same ID, joining them works fine.
Updating BoxInfo objects based on their ID also works fine. I write events
changing BoxInfo objects by their key to a topic, read this topic as a stream,
join this stream with the boxInfo KTable and write the adapted BoxInfo back to
the compacted topic. See this in Code Block 1.
From those change events I can only extract the ID of the changed BoxInfo
object, not the changes themselves. Therefore, I just set an invalid flag
instead of writing the changes directly. I know this is not ideal.
My actual question is: how can I do such an invalidation when I have to mark
as invalid all BoxInfo objects that match on a field other than the key?
I came up with 3 possible solutions, but none of them seems ideal:
Possibility 1: primitive solution
For every event indicating changed BoxData by some field, just read the whole
BoxInfo KTable and mark matching entries. This does not seem very smart or
scalable.
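As a rough illustration of why this scales poorly, here is a minimal plain-Java
sketch with an in-memory map standing in for the BoxInfo table. The BoxInfo
class, its fields and invalidateByPortal are hypothetical stand-ins and not
Kafka Streams API; the point is only that every single event touches every
entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical, minimal stand-in for the real BoxInfo value type.
final class BoxInfo {
    final String id;
    final List<Integer> portals;
    boolean invalid;

    BoxInfo(String id, Integer... portals) {
        this.id = id;
        this.portals = Arrays.asList(portals);
    }
}

final class FullScanInvalidation {
    // Handles one invalidation event by visiting every table entry.
    // Returns the IDs of the entries that were marked invalid.
    static List<String> invalidateByPortal(Map<String, BoxInfo> table, Integer portalId) {
        List<String> invalidated = new ArrayList<>();
        for (BoxInfo boxInfo : table.values()) { // cost grows with the table size
            if (boxInfo.portals.contains(portalId)) {
                boxInfo.invalid = true;
                invalidated.add(boxInfo.id);
            }
        }
        return invalidated;
    }
}
```

With N entries in the table and M invalidation events, this performs N * M
comparisons, regardless of how few entries actually match.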
Possibility 2: KTable solution based on the ideas from above
This is the same solution as above but with re-keying the BoxInfo KTable first.
See Code Block 2.
Storing a list as a store value seems to be quite resource-intensive and
therefore not ideal.
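To show what the re-keyed table holds, here is a minimal plain-Java model of
it: a map from portal ID to the list of BoxInfo objects listing that portal.
BoxInfo, PortalIndex, upsert and lookup are hypothetical names, not Kafka
Streams API, and replacing an existing entry with the same ID (rather than
appending) is my assumption about the intended behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, minimal stand-in for the real BoxInfo value type.
final class BoxInfo {
    final String id;
    final List<Integer> portals;

    BoxInfo(String id, Integer... portals) {
        this.id = id;
        this.portals = Arrays.asList(portals);
    }
}

final class PortalIndex {
    // portalId -> all BoxInfos that list this portal (the "list as store value").
    private final Map<Integer, List<BoxInfo>> byPortal = new HashMap<>();

    // Applies one BoxInfo update to the list of every portal it belongs to.
    void upsert(BoxInfo boxInfo) {
        for (Integer portalId : boxInfo.portals) {
            List<BoxInfo> list = byPortal.computeIfAbsent(portalId, k -> new ArrayList<>());
            // Assumption: a newer version of the same box replaces the older one.
            list.removeIf(existing -> existing.id.equals(boxInfo.id));
            list.add(boxInfo);
        }
    }

    // One keyed lookup per invalidation event instead of a full scan.
    List<BoxInfo> lookup(Integer portalId) {
        return byPortal.getOrDefault(portalId, Collections.emptyList());
    }
}
```

Each BoxInfo is stored once per portal it belongs to, and every update rewrites
the whole list for each of those portals, which is where the resource cost
comes from.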
Possibility 3: KStreams solution
To avoid having a store with key and a list of objects like in solution 2, I
could read the boxInfo topic as a stream and join with the invalidation stream.
See Code Block 3.
This won't work as written, because in a stream-stream join both sides of the
join trigger processing. Is there a way to prevent updates from one side from
triggering the join?
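The problem can be modelled in a few lines of plain Java. This is only a
sketch of the symmetric behaviour, not of Kafka Streams itself: it ignores
join windows and timestamps, and SymmetricJoinModel, onInvalidation and
onBoxInfo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SymmetricJoinModel {
    // Records seen so far on each side, grouped by portal ID.
    private final Map<Integer, List<String>> invalidations = new HashMap<>();
    private final Map<Integer, List<String>> boxIds = new HashMap<>();

    // Join results in the order they were emitted.
    final List<String> output = new ArrayList<>();

    // Left side: an invalidation event probes the buffered BoxInfo records.
    void onInvalidation(Integer portalId, String event) {
        invalidations.computeIfAbsent(portalId, k -> new ArrayList<>()).add(event);
        for (String boxId : boxIds.getOrDefault(portalId, Collections.emptyList())) {
            output.add(event + "+" + boxId);
        }
    }

    // Right side: a BoxInfo record probes the buffered invalidation events,
    // so it emits join results as well.
    void onBoxInfo(Integer portalId, String boxId) {
        boxIds.computeIfAbsent(portalId, k -> new ArrayList<>()).add(boxId);
        for (String event : invalidations.getOrDefault(portalId, Collections.emptyList())) {
            output.add(event + "+" + boxId);
        }
    }
}
```

In this model, writing the invalidated BoxInfo back to the topic arrives as a
new right-side record and joins again with the invalidation event that caused
it, which is the behaviour I would like to suppress.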
Are there any other possibilities that I did not come up with? This seems like
quite a common scenario, but I could not find any solutions on the internet and
none of the solutions listed here seems like a good one.
Thanks for your feedback.
Best,
Claudia Kesslau
Code Block 1:
_______________________________________________________________________________________________________
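KTable<String, BoxInfo> boxInfos = builder.table(boxInfoTableName, ...);
KStream<String, String> invalidationStream = builder.stream(...);

// Look up the current BoxInfo for each invalidation event (both keyed by box ID).
invalidationStream
    .leftJoin(boxInfos, (aVoid, boxInfo) -> boxInfo)
    // The left join yields null when no BoxInfo exists for the key; drop those.
    .filter((boxId, boxInfo) -> nonNull(boxInfo))
    // Assumes setInvalid(true) returns the updated BoxInfo.
    .mapValues(boxInfo -> boxInfo.setInvalid(true))
    .to(boxInfoTableName, ...);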
_______________________________________________________________________________________________________
Code Block 2:
_______________________________________________________________________________________________________
KTable<String, BoxInfo> boxInfos = builder.table(boxInfoTableName, ...);
KStream<Integer, String> invalidationStream = builder.stream(...);

// Re-key the table by portal ID and collect all BoxInfos per portal.
KTable<Integer, List<BoxInfo>> boxInfosByPortalId = boxInfos
    .toStream()
    .flatMap(this::flatMapByPortalId, Named.as("Stream-BoxInfosByPortalId"))
    .groupBy((portalId, boxInfo) -> portalId)
    .aggregate(ArrayList::new, (portalId, boxInfo, boxInfoList) -> {
        // Replace an earlier version of the same box instead of appending a
        // duplicate on every update (including the invalidation write-back).
        boxInfoList.removeIf(existing -> existing.getId().equals(boxInfo.getId()));
        boxInfoList.add(boxInfo);
        return boxInfoList;
    }, Materialized.with(Serdes.Integer(), new BoxInfoListSerde()));

invalidationStream
    .leftJoin(boxInfosByPortalId, (aVoid, boxInfoList) -> boxInfoList)
    .flatMap(this::flatMapByBoxId, Named.as("Stream-BoxInfoById"))
    .mapValues(boxInfo -> boxInfo.setInvalid(true))
    .to(boxInfoTableName, ...);

// Emits one record per portal the box belongs to, keyed by portal ID.
List<KeyValue<Integer, BoxInfo>> flatMapByPortalId(String boxId, BoxInfo boxInfo) {
    if (null == boxInfo) {
        return List.of();
    }
    return boxInfo.getPortals()
        .stream()
        .map(portalId -> KeyValue.pair(portalId, boxInfo))
        .toList();
}

// Emits one record per box in the list, keyed by box ID again.
List<KeyValue<String, BoxInfo>> flatMapByBoxId(Integer portalId, List<BoxInfo> boxInfoList) {
    // The left join passes null when no list exists for the portal ID.
    if (null == boxInfoList) {
        return List.of();
    }
    return boxInfoList.stream()
        .filter(Objects::nonNull)
        .map(boxInfo -> KeyValue.pair(boxInfo.getId(), boxInfo))
        .toList();
}
_______________________________________________________________________________________________________
Code Block 3:
_______________________________________________________________________________________________________
KTable<String, BoxInfo> boxInfos = builder.table(boxInfoTableName, ...);
KStream<Integer, String> invalidationStream = builder.stream(...);

// Re-key the BoxInfo changelog by portal ID, but keep it as a stream.
KStream<Integer, BoxInfo> boxInfosByPortalId = boxInfos
    .toStream()
    .flatMap(this::flatMapByPortalId, Named.as("Stream-BoxInfosByPortalId"));

invalidationStream
    // ZERO is Duration.ZERO (static import); with a zero time difference only
    // records with identical timestamps are joined.
    .leftJoin(boxInfosByPortalId, (aVoid, boxInfo) -> boxInfo,
        JoinWindows.of(ZERO),
        StreamJoined.with(Serdes.Integer(), Serdes.String(), new BoxInfoSerde()))
    .filter((portalId, boxInfo) -> nonNull(boxInfo))
    .selectKey((portalId, boxInfo) -> boxInfo.getId())
    .mapValues(boxInfo -> boxInfo.setInvalid(true))
    .to(boxInfoTableName, ...);

// Emits one record per portal the box belongs to, keyed by portal ID.
List<KeyValue<Integer, BoxInfo>> flatMapByPortalId(String boxId, BoxInfo boxInfo) {
    if (null == boxInfo) {
        return List.of();
    }
    return boxInfo.getPortals()
        .stream()
        .map(portalId -> KeyValue.pair(portalId, boxInfo))
        .toList();
}
_______________________________________________________________________________________________________