Hi Mangat,

thanks for the clarification. To my knowledge, exactly-once is configured
using the 'processing.guarantee=exactly_once_v2' setting, correct? Is the
configuration setting 'acks=all' somehow related, and would you advise
setting that as well?
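
A minimal sketch of the two settings in question, assuming plain
java.util.Properties with literal key names. The application id is taken from
the groupId in the logs quoted below; the bootstrap address is a placeholder,
and the class and method names are hypothetical. The 'producer.' prefix is how
Kafka Streams scopes a setting to its internal producer. Whether 'acks=all'
needs to be set explicitly alongside exactly-once is the open question here,
so the line is included only for illustration.

```java
import java.util.Properties;

// Hypothetical helper class, for illustration only.
public class StreamsEosConfigSketch {

    static Properties buildConfig() {
        Properties props = new Properties();
        // Matches the groupId seen in the quoted log output.
        props.put("application.id", "kstreams-folder-aggregator");
        // Assumption: placeholder broker address, not taken from the thread.
        props.put("bootstrap.servers", "localhost:9092");
        // The exactly-once setting discussed above.
        props.put("processing.guarantee", "exactly_once_v2");
        // Assumption: explicit producer-level acks, shown only to make the
        // question concrete; it may be redundant when exactly-once is enabled.
        props.put("producer.acks", "all");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration as key=value pairs.
        buildConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object would be passed to the KafkaStreams
constructor together with the topology.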

Best wishes
Karsten


mangat rai <mangatm...@gmail.com> wrote on Tue, Mar 26, 2024, 15:44:

> Hey Karsten,
>
> If a topic has not been created yet, the Streams app will keep the data in
> memory and write it later, once the topic is available. If your app is
> restarted (or a thread is killed), you may lose data, but that depends on
> whether the app commits offsets for the source topics. If there are no
> errors, the data should be persisted eventually.
>
> However, overall exactly-once provides much tighter and better commit
> control. If you don't have scaling issues, I would strongly advise you to use
> EOS.
>
> Thanks,
> Mangat
>
>
> On Tue, Mar 26, 2024 at 3:33 PM Karsten Stöckmann <
> karsten.stoeckm...@gmail.com> wrote:
>
> > Hi Mangat,
> >
> > thanks for your thoughts. I had actually considered exactly-once semantics
> > already, was unsure whether it would help, and set it aside for the time
> > being. I'll try that immediately when I get back to work.
> >
> > About snapshots and deserialization: I doubt that the issue is caused by
> > deserialization failures, because when taking another snapshot of the exact
> > same data (i.e. at a later point in time), all messages fed into the
> > input topic pass the pipeline as expected.
> >
> > Logs of both Kafka and Kafka Streams show no signs of notable issues as far
> > as I can tell, apart from these (when initially starting up, intermediate
> > topics not existing yet):
> >
> > 2024-03-22 22:36:11,386 WARN [org.apa.kaf.cli.NetworkClient]
> > (kstreams-folder-aggregator-a38397c2-d30a-437e-9817-baa605d49e23-StreamThread-4)
> > [Consumer
> > clientId=kstreams-folder-aggregator-a38397c2-d30a-437e-9817-baa605d49e23-StreamThread-4-consumer,
> > groupId=kstreams-folder-aggregator] Error while fetching metadata with
> > correlation id 69 :
> > {kstreams-folder-aggregator-folder-to-agency-subscription-response-topic=UNKNOWN_TOPIC_OR_PARTITION,
> > <various other intermediate topics>}
> >
> > Best wishes
> > Karsten
> >
> >
> >
> > mangat rai <mangatm...@gmail.com> wrote on Tue, Mar 26, 2024, 11:06:
> >
> > > Hey Karsten,
> > >
> > > There are several possible reasons for this.
> > > 1. Did you check the error logs? There are several reasons why the Kafka
> > > Streams app may drop incoming messages. Use exactly-once semantics to limit
> > > such cases.
> > > 2. Are you sure there was no error when deserializing the records from
> > > `folderTopicName`? You mentioned that it happens only when you start
> > > processing and that the other table snapshot works fine. This gives me the
> > > feeling that the first records in the topic might not be deserialized
> > > properly.
> > >
> > > Regards,
> > > Mangat
> > >
> > > On Tue, Mar 26, 2024 at 8:45 AM Karsten Stöckmann <
> > > karsten.stoeckm...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > thanks for getting back. I'll try and illustrate the issue.
> > > >
> > > > I've got an input topic 'folderTopicName' fed by a database CDC system.
> > > > Messages then pass a series of FK left joins and are eventually sent to an
> > > > output topic like this ('agencies' and 'documents' being KTables):
> > > >
> > > >
> > > >             streamsBuilder //
> > > >                     .table( //
> > > >                             folderTopicName, //
> > > >                             Consumed.with( //
> > > >                                     folderKeySerde, //
> > > >                                     folderSerde)) //
> > > >                     .leftJoin( //
> > > >                             agencies, //
> > > >                             Folder::agencyIdValue, //
> > > >                             AggregateFolder::new, //
> > > >                             TableJoined.as("folder-to-agency"), //
> > > >                             Materializer //
> > > >                                     .<FolderId, AggregateFolder>named("folder-to-agency-materialized") //
> > > >                                     .withKeySerde(folderKeySerde) //
> > > >                                     .withValueSerde(aggregateFolderSerde)) //
> > > >                     .leftJoin( //
> > > >                             documents, //
> > > >                     .toStream(...
> > > >                     .to(...
> > > >
> > > > ...
> > > >
> > > > As far as I understand, left join semantics should be similar to those of
> > > > relational databases, i.e. the left-hand value always passes the join, with
> > > > the right-hand value set to <null> if not present. What I am observing
> > > > instead is this: lots of messages on the input topic are absent even from
> > > > the first left join changelog topic
> > > > ('folder-to-agency-materialized-changelog'). But this seems to happen only
> > > > when the Streams application is fired up for the first time, i.e. when
> > > > intermediate topics do not yet exist. When streaming another table snapshot
> > > > to the input topic, things seem (!) to work as expected...
> > > >
> > > > Best wishes,
> > > > Karsten
> > > >
> > > > Bruno Cadonna <cado...@apache.org> wrote on Mon, Mar 25, 2024, 17:01:
> > > >
> > > > > Hi,
> > > > >
> > > > > That sounds worrisome!
> > > > >
> > > > > Could you please provide us with a minimal example that shows the issue
> > > > > you describe?
> > > > >
> > > > > That would help a lot!
> > > > >
> > > > > Best,
> > > > > Bruno
> > > > >
> > > > > On 3/25/24 4:07 PM, Karsten Stöckmann wrote:
> > > > > > Hi,
> > > > > >
> > > > > > are there circumstances that might lead to messages silently (i.e. without
> > > > > > any logged warnings or errors) disappearing from a topology?
> > > > > >
> > > > > > Specifically, I've got a rather simple topology doing a series of FK left
> > > > > > joins and notice severe message loss when the application is fired up
> > > > > > for the first time, i.e. intermediate topics not existing yet. I'd
> > > > > > generally expect the message count on the output topic to resemble that of
> > > > > > the input topic, yet it doesn't (about half only).
> > > > > >
> > > > > > Any hints on this?
> > > > > >
> > > > > > Best wishes
> > > > > > Karsten
> > > > > >
> > > > >
> > > >
> > >
> >
>
