Hi
I am trying to understand why the ValueJoiner of a KTable-to-KTable left
join is called twice when I receive a message on the right table.
Here is my topology:

Serde<Author> authorSerde = ...
Serde<Set<Book>> bSetSerde = ...
Serde<Set<AutorPublisherAssociation>> apSetSerde = ...

KTable<String, Author> authorTable = builder.table(AUTHOR_TOPIC,
    Consumed.with(Serdes.String(), authorSerde));
KTable<String, Set<Book>> booksByAuthorTable = builder.table(BOOKS_BY_AUTHOR,
    Consumed.with(Serdes.String(), bSetSerde));
KTable<String, Set<AutorPublisherAssociation>> apTable =
    builder.table(PUBLISHER_ASSOCIATIONS_BY_AUTHOR,
        Consumed.with(Serdes.String(), apSetSerde));

KTable<String, Author> enrichedAuthorTable = authorTable
    .leftJoin(apTable, (a, apSet) -> {
        if (apSet == null) {
            a.setPublishers(new HashSet<>());
        } else {
            a.setPublishers(apSet.stream()
                .map(ap -> ap.getPublisher())
                .collect(Collectors.toSet()));
        }
        return a;
    })
    .leftJoin(booksByAuthorTable, (a, b) -> {
        a.setBooks(b);
        return a;
    });

enrichedAuthorTable.toStream().to(ENRICHED_AUTHORS,
    Produced.with(Serdes.String(), authorSerde));

Note that I have 3 topics, all of them keyed by author:
- AUTHOR_TOPIC is keyed by authorKey and has the Author message;
- BOOKS_BY_AUTHOR is keyed by authorKey and has a Set of Books;
- PUBLISHER_ASSOCIATIONS_BY_AUTHOR is keyed by authorKey and has a Set of
AutorPublisherAssociation (a POJO that links one author to one
publisher).

Also note that the null check in the first joiner is intended to avoid
NPEs and to handle tombstones: if I want to delete the list of publishers
associated with an author, a tombstone on the
PUBLISHER_ASSOCIATIONS_BY_AUTHOR topic should reset the list of publishers
on the Author.

In my simple test case, I am not sending any updates, just one message on
each topic, in this order: author, booksByAuthor, publisherByAuthor.
When the author arrives, both ValueJoiners are called with the author
message and null for the right table.
When the set of books arrives, both joiners are called ONCE: the first
joiner receives the author and null, the second joiner receives the author
and the set of books.
The problem comes next:
When the set of AutorPublisherAssociation arrives, the first ValueJoiner
is called TWICE, once with the author and apSet, and a second time with
the author and null.

I don't understand why in this scenario the ValueJoiner is called twice,
the second time with null instead of the message, which overrides the
correct value.
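
To make the symptom concrete, here is a minimal plain-Java sketch (no Kafka
Streams involved) that replays the two calls described above against a
joiner shaped like the first one in the topology. Everything in it is a
stand-in: the Author class is a stripped-down hypothetical version of the
real POJO, publishers are plain Strings instead of being derived from
AutorPublisherAssociation, and it assumes that both calls are handed the
same Author instance, which I have not verified. Under that assumption,
the second call with null wipes the publishers set by the first call,
because the joiner mutates its left argument in place.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;

public class JoinerMutationSketch {

    // Hypothetical, stripped-down stand-in for the Author POJO.
    static class Author {
        private Set<String> publishers;

        Set<String> getPublishers() {
            return publishers;
        }

        void setPublishers(Set<String> publishers) {
            this.publishers = publishers;
        }
    }

    // Same shape as the first joiner in the topology: it mutates the
    // left-hand Author in place and returns that same instance.
    static final BiFunction<Author, Set<String>, Author> MUTATING_JOINER =
        (a, apSet) -> {
            if (apSet == null) {
                a.setPublishers(new HashSet<>());
            } else {
                a.setPublishers(new HashSet<>(apSet));
            }
            return a;
        };

    // Replays the observed sequence: one call with the publisher set,
    // then one call with null, both on the same Author instance
    // (an assumption). Returns what the first result holds afterwards.
    static Set<String> replayObservedCalls() {
        Author author = new Author();
        Author first = MUTATING_JOINER.apply(author, Set.of("publisher-1"));
        MUTATING_JOINER.apply(author, null);
        return first.getPublishers();
    }

    public static void main(String[] args) {
        // The result of the first call no longer contains "publisher-1".
        System.out.println(replayObservedCalls());
    }
}
```

If the two calls really do share one Author instance, a joiner that copies
the Author before setting the publishers would not show this overwrite,
but that is a guess about the cause, not something confirmed here.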

Thanks
Murilo
