Hi,

Option 1: You can write the unmatched records back to the processed topic, adding some extra info such as the last-tried timestamp and a retry counter (rough sketch below).
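A minimal PySpark sketch of that idea, assuming the unmatched records have already been isolated (for example inside a foreachBatch function); the bootstrap server, topic and column names are placeholders, not your real ones:

from pyspark.sql import functions as F

def republish_unmatched(unmatched_df, bootstrap_servers="localhost:9092"):
    # Write unmatched "processed" records back to the topic with retry metadata.
    # Start the counter at 0 if the incoming payload does not carry one yet.
    retry_col = F.col("retry_count") if "retry_count" in unmatched_df.columns else F.lit(0)

    enriched = (unmatched_df
                .withColumn("last_tried_at", F.current_timestamp())
                .withColumn("retry_count", retry_col + 1))

    # Re-serialise the whole row as JSON and publish it back to the same topic.
    (enriched
     .select(F.col("transaction_id").cast("string").alias("key"),
             F.to_json(F.struct(*[F.col(c) for c in enriched.columns])).alias("value"))
     .write
     .format("kafka")
     .option("kafka.bootstrap.servers", bootstrap_servers)
     .option("topic", "transactions-processed")
     .save())

The consumer can then inspect retry_count / last_tried_at and drop or alert on records that have been retried too many times, so the loop does not spin forever.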
Option 2: Load unpaired transactions into a staging table in Postgres. Modify the
streaming job of the "created" flow to try to pair any unpaired transactions and
clear them by moving them to the main table. You may need a batch job that deletes
records from the staging table once unpaired records pass a certain age.

Ayan

On Sat, 10 Jul 2021 at 9:49 am, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> One alternative I can think of is that you publish your orphaned
> transactions to another topic from the main Spark job.
>
> You create a new DF based on orphaned transactions:
>
> result = orphanedDF \
>     ......
>     .writeStream \
>     .outputMode('complete') \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
>     .option("topic", "orphaned") \
>     .option('checkpointLocation', checkpoint_path) \
>     .queryName("orphanedTransactions") \
>     .start()
>
> And consume it somewhere else.
>
> HTH
>
> On Sat, 10 Jul 2021 at 00:36, Bruno Oliveira <bruno.ar...@gmail.com> wrote:
>
>> I mean... I guess?
>>
>> But I don't really have Airflow here, and I didn't really want to fall
>> back to a "batch"-kind of approach with Airflow.
>>
>> I'd rather use a Dead Letter Queue approach instead (like I mentioned,
>> another topic for the failed ones, which is later consumed and pumps
>> the messages back to the original topic), or something with
>> Spark + Delta Lake instead...
>>
>> I was just hoping I could retry/replay these "orphaned" transactions
>> somewhat more easily...
>>
>> *Question)* Those features of "Stateful Streaming" or "Continuous
>> Processing" mode wouldn't help solve my case, would they?
>>
>> On Fri, Jul 9, 2021 at 8:19 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Well this is a matter of using journal entries.
>>>
>>> What you can do is that those "orphaned" transactions that you cannot
>>> pair through transaction_id can be written to a journal table in your
>>> Postgres DB. Then you can pair them with the entries in the relevant
>>> Postgres table. If the essence is not time critical, this can be done
>>> through a scheduling job every x minutes through Airflow or something
>>> similar, on the database alone.
>>>
>>> HTH
>>>
>>> On Fri, 9 Jul 2021 at 23:53, Bruno Oliveira <bruno.ar...@gmail.com> wrote:
>>>
>>>> That is exactly the case, Sebastian!
>>>> >>>> - In practise, that "created means "*authorized*", but I still cannot >>>> deduct anything from the customer balance >>>> - the "processed" means I can safely deduct the transaction_amount >>>> from the customer balance, >>>> - and the "refunded" means I must give the transaction amount back to >>>> the customer balance >>>> >>>> So, technically, we cannot process something that is not "AUTHORIZED" >>>> (created) yet, nor can we process a refund for a transaction that has NOT >>>> been PROCESSED yet. >>>> >>>> >>>> *You have an authorisation, then the actual transaction and maybe a >>>>> refund some time in the future. You want to proceed with a transaction >>>>> only >>>>> if you've seen the auth but in an eventually consistent system this might >>>>> not always happen.* >>>> >>>> >>>> That's absolutely the case! So, yes, That's correct. >>>> >>>> *You are asking in the case of receiving the transaction before the >>>>> auth how to retry later? * >>>> >>>> >>>> Yeah! I'm struggling for days on how to solve with Spark Structured >>>> Streaming... >>>> >>>> *Right now you are discarding those transactions that didn't match so >>>>> you instead would need to persist them somewhere and either reinject them >>>>> into the job that does lookup (say after x minutes) * >>>> >>>> >>>> >>>> *Right now, the best I could think of is: * >>>> >>>> - Say, I'm reading the messages w/ transaction_id [1, 2, 3] from >>>> Kafka (topic "transactions-processed") >>>> - Then I'm querying the database for these IDs that have the status >>>> "CREATED" (or "AUTHORIZED" to be more accurate), and it returns the >>>> transactions for IDs [1, 2] >>>> - So, while it'll work for the ones with ID [1. 2] , I would have >>>> to put that transaction_id 3 in another topic, say, " >>>> *transaction-processed-retry*" >>>> - And write yet another consumer, to fetch the messages from that >>>> "*transaction-processed-retry" >>>> *and put them back to the original topic (transactions-processed) >>>> - And do something similar for the transactions-refunded >>>> >>>> *Q1) *I think this approach may work, but I can't stop thinking I'm >>>> overengineering this, and was wondering if there isn't a better approach... >>>> ? >>>> >>>> *Is this what you are looking for?* >>>> >>>> >>>> Yes, that's exactly it. >>>> >>>> >>>> *Q2)* I know that, under the hood, Structured Streaming is actually >>>> using the micro-batch engine, >>>> if I switched to *Continuous Processing*, would it make any >>>> difference? Would it allow me any "retry" mechanism out of the box? >>>> >>>> *Q3)* I stumbled upon a *Stateful Streaming* ( >>>> https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming) >>>> , but I have never ever used it before, >>>> would that actually do something for my case >>>> (retrying/replaying a given message) ? >>>> >>>> >>>> Thank you very VERY in advance! >>>> Best regards >>>> >>>> >>>> On Fri, Jul 9, 2021 at 6:36 PM Sebastian Piu <sebastian....@gmail.com> >>>> wrote: >>>> >>>>> So in payment systems you have something similar I think >>>>> >>>>> You have an authorisation, then the actual transaction and maybe a >>>>> refund some time in the future. You want to proceed with a transaction >>>>> only >>>>> if you've seen the auth but in an eventually consistent system this might >>>>> not always happen. >>>>> >>>>> You are asking in the case of receiving the transaction before the >>>>> auth how to retry later? 
>>>>> >>>>> Right now you are discarding those transactions that didn't match so >>>>> you instead would need to persist them somewhere and either reinject them >>>>> into the job that does lookup (say after x minutes) >>>>> >>>>> Is this what you are looking for? >>>>> >>>>> On Fri, 9 Jul 2021, 9:44 pm Bruno Oliveira, <bruno.ar...@gmail.com> >>>>> wrote: >>>>> >>>>>> I'm terribly sorry, Mich. That was my mistake. >>>>>> The timestamps are not the same (I copy&pasted without realizing >>>>>> that, I'm really sorry for the confusion) >>>>>> >>>>>> Please assume NONE of the following transactions are in the database >>>>>> yet >>>>>> >>>>>> *transactions-created:* >>>>>> { "transaction_id": 1, "amount": 1000, "timestamp": "2020-04-04 >>>>>> 11:01:00" } >>>>>> { "transaction_id": 2, "amount": 2000, "timestamp": "2020-04-04 >>>>>> 08:02:00" } >>>>>> >>>>>> *transactions-processed: * >>>>>> { "transaction_id": 1, "timestamp": "2020-04-04 11:03:00" } // so >>>>>> it's processed 2 minutes after it was created >>>>>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" } // so >>>>>> it's processed 4 hours after it was created >>>>>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" } // >>>>>> cannot be persisted into the DB yet, because this "transaction_id 3" with >>>>>> the status "CREATED" does NOT exist in the DB >>>>>> >>>>>> >>>>>> *(...) Transactions-created are created at the same time (the same >>>>>>> timestamp) but you have NOT received them and they don't yet exist in >>>>>>> your >>>>>>> DB (...)* >>>>>> >>>>>> - Not at the same timestamp, that was my mistake. >>>>>> - Imagine two transactions with the same ID (neither of them are in >>>>>> any Kafka topic yet), >>>>>> >>>>>> - One with the status CREATED, and another with the status >>>>>> PROCESSED, >>>>>> - The one with the status PROCESSED will ALWAYS have a >>>>>> higher/greater timestamp than the one with the status CREATED >>>>>> - Now for whatever reason, this happens: >>>>>> - Step a) some producer *fails* to push the *created* one to >>>>>> the topic *transactions-created, it will RETRY, and will >>>>>> eventually succeed, but that can take minutes, or hours* >>>>>> - Step b) however, the producer *succeeds* in pushing the* >>>>>> 'processed' *one to the topic *transactions-processed * >>>>>> >>>>>> >>>>>> *(...) because presumably your relational database is too slow to >>>>>>> ingest them? (...)* >>>>>> >>>>>> >>>>>> - it's not like the DB was slow, it was because the message for >>>>>> transaction_id 3 didn't arrive at the *topic-created *yet, due to >>>>>> some error/failure in Step A, for example >>>>>> >>>>>> >>>>>> * you do a query in Postgres for say transaction_id 3 but they don't >>>>>>> exist yet? When are they expected to arrive?* >>>>>> >>>>>> >>>>>> - That's correct. It could take minutes, maybe hours. But it is >>>>>> guaranteed that at some point, in the future, they will arrive. I just >>>>>> have >>>>>> to keep trying until it works, this transaction_id 3 with the status >>>>>> CREATED arrives at the database >>>>>> >>>>>> >>>>>> Huge apologies for the confusion... Is it a bit more clear now? >>>>>> >>>>>> *PS:* This is a simplified scenario, in practise, there is yet >>>>>> another topic for "transactions-refunded". But which cannot be sinked to >>>>>> the DB, unless the same transaction_id with the status "PROCESSED" is >>>>>> there. 
(but again, there can only be a transaction_id PROCESSED, if the >>>>>> same transaction_id with CREATED exists in the DB) >>>>>> >>>>>> >>>>>> On Fri, Jul 9, 2021 at 4:51 PM Mich Talebzadeh < >>>>>> mich.talebza...@gmail.com> wrote: >>>>>> >>>>>>> One second >>>>>>> >>>>>>> The topic called transactions_processed is streaming through Spark. >>>>>>> Transactions-created are created at the same time (the same timestamp) >>>>>>> but >>>>>>> you have NOT received them and they don't yet exist in your DB, >>>>>>> because presumably your relational database is too slow to ingest them? >>>>>>> you >>>>>>> do a query in Postgres for say transaction_id 3 but they don't exist >>>>>>> yet? >>>>>>> When are they expected to arrive? >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> view my Linkedin profile >>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>>>>> >>>>>>> >>>>>>> >>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility >>>>>>> for any loss, damage or destruction of data or any other property which >>>>>>> may >>>>>>> arise from relying on this email's technical content is explicitly >>>>>>> disclaimed. The author will in no case be liable for any monetary >>>>>>> damages >>>>>>> arising from such loss, damage or destruction. >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Fri, 9 Jul 2021 at 19:12, Bruno Oliveira <bruno.ar...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Thanks for the quick reply! >>>>>>>> >>>>>>>> I'm not sure I got the idea correctly... but from what I'm >>>>>>>> underding, wouldn't that actually end the same way? >>>>>>>> Because, this is the current scenario: >>>>>>>> >>>>>>>> *transactions-processed: * >>>>>>>> { "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" } >>>>>>>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" } >>>>>>>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" } >>>>>>>> { "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" } >>>>>>>> >>>>>>>> *transactions-created:* >>>>>>>> { "transaction_id": 1, "amount": 1000, "timestamp": "2020-04-04 >>>>>>>> 11:01:00" } >>>>>>>> { "transaction_id": 2, "amount": 2000, "timestamp": "2020-04-04 >>>>>>>> 12:02:00" } >>>>>>>> >>>>>>>> - So, when I fetch ALL messages from both topics, there are still >>>>>>>> 2x transactions (id: "*3*" and "*4*") which do *not* exist in the >>>>>>>> topic "transaction-created" yet (and they aren't in Postgres either) >>>>>>>> - But since they were pulled by "Structured Streaming" already, >>>>>>>> they'll be kinda marked as "processed" by Spark Structure Streaming >>>>>>>> checkpoint anyway. >>>>>>>> >>>>>>>> And therefore, I can't replay/reprocess them again... >>>>>>>> >>>>>>>> Is my understanding correct? Am I missing something here? >>>>>>>> >>>>>>>> On Fri, Jul 9, 2021 at 2:02 PM Mich Talebzadeh < >>>>>>>> mich.talebza...@gmail.com> wrote: >>>>>>>> >>>>>>>>> Thanks for the details. >>>>>>>>> >>>>>>>>> Can you read these in the same app. For example. This is PySpark >>>>>>>>> but it serves the purpose. 
>>>>>>>>> >>>>>>>>> Read topic "newtopic" in micro batch and the other topic "md" in >>>>>>>>> another microbatch >>>>>>>>> >>>>>>>>> try: >>>>>>>>> # process topic --> newtopic >>>>>>>>> streamingNewtopic = self.spark \ >>>>>>>>> .readStream \ >>>>>>>>> .format("kafka") \ >>>>>>>>> .option("kafka.bootstrap.servers", >>>>>>>>> config['MDVariables']['bootstrapServers'],) \ >>>>>>>>> .option("schema.registry.url", >>>>>>>>> config['MDVariables']['schemaRegistryURL']) \ >>>>>>>>> .option("group.id", config['common']['newtopic']) >>>>>>>>> \ >>>>>>>>> .option("zookeeper.connection.timeout.ms", >>>>>>>>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \ >>>>>>>>> .option("rebalance.backoff.ms", >>>>>>>>> config['MDVariables']['rebalanceBackoffMS']) \ >>>>>>>>> .option("zookeeper.session.timeout.ms", >>>>>>>>> config['MDVariables']['zookeeperSessionTimeOutMs']) \ >>>>>>>>> .option("auto.commit.interval.ms", >>>>>>>>> config['MDVariables']['autoCommitIntervalMS']) \ >>>>>>>>> *.option("subscribe", >>>>>>>>> config['MDVariables']['newtopic']) \* >>>>>>>>> .option("failOnDataLoss", "false") \ >>>>>>>>> .option("includeHeaders", "true") \ >>>>>>>>> .option("startingOffsets", "latest") \ >>>>>>>>> .load() \ >>>>>>>>> .select(from_json(col("value").cast("string"), >>>>>>>>> newtopicSchema).alias("newtopic_value")) >>>>>>>>> >>>>>>>>> # construct a streaming dataframe streamingDataFrame >>>>>>>>> that subscribes to topic config['MDVariables']['topic']) -> md >>>>>>>>> (market data) >>>>>>>>> streamingDataFrame = self.spark \ >>>>>>>>> .readStream \ >>>>>>>>> .format("kafka") \ >>>>>>>>> .option("kafka.bootstrap.servers", >>>>>>>>> config['MDVariables']['bootstrapServers'],) \ >>>>>>>>> .option("schema.registry.url", >>>>>>>>> config['MDVariables']['schemaRegistryURL']) \ >>>>>>>>> .option("group.id", config['common']['appName']) \ >>>>>>>>> .option("zookeeper.connection.timeout.ms", >>>>>>>>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \ >>>>>>>>> .option("rebalance.backoff.ms", >>>>>>>>> config['MDVariables']['rebalanceBackoffMS']) \ >>>>>>>>> .option("zookeeper.session.timeout.ms", >>>>>>>>> config['MDVariables']['zookeeperSessionTimeOutMs']) \ >>>>>>>>> .option("auto.commit.interval.ms", >>>>>>>>> config['MDVariables']['autoCommitIntervalMS']) \ >>>>>>>>> *.option("subscribe", >>>>>>>>> config['MDVariables']['topic']) \* >>>>>>>>> .option("failOnDataLoss", "false") \ >>>>>>>>> .option("includeHeaders", "true") \ >>>>>>>>> .option("startingOffsets", "latest") \ >>>>>>>>> .load() \ >>>>>>>>> .select(from_json(col("value").cast("string"), >>>>>>>>> schema).alias("parsed_value")) >>>>>>>>> >>>>>>>>> >>>>>>>>> streamingNewtopic.printSchema() >>>>>>>>> >>>>>>>>> # Now do a writeStream and call the relevant >>>>>>>>> functions to process dataframes >>>>>>>>> >>>>>>>>> newtopicResult = streamingNewtopic.select( \ >>>>>>>>> col("newtopic_value.uuid").alias("uuid") \ >>>>>>>>> , >>>>>>>>> col("newtopic_value.timeissued").alias("timeissued") \ >>>>>>>>> , col("newtopic_value.queue").alias("queue") \ >>>>>>>>> , >>>>>>>>> col("newtopic_value.status").alias("status")). \ >>>>>>>>> writeStream. \ >>>>>>>>> outputMode('append'). \ >>>>>>>>> option("truncate", "false"). \ >>>>>>>>> * foreachBatch(sendToControl). \* >>>>>>>>> trigger(processingTime='2 seconds'). \ >>>>>>>>> queryName(config['MDVariables']['newtopic']). 
>>>>>>>>> \ >>>>>>>>> start() >>>>>>>>> >>>>>>>>> result = streamingDataFrame.select( \ >>>>>>>>> col("parsed_value.rowkey").alias("rowkey") \ >>>>>>>>> , col("parsed_value.ticker").alias("ticker") \ >>>>>>>>> , >>>>>>>>> col("parsed_value.timeissued").alias("timeissued") \ >>>>>>>>> , col("parsed_value.price").alias("price")). \ >>>>>>>>> writeStream. \ >>>>>>>>> outputMode('append'). \ >>>>>>>>> option("truncate", "false"). \ >>>>>>>>> *foreachBatch(sendToSink). \* >>>>>>>>> trigger(processingTime='30 seconds'). \ >>>>>>>>> option('checkpointLocation', >>>>>>>>> checkpoint_path). \ >>>>>>>>> queryName(config['MDVariables']['topic']). \ >>>>>>>>> start() >>>>>>>>> print(result) >>>>>>>>> >>>>>>>>> except Exception as e: >>>>>>>>> print(f"""{e}, quitting""") >>>>>>>>> sys.exit(1) >>>>>>>>> >>>>>>>>> Inside that function say *sendToSink *you can get the df and >>>>>>>>> batchId >>>>>>>>> >>>>>>>>> def sendToSink(df, batchId): >>>>>>>>> if(len(df.take(1))) > 0: >>>>>>>>> print(f"""md batchId is {batchId}""") >>>>>>>>> df.show(100,False) >>>>>>>>> df. persist() >>>>>>>>> # write to BigQuery batch table >>>>>>>>> s.writeTableToBQ(df, "append", >>>>>>>>> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable']) >>>>>>>>> df.unpersist() >>>>>>>>> print(f"""wrote to DB""") >>>>>>>>> else: >>>>>>>>> print("DataFrame md is empty") >>>>>>>>> >>>>>>>>> And you have created DF from the other topic newtopic >>>>>>>>> >>>>>>>>> def sendToControl(dfnewtopic, batchId): >>>>>>>>> if(len(dfnewtopic.take(1))) > 0: >>>>>>>>> ...... >>>>>>>>> >>>>>>>>> Now you have two dataframe* df* and *dfnewtopic* in the same >>>>>>>>> session. Will you be able to join these two dataframes through common >>>>>>>>> key >>>>>>>>> value? >>>>>>>>> >>>>>>>>> HTH >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> view my Linkedin profile >>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility >>>>>>>>> for any loss, damage or destruction of data or any other property >>>>>>>>> which may >>>>>>>>> arise from relying on this email's technical content is explicitly >>>>>>>>> disclaimed. The author will in no case be liable for any monetary >>>>>>>>> damages >>>>>>>>> arising from such loss, damage or destruction. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Fri, 9 Jul 2021 at 17:41, Bruno Oliveira <bruno.ar...@gmail.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hello! Sure thing! >>>>>>>>>> >>>>>>>>>> I'm reading them *separately*, both are apps written with Scala >>>>>>>>>> + Spark Structured Streaming. 
>>>>>>>>>> >>>>>>>>>> I feel like I missed some details on my original thread (sorry it >>>>>>>>>> was past 4 AM) and it was getting frustrating >>>>>>>>>> Please let me try to clarify some points: >>>>>>>>>> >>>>>>>>>> *Transactions Created Consumer* >>>>>>>>>> ----------------------------------- >>>>>>>>>> | Kafka trx-created-topic | <--- (Scala + SparkStructured >>>>>>>>>> Streaming) ConsumerApp ---> Sinks to ---> Postgres DB Table >>>>>>>>>> (Transactions) >>>>>>>>>> ----------------------------------- >>>>>>>>>> >>>>>>>>>> *Transactions Processed Consumer* >>>>>>>>>> ------------------------------------- >>>>>>>>>> | Kafka trx-processed-topic | <--- 1) (Scala + SparkStructured >>>>>>>>>> Streaming) AnotherConsumerApp fetches a Dataset (let's call it "a") >>>>>>>>>> ------------------------------------- 2) Selects the >>>>>>>>>> Ids >>>>>>>>>> ------------------------------------- >>>>>>>>>> | Postgres / Trx table |. <--- 3) Fetches the rows w/ >>>>>>>>>> the matching ids that have status 'created (let's call it "b") >>>>>>>>>> ------------------------------------- 4) Performs an >>>>>>>>>> intersection between "a" and "b" resulting in a >>>>>>>>>> "b_that_needs_sinking" (but >>>>>>>>>> now there's some "b_leftovers" that were out of the intersection) >>>>>>>>>> 5) Sinks >>>>>>>>>> "b_that_needs_sinking" to DB, but that leaves the "b_leftovers" as >>>>>>>>>> unprocessed (not persisted) >>>>>>>>>> 6) However, >>>>>>>>>> those "b_leftovers" would, ultimately, be processed at some point >>>>>>>>>> (even if >>>>>>>>>> it takes like 1-3 days) - when their corresponding transaction_id are >>>>>>>>>> pushed >>>>>>>>>> to the "trx-created-topic" Kafka topic, and are then processed by >>>>>>>>>> that >>>>>>>>>> first consumer >>>>>>>>>> >>>>>>>>>> So, what I'm trying to accomplish is find a way to reprocess >>>>>>>>>> those "b_leftovers" *without *having to restart the app >>>>>>>>>> Does that make sense? >>>>>>>>>> >>>>>>>>>> PS: It doesn't necessarily have to be real streaming, if >>>>>>>>>> micro-batching (legacy Spark Streaming) would allow such a thing, it >>>>>>>>>> would >>>>>>>>>> technically work (although I keep hearing it's not advisable) >>>>>>>>>> >>>>>>>>>> Thank you so much! >>>>>>>>>> >>>>>>>>>> Kind regards >>>>>>>>>> >>>>>>>>>> On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh < >>>>>>>>>> mich.talebza...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> Can you please clarify if you are reading these two topics >>>>>>>>>>> separately or within the same scala or python script in Spark >>>>>>>>>>> Structured >>>>>>>>>>> Streaming? >>>>>>>>>>> >>>>>>>>>>> HTH >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> view my Linkedin profile >>>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all >>>>>>>>>>> responsibility for any loss, damage or destruction of data or any >>>>>>>>>>> other >>>>>>>>>>> property which may arise from relying on this email's technical >>>>>>>>>>> content is >>>>>>>>>>> explicitly disclaimed. The author will in no case be liable for any >>>>>>>>>>> monetary damages arising from such loss, damage or destruction. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira < >>>>>>>>>>> bruno.ar...@gmail.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hello guys, >>>>>>>>>>>> >>>>>>>>>>>> I've been struggling with this for some days now, without >>>>>>>>>>>> success, so I would highly appreciate any enlightenment. 
>>>>>>>>>>>> The simplified scenario is the following:
>>>>>>>>>>>>
>>>>>>>>>>>> - I've got 2 topics in Kafka (it's already like that in
>>>>>>>>>>>> production, can't change it):
>>>>>>>>>>>>     - transactions-created
>>>>>>>>>>>>     - transaction-processed
>>>>>>>>>>>> - Even though the schema is not exactly the same, they all
>>>>>>>>>>>> share a correlation_id, which is their "transaction_id"
>>>>>>>>>>>>
>>>>>>>>>>>> So, long story short, I've got 2 consumers, one for each topic,
>>>>>>>>>>>> and all I wanna do is sink them in a chain order. I'm writing
>>>>>>>>>>>> them w/ Spark Structured Streaming, btw.
>>>>>>>>>>>>
>>>>>>>>>>>> So far so good; the caveats here are:
>>>>>>>>>>>>
>>>>>>>>>>>> - I cannot write a given "*processed*" transaction unless there
>>>>>>>>>>>> is an entry of that same transaction with the status "*created*".
>>>>>>>>>>>> - There is *no* guarantee that any transactions in the topic
>>>>>>>>>>>> "transaction-*processed*" have a match (same transaction_id) in
>>>>>>>>>>>> the "transaction-*created*" at the moment the messages are fetched.
>>>>>>>>>>>>
>>>>>>>>>>>> So the workflow so far is:
>>>>>>>>>>>> - Msgs from the "transaction-created" just get synced to
>>>>>>>>>>>> Postgres, no questions asked
>>>>>>>>>>>> - As for the "transaction-processed", it goes as follows:
>>>>>>>>>>>>     - a) Messages are fetched from the Kafka topic
>>>>>>>>>>>>     - b) Select the transaction_id of those...
>>>>>>>>>>>>     - c) Fetch all the rows w/ the corresponding ids from a
>>>>>>>>>>>> Postgres table AND that have the status "CREATED"
>>>>>>>>>>>>     - d) Then I pretty much do an intersection between the two
>>>>>>>>>>>> datasets, and sink only the "processed" ones that matched in step c
>>>>>>>>>>>>     - e) Persist the resulting dataset
>>>>>>>>>>>>
>>>>>>>>>>>> But the rows (from the 'processed') that were not part of the
>>>>>>>>>>>> intersection get lost afterwards...
>>>>>>>>>>>>
>>>>>>>>>>>> So my question is:
>>>>>>>>>>>> - Is there ANY way to reprocess/replay them at all WITHOUT
>>>>>>>>>>>> restarting the app?
>>>>>>>>>>>> - For this scenario, should I fall back to Spark Streaming,
>>>>>>>>>>>> instead of Structured Streaming?
>>>>>>>>>>>>
>>>>>>>>>>>> PS: I was playing around with Spark Streaming (legacy) and
>>>>>>>>>>>> managed to commit only the ones in the microbatches that were
>>>>>>>>>>>> fully successful (still failed to find a way to "poll" for the
>>>>>>>>>>>> uncommitted ones without restarting, though).
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you very much in advance!

--
Best Regards,
Ayan Guha
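For completeness, a minimal PySpark sketch of the journal/staging-table idea suggested in this thread: leftover "processed" records are parked in a Postgres staging table and re-checked on every micro-batch instead of being lost. All connection settings, table names and columns below are made up, and the real consumers are Scala apps, so treat this as a shape rather than the actual implementation:

from pyspark.sql import SparkSession, functions as F

# Hypothetical connection settings and table names.
JDBC_URL = "jdbc:postgresql://localhost:5432/payments"
JDBC_PROPS = {"user": "app", "password": "secret", "driver": "org.postgresql.Driver"}

def sink_processed(batch_df, batch_id):
    # foreachBatch sink for the "transactions-processed" stream.
    spark = SparkSession.builder.getOrCreate()

    # 1) transaction_ids already present in Postgres with status CREATED.
    created_ids = (spark.read.jdbc(JDBC_URL, "transactions", properties=JDBC_PROPS)
                   .where(F.col("status") == "CREATED")
                   .select("transaction_id"))

    # 2) Fresh micro-batch plus anything parked earlier in the staging table
    #    (the staging table is assumed to hold the same two columns).
    staged = spark.read.jdbc(JDBC_URL, "processed_staging", properties=JDBC_PROPS)
    candidates = (batch_df.select("transaction_id", "timestamp")
                  .unionByName(staged)
                  .dropDuplicates(["transaction_id"]))

    # 3) Split into rows that can be persisted now and leftovers to retry later.
    matched = candidates.join(created_ids, "transaction_id", "inner")
    leftover = candidates.join(created_ids, "transaction_id", "left_anti").persist()
    leftover.count()  # materialise before the staging table is overwritten below

    # 4) Persist the matched rows (in the real app, whatever "sink processed"
    #    means), then replace the staging table with the leftovers.
    matched.write.jdbc(JDBC_URL, "transactions_processed", mode="append", properties=JDBC_PROPS)
    leftover.write.jdbc(JDBC_URL, "processed_staging", mode="overwrite", properties=JDBC_PROPS)
    leftover.unpersist()

The query would be wired up with writeStream.foreachBatch(sink_processed) plus a checkpoint location on the stream read from transactions-processed, and a separate housekeeping job (cron or Airflow) can purge staging rows older than some cut-off, as suggested earlier in the thread.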