Hi, I'm trying to implement the event sourcing pattern with Kafka Streams. I have two topics: commands and events. Commands are modelled as a KStream. To get the current state of a given entity, I build a KTable by aggregating (replaying) all of its events.
Then I join the commands KStream with the snapshots KTable. As a result of this join, I execute each command against the latest snapshot, which in turn produces new events to the events topic. Here is the code:

    KStream<String, SecurityCommand> securityCommands =
        builder.stream(
            "security-command",
            Consumed.with(Serdes.String(), JsonSerdes.securityCommand()));

    KTable<String, UserAccountSnapshot> userAccountSnapshots =
        builder.stream(
            "security-event",
            Consumed.with(Serdes.String(), JsonSerdes.userAccountEvent())
                .withTimestampExtractor(new LatestRecordTimestampExtractor()))
        .groupByKey()
        .aggregate(
            () -> UserAccountSnapshot.initial(),
            (email, event, snapshot) ->
                new UserAccount(snapshot, new UserAccountEvent[] { event }).snapshot(),
            Materialized.with(
                Serdes.String(),
                JsonSerdes.userAccountSnapshot()));

    Joined<String, SecurityCommand, UserAccountSnapshot> joinParams =
        Joined.with(
            Serdes.String(),
            JsonSerdes.securityCommand(),
            JsonSerdes.userAccountSnapshot());

    ValueJoiner<SecurityCommand, UserAccountSnapshot, CommandWithUserAccountSnapshot> commandWithSnapshot =
        (command, snapshot) -> new CommandWithUserAccountSnapshot(command, snapshot);

    securityCommands
        .leftJoin(userAccountSnapshots, commandWithSnapshot, joinParams)
        .flatMapValues((email, cmd) -> processCommand(cmd))
        .to("security-event",
            Produced.with(
                Serdes.String(),
                JsonSerdes.userAccountEvent()));

The problem I'm facing: when there are multiple unprocessed commands (CmdA, CmdB) in the commands topic, the event produced by processing CmdA does not get fetched back from the events topic before CmdB is processed. As a result, CmdB is processed against an obsolete snapshot. The reason is the KIP-695 implementation: after CmdA is processed, the consumer lag for the events topic is not updated, because no poll is done, so the cached lag remains 0. Therefore, CmdB is processed without fetching data from the events topic. This is a blocker for me.
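For completeness, LatestRecordTimestampExtractor above is my own class, not part of Kafka Streams. A minimal sketch of the idea (illustrative only, not necessarily byte-for-byte what I run) would be an extractor that trusts each record's own timestamp and falls back to the partition time when that timestamp is invalid:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Illustrative sketch: use the record's own timestamp when it is valid
// (>= 0), otherwise fall back to the highest timestamp observed so far
// on this partition, so that late/broken records don't move time backwards.
public class LatestRecordTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        long ts = record.timestamp();
        return ts >= 0 ? ts : partitionTime;
    }
}
```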
Basically, this means that if I have a topology where I'm joining two streams, and one of the streams is sourced from a topic that also serves as a sink topic, I will get inconsistent behaviour due to stale data. Is there a workaround for this problem? I raised the same question on Stack Overflow: "Stream-KTable join - stale data if same topic is used as source and sink".

Regards,
Max