Hello! Sure thing! I'm reading them *separately*; both are apps written in
Scala + Spark Structured Streaming.

I feel like I missed some details in my original thread (sorry, it was past
4 AM and it was getting frustrating). Please let me try to clarify some
points:

*Transactions Created Consumer*
-----------------------------------
| Kafka trx-created-topic | <--- (Scala + Spark Structured Streaming)
ConsumerApp ---> Sinks to ---> Postgres DB Table (Transactions)
-----------------------------------
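In code, that first app boils down to something like this. A minimal
sketch, not the actual code: the broker address, DB coordinates,
credentials and checkpoint path are placeholders, and the real app would
parse the Kafka value into the actual transaction schema instead of
keeping raw JSON:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object TransactionsCreatedConsumer {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("trx-created-consumer").getOrCreate()

    // Stream the raw messages from the topic
    val created = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // placeholder
      .option("subscribe", "trx-created-topic")
      .load()
      .selectExpr("CAST(value AS STRING) AS json")      // parse to the real schema here

    // Sink every micro-batch to Postgres, no questions asked
    created.writeStream
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        batch.write
          .format("jdbc")
          .option("url", "jdbc:postgresql://db:5432/mydb") // placeholder
          .option("dbtable", "transactions")               // placeholder
          .option("user", "user")
          .option("password", "password")
          .mode("append")
          .save()
      }
      .option("checkpointLocation", "/tmp/checkpoints/trx-created") // placeholder
      .start()
      .awaitTermination()
  }
}
```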
*Transactions Processed Consumer*
-------------------------------------
| Kafka trx-processed-topic | <--- 1) (Scala + Spark Structured Streaming)
AnotherConsumerApp fetches a Dataset (let's call it "a")
-------------------------------------
2) Selects the ids
-------------------------------------
| Postgres / trx table | <--- 3) Fetches the rows w/ the matching ids that
have status 'created' (let's call it "b")
-------------------------------------

4) Performs an intersection between "a" and "b", resulting in
"b_that_needs_sinking" (but now there are some "b_leftovers" that were left
out of the intersection)

5) Sinks "b_that_needs_sinking" to the DB, which leaves the "b_leftovers"
unprocessed (not persisted)

6) However, those "b_leftovers" would ultimately be processed at some
point (even if it takes 1-3 days), when their corresponding
transaction_ids are pushed to the "trx-created-topic" Kafka topic and
picked up by that first consumer
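Put together, the second app's foreachBatch looks roughly like this. Again
a sketch under assumptions: the payload carries a "transaction_id" JSON
field, the table/column names (transactions, status) and the target table
are guesses, and the connection options are the same placeholders as in
the first sketch:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, get_json_object}

val spark = SparkSession.builder.appName("trx-processed-consumer").getOrCreate()

// placeholder connection options
val jdbcOptions = Map(
  "url"      -> "jdbc:postgresql://db:5432/mydb",
  "user"     -> "user",
  "password" -> "password"
)

// 1) fetch a Dataset from the topic ("a"); only the correlation id is
//    extracted here, the real app parses the full payload
val processedStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "trx-processed-topic")
  .load()
  .select(get_json_object(col("value").cast("string"), "$.transaction_id")
    .as("transaction_id"))

processedStream.writeStream
  .foreachBatch { (a: DataFrame, batchId: Long) =>
    // 2) the ids seen in this micro-batch
    val ids = a.select("transaction_id").distinct()

    // 3) rows w/ matching ids and status 'created' ("b"); in practice
    //    you'd push the id filter down instead of loading the table
    val b = spark.read.format("jdbc")
      .options(jdbcOptions)
      .option("dbtable", "transactions")
      .load()
      .where(col("status") === "CREATED")
      .join(ids, Seq("transaction_id"))

    // 4) the intersection of "a" and "b"
    val bThatNeedsSinking = a.join(b.select("transaction_id"), Seq("transaction_id"))

    // 5) sink the matches
    bThatNeedsSinking.write.format("jdbc")
      .options(jdbcOptions)
      .option("dbtable", "transactions_processed") // hypothetical target
      .mode("append")
      .save()

    // "b_leftovers": rows of "a" with no 'created' counterpart yet...
    val bLeftovers = a.join(b.select("transaction_id"), Seq("transaction_id"), "left_anti")
    // ...and there is nowhere for them to go, so they evaporate when the
    // micro-batch ends
  }
  .option("checkpointLocation", "/tmp/checkpoints/trx-processed") // placeholder
  .start()
  .awaitTermination()
```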
So, what I'm trying to accomplish is to find a way to reprocess those
"b_leftovers" *without* having to restart the app.

Does that make sense?

PS: It doesn't necessarily have to be real streaming; if micro-batching
(legacy Spark Streaming) would allow such a thing, it would technically
work (although I keep hearing it's not advisable) - see the rough sketch
at the very bottom of this mail.

Thank you so much!

Kind regards

On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Can you please clarify if you are reading these two topics separately or
> within the same Scala or Python script in Spark Structured Streaming?
>
> HTH
>
> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira <bruno.ar...@gmail.com> wrote:
>
>> Hello guys,
>>
>> I've been struggling with this for some days now, without success, so I
>> would highly appreciate any enlightenment. The simplified scenario is
>> the following:
>>
>> - I've got 2 topics in Kafka (it's already like that in production,
>> can't change it):
>>    - transactions-created
>>    - transaction-processed
>> - Even though the schemas are not exactly the same, they all share a
>> correlation id, which is their "transaction_id"
>>
>> So, long story short, I've got 2 consumers, one for each topic, and all
>> I wanna do is sink them in a chained order. I'm writing them w/ Spark
>> Structured Streaming, btw.
>>
>> So far so good; the caveats here are:
>>
>> - I cannot write a given "*processed*" transaction unless there is an
>> entry for that same transaction with the status "*created*".
>> - There is *no* guarantee that any transaction in the topic
>> "transaction-*processed*" has a match (same transaction_id) in
>> "transaction-*created*" at the moment the messages are fetched.
>>
>> So the workflow so far is:
>>
>> - Msgs from "transaction-created" just get synced to Postgres, no
>> questions asked
>> - As for "transaction-processed", it goes as follows:
>>    - a) Messages are fetched from the Kafka topic
>>    - b) Select the transaction_ids of those...
>>    - c) Fetch all the rows w/ the corresponding ids from a Postgres
>> table AND that have the status "CREATED"
>>    - d) Then pretty much do an intersection between the two datasets,
>> and sink only the "processed" ones that have a match from step c
>>    - e) Persist the resulting dataset
>>
>> But the rows (from the "processed" side) that were not part of the
>> intersection get lost afterwards...
>>
>> So my questions are:
>>
>> - Is there ANY way to reprocess/replay them at all WITHOUT restarting
>> the app?
>> - For this scenario, should I fall back to Spark Streaming (legacy)
>> instead of Structured Streaming?
>>
>> PS: I was playing around with Spark Streaming (legacy) and managed to
>> commit only the micro-batches that were fully successful (still failed
>> to find a way to "poll" for the uncommitted ones without restarting,
>> though).
>>
>> Thank you very much in advance!
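For reference, the manual-commit experiment mentioned in that last PS maps
to the kafka-0-10 direct stream API roughly as below. A sketch, not the
original code: processAndSink is a hypothetical stand-in for the
join-and-sink logic, and the broker/group/topic values are placeholders as
before:

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}

// hypothetical: runs the join/sink and returns true only when every
// record of the batch was persisted
def processAndSink(rdd: RDD[ConsumerRecord[String, String]]): Boolean = ???

val conf = new SparkConf().setAppName("trx-processed-dstream")
val ssc = new StreamingContext(conf, Seconds(30))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092",               // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "trx-processed-consumer",             // placeholder
  "enable.auto.commit" -> (false: java.lang.Boolean)  // commit by hand below
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("trx-processed-topic"), kafkaParams)
)

stream.foreachRDD { rdd =>
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // commit the offsets only if the whole micro-batch was sunk
  if (processAndSink(rdd)) {
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
  }
}

ssc.start()
ssc.awaitTermination()
```

The catch, as noted in the PS, is that holding back commitAsync only helps
after a restart: the running consumer keeps advancing from its in-memory
position, so nothing re-polls the uncommitted offsets while the context is
alive.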