Hi,
I'm trying to implement the event sourcing pattern with Kafka Streams. I have
two topics: commands and events.
Commands are modelled as a KStream. To get the current state of a given entity,
I build a KTable by aggregating (replaying) all of its events.

Then I join the commands KStream with the snapshots KTable. As a result of this
join, the command is executed against the latest snapshot, which in turn produces
new events to the events topic.

Here is the code:
    // Commands are consumed as a plain KStream.
    KStream<String, SecurityCommand> securityCommands =
            builder.stream(
                    "security-command",
                    Consumed.with(Serdes.String(), JsonSerdes.securityCommand()));

    // KTable of the latest snapshot per user, built by replaying all events
    // from the security-event topic.
    var userAccountSnapshots =
            builder.stream(
                    "security-event",
                    Consumed.with(Serdes.String(), JsonSerdes.userAccountEvent())
                            .withTimestampExtractor(new LatestRecordTimestampExtractor()))
                    .groupByKey()
                    .aggregate(
                            () -> UserAccountSnapshot.initial(),
                            (email, event, snapshot) ->
                                    (new UserAccount(snapshot, new UserAccountEvent[] { event })).snapshot(),
                            Materialized.with(
                                    Serdes.String(),
                                    JsonSerdes.userAccountSnapshot()));

    Joined<String, SecurityCommand, UserAccountSnapshot> joinParams =
            Joined.with(Serdes.String(),
                        JsonSerdes.securityCommand(),
                        JsonSerdes.userAccountSnapshot());

    ValueJoiner<SecurityCommand, UserAccountSnapshot, CommandWithUserAccountSnapshot> commandWithSnapshot =
            (command, snapshot) -> new CommandWithUserAccountSnapshot(command, snapshot);

    // Join each command with the latest snapshot, execute it, and write the
    // resulting events back to the security-event topic.
    securityCommands.leftJoin(userAccountSnapshots, commandWithSnapshot, joinParams)
            .flatMapValues((email, cmd) -> processCommand(cmd))
            .to("security-event",
                    Produced.with(
                            Serdes.String(),
                            JsonSerdes.userAccountEvent()));
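For completeness, processCommand just executes the command against the joined
snapshot and returns the resulting events. A simplified sketch (the executeOn
method and the accessors below are approximate, not my exact names):

    // Simplified sketch of the command handler: validation and error handling
    // omitted; method and accessor names are approximate.
    private List<UserAccountEvent> processCommand(CommandWithUserAccountSnapshot cmd) {
        UserAccountSnapshot snapshot =
                cmd.snapshot() != null ? cmd.snapshot() : UserAccountSnapshot.initial();
        // Execute the command against the latest snapshot; the resulting events
        // are flat-mapped into the stream and written back to security-event.
        return cmd.command().executeOn(snapshot);
    }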

The problem I'm facing is that when I have multiple unprocessed commands
(CmdA, CmdB) in the commands topic, the event produced by processing CmdA
doesn't get fetched back from the events topic before CmdB is handled. As a
result, CmdB is processed against an obsolete snapshot.

The reason for this is the KIP-695 implementation (task idling decisions based
on the consumer's cached lag metadata).

After CmdA is processed, the consumer lag for the events topic doesn't get
updated, because no poll is done; the cached lag stays 0. Therefore, CmdB is
processed without fetching the new data from the events topic. This is a blocker.
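For reference, the knob that controls this behaviour seems to be
max.task.idle.ms; as far as I understand, since KIP-695 the decision whether a
task waits for more data is made from that cached lag metadata rather than from
an actual poll (application id and bootstrap servers below are just placeholders):

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "security-processor");   // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
    // 0 is the default: a task processes as soon as it *believes* it has caught up
    // on all of its input partitions. In my case the lag for security-event is
    // still cached as 0 after CmdA's events are written, so the task never waits.
    props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L);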

Basically, it means that if I have a topology where I'm joining two streams and
one of the streams is sourced from a topic that also serves as a sink topic, I
will get inconsistent behaviour due to stale data.
Is there a workaround for my problem?
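The only thing I can come up with myself is to stop relying on reading my own
writes back from the broker and instead keep the snapshot in a local state store
that the command handler itself updates. A rough, untested sketch (the store
name is made up, and it assumes processCommand returns a List as in the sketch
above):

    // Local store for the latest snapshot per user: replaces the KTable built from
    // security-event, so command handling never depends on re-consuming the events
    // this same topology has just written.
    builder.addStateStore(
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("user-account-snapshots"),
                    Serdes.String(),
                    JsonSerdes.userAccountSnapshot()));

    ValueTransformerWithKeySupplier<String, SecurityCommand, Iterable<UserAccountEvent>> commandHandler =
            () -> new ValueTransformerWithKey<String, SecurityCommand, Iterable<UserAccountEvent>>() {
                private KeyValueStore<String, UserAccountSnapshot> store;

                @Override
                public void init(ProcessorContext context) {
                    store = context.getStateStore("user-account-snapshots");
                }

                @Override
                public Iterable<UserAccountEvent> transform(String email, SecurityCommand command) {
                    UserAccountSnapshot snapshot = store.get(email);
                    if (snapshot == null) {
                        // New user: where the left join passed null, start from the initial snapshot.
                        snapshot = UserAccountSnapshot.initial();
                    }
                    List<UserAccountEvent> events =
                            processCommand(new CommandWithUserAccountSnapshot(command, snapshot));
                    // Apply the new events to the local snapshot immediately, so the next
                    // command for this key sees them without a broker round trip.
                    store.put(email, new UserAccount(snapshot,
                            events.toArray(new UserAccountEvent[0])).snapshot());
                    return events;
                }

                @Override
                public void close() { }
            };

    securityCommands
            .flatTransformValues(commandHandler, "user-account-snapshots")
            .to("security-event",
                    Produced.with(Serdes.String(), JsonSerdes.userAccountEvent()));

That gives up the declarative join, though, and on restart the snapshots would
be rebuilt from the store's changelog topic instead of from security-event, so
I'd prefer a proper fix if one exists.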
I raised the same question here: "Stream-KTable join - stale data if same topic
is used as source and sink".

Regards,
Max.
