Re: Messages disappearing from Kafka Streams topology
Hi Mangat,

back to work now. I've configured our Streams applications to use exactly-once semantics, but to no avail. Actually, after some more investigation I've come to suspect that the issue is somehow related to rebalancing.

The initially shown topology lives inside a Quarkus Kafka Streams application that is rolled out as part of a K8s StatefulSet with three replicas. When initially fired up, the StatefulSet controller starts the Pods sequentially in order, each subsequently triggering a rebalance operation. Topics then show the mentioned message loss. As soon as I reduce the StatefulSet to only one replica, the changelog topics as well as the eventual output topic show the expected message count.

So - can the issue somehow be related to rebalancing?

Best wishes
Karsten

On Thu, 28 Mar 2024 at 08:25, Karsten Stöckmann wrote:
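In case rebalancing does turn out to be involved: a common way to reduce rebalance churn during a sequential StatefulSet rollout is static group membership, keying each member on its stable Pod name. This is only a hedged sketch of that pattern, not a confirmed fix for the loss described above; `POD_NAME` is an assumed environment variable injected via the Kubernetes downward API:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StaticMembershipSketch {
    static Properties withStaticMembership(Properties props) {
        // StatefulSet Pods have stable names (app-0, app-1, app-2); using them as
        // group.instance.id lets a restarted Pod rejoin its old assignment instead
        // of triggering a full rebalance on every restart.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
                  System.getenv("POD_NAME")); // POD_NAME: assumed downward-API env var
        return props;
    }
}
```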
Re: Messages disappearing from Kafka Streams topology
Hi Mangat,

thank you for clarification. I'll try the suggested configuration as soon as I get back to work. Will keep you posted then.

Best wishes
Karsten

On Wed, 27 Mar 2024 at 11:07, mangat rai wrote:
Re: Messages disappearing from Kafka Streams topology
Hey Karsten,

You don't need to do any other configuration to enable EOS. See here -
https://docs.confluent.io/platform/current/streams/concepts.html#processing-guarantees
It mentions that the producer will be idempotent. That also means acks=all will be considered. Note that if you have any other acks value in the config, it will be ignored in favour of exactly-once.

Do let me know if that solves your problem. I am curious; if yes, then I would ask you to create an issue.

Regards,
Mangat

On Wed, Mar 27, 2024 at 10:49 AM Karsten Stöckmann <karsten.stoeckm...@gmail.com> wrote:
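For the record, the setting discussed in this thread can be sketched as plain StreamsConfig properties. The application id is taken from the consumer group id in the logs quoted in this thread; the bootstrap address is a placeholder:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class EosConfigSketch {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstreams-folder-aggregator");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // processing.guarantee=exactly_once_v2; the idempotent producer and
        // acks=all are implied by this setting, as noted above.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}
```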
Re: Messages disappearing from Kafka Streams topology
Hi Mangat,

thanks for clarification. So to my knowledge exactly-once is configured using the 'processing.guarantee=exactly_once_v2' setting? Is the configuration setting 'acks=all' somehow related, and would you advise setting that as well?

Best wishes
Karsten

On Tue, 26 Mar 2024 at 15:44, mangat rai wrote:
Re: Messages disappearing from Kafka Streams topology
Hey Karsten,

So if a topic has not been created yet, the Streams app will keep the data in memory and then write it later when the topic is available. If your app is restarted (or a thread is killed), you may lose data, but it depends on whether the app has committed in the source topics. If there are no errors, it should be persisted eventually.

However, overall exactly-once provides much tighter and better commit control. If you don't have scaling issues, I would strongly advise you to use EOS.

Thanks,
Mangat

On Tue, Mar 26, 2024 at 3:33 PM Karsten Stöckmann <karsten.stoeckm...@gmail.com> wrote:
Re: Messages disappearing from Kafka Streams topology
Hi Mangat,

thanks for your thoughts. I had actually considered exactly-once semantics already, was unsure whether it would help, and left it aside then. I'll try that immediately when I get back to work.

About snapshots and deserialization - I doubt that the issue is caused by deserialization failures because: when taking another (i.e. at a later point in time) snapshot of the exact same data, all messages fed into the input topic pass the pipeline as expected.

Logs of both Kafka and Kafka Streams show no signs of notable issues as far as I can tell, apart from these (when initially starting up, intermediate topics not existing yet):

2024-03-22 22:36:11,386 WARN [org.apa.kaf.cli.NetworkClient]
(kstreams-folder-aggregator-a38397c2-d30a-437e-9817-baa605d49e23-StreamThread-4)
[Consumer clientId=kstreams-folder-aggregator-a38397c2-d30a-437e-9817-baa605d49e23-StreamThread-4-consumer,
groupId=kstreams-folder-aggregator] Error while fetching metadata with correlation id 69 :
{kstreams-folder-aggregator-folder-to-agency-subscription-response-topic=UNKNOWN_TOPIC_OR_PARTITION, }

Best wishes
Karsten

On Tue, 26 Mar 2024 at 11:06, mangat rai wrote:
Re: Messages disappearing from Kafka Streams topology
Hey Karsten,

There could be several reasons this could happen.

1. Did you check the error logs? There are several reasons why the Kafka Streams app may drop incoming messages. Use exactly-once semantics to limit such cases.
2. Are you sure there was no error when deserializing the records from `folderTopicName`? You mentioned that it happens only when you start processing and that the other table snapshot works fine. This gives me the feeling that the first records in the topic might not be deserialized properly.

Regards,
Mangat

On Tue, Mar 26, 2024 at 8:45 AM Karsten Stöckmann <karsten.stoeckm...@gmail.com> wrote:
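To rule out the second point above, the deserialization exception handler can be pinned explicitly. A sketch: the log-and-fail handler is the Streams default, so this mainly guards against it having been overridden to the log-and-continue variant, which skips undeserializable records after only a WARN:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;

public class DeserializationHandlerSketch {
    static Properties failFastOnBadRecords(Properties props) {
        // Any record that cannot be deserialized now fails the stream thread loudly
        // instead of being skipped, so drops at this stage cannot be silent.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndFailExceptionHandler.class);
        return props;
    }
}
```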
Re: Messages disappearing from Kafka Streams topology
Hi,

thanks for getting back. I'll try and illustrate the issue.

I've got an input topic 'folderTopicName' fed by a database CDC system. Messages then pass a series of FK left joins and are eventually sent to an output topic like this ('agencies' and 'documents' being KTables):

streamsBuilder
    .table(
        folderTopicName,
        Consumed.with(
            folderKeySerde,
            folderSerde))
    .leftJoin(
        agencies,
        Folder::agencyIdValue,
        AggregateFolder::new,
        TableJoined.as("folder-to-agency"),
        Materialized.as("folder-to-agency-materialized")
            .withKeySerde(folderKeySerde)
            .withValueSerde(aggregateFolderSerde))
    .leftJoin(
        documents, ...
    .toStream(...
    .to(...

...

As far as I understand, left join semantics should be similar to those of relational databases, i.e. the left-hand value always passes the join, with the right-hand value set as null if not present. Whereas what I am observing is this: lots of messages on the input topic are absent even from the first left join changelog topic ('folder-to-agency-materialized-changelog'). But: this seems to happen only in case the Streams application is fired up for the first time, i.e. intermediate topics do not yet exist. When streaming another table snapshot to the input topic, things seem (!) to work as expected...

Best wishes,
Karsten

On Mon, 25 Mar 2024 at 17:01, Bruno Cadonna wrote:
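Regarding the request for a minimal example: one way to demonstrate the expected left-join pass-through without a broker (and hence without any rebalancing) is kafka-streams-test-utils. A sketch only; `outputTopicName`, the Long key type, and the record values are assumptions standing in for the real ones:

```java
// Sketch: drive the topology above with TopologyTestDriver and check that a
// record with no matching right-hand side still reaches the output topic.
Topology topology = streamsBuilder.build();
try (TopologyTestDriver driver = new TopologyTestDriver(topology, streamsProps)) {
    TestInputTopic<Long, Folder> input = driver.createInputTopic(
        folderTopicName, folderKeySerde.serializer(), folderSerde.serializer());
    TestOutputTopic<Long, AggregateFolder> output = driver.createOutputTopic(
        outputTopicName, folderKeySerde.deserializer(), aggregateFolderSerde.deserializer());

    input.pipeInput(1L, folderWithoutAgency); // right-hand side (agency) absent
    // Left-join semantics: the record must still appear, with the agency part null.
    assert !output.isEmpty();
}
```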
Re: Messages disappearing from Kafka Streams topology
Hi,

That sounds worrisome!

Could you please provide us with a minimal example that shows the issue you describe?

That would help a lot!

Best,
Bruno

On 3/25/24 4:07 PM, Karsten Stöckmann wrote:
Messages disappearing from Kafka Streams topology
Hi,

are there circumstances that might lead to messages silently (i.e. without any logged warnings or errors) disappearing from a topology?

Specifically, I've got a rather simple topology doing a series of FK left joins and notice severe message loss in case the application is fired up for the first time, i.e. intermediate topics not existing yet. I'd generally expect the message count on the output topic to resemble that from the input topic, yet it doesn't (about half only).

Any hints on this?

Best wishes
Karsten