That is exactly the case, Sebastian!

- In practice, "created" means "*authorized*", but I still cannot deduct anything from the customer balance
- "processed" means I can safely deduct the transaction_amount from the customer balance
- and "refunded" means I must give the transaction amount back to the customer balance
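Just to make that ordering rule explicit, here is a tiny sketch in Scala (the names are hypothetical and not taken from the actual jobs): an incoming event may only be applied if its predecessor status is already in the database.

    // Which incoming status may be applied, given the status currently stored
    // in the database for the same transaction_id (None = not there yet)?
    // "CREATED" is what this thread also calls "authorized".
    object StatusRules {
      def canApply(incoming: String, stored: Option[String]): Boolean =
        (incoming, stored) match {
          case ("CREATED",   None)              => true  // first event, nothing required yet
          case ("PROCESSED", Some("CREATED"))   => true  // safe to deduct the amount
          case ("REFUNDED",  Some("PROCESSED")) => true  // safe to credit the amount back
          case _                                => false // out of order: hold and retry later
        }
    }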
So, technically, we cannot process something that is not "AUTHORIZED" (created) yet, nor can we process a refund for a transaction that has NOT been PROCESSED yet.

*You have an authorisation, then the actual transaction and maybe a refund some time in the future. You want to proceed with a transaction only if you've seen the auth but in an eventually consistent system this might not always happen.*

That's absolutely the case! So, yes, that's correct.

*You are asking in the case of receiving the transaction before the auth how to retry later?*

Yeah! I've been struggling for days with how to solve this with Spark Structured Streaming...

*Right now you are discarding those transactions that didn't match so you instead would need to persist them somewhere and either reinject them into the job that does lookup (say after x minutes)*

Right now, the best I could think of is:

- Say, I'm reading the messages w/ transaction_id [1, 2, 3] from Kafka (topic "transactions-processed")
- Then I'm querying the database for those IDs that have the status "CREATED" (or "AUTHORIZED", to be more accurate), and it returns the transactions for IDs [1, 2]
- So, while it'll work for the ones with IDs [1, 2], I would have to put that transaction_id 3 in another topic, say, "*transaction-processed-retry*"
- And write yet another consumer, to fetch the messages from that "*transaction-processed-retry*" topic and put them back into the original topic (transactions-processed)
- And do something similar for transactions-refunded

*Q1)* I think this approach may work (there is a rough sketch of it further below), but I can't stop thinking I'm overengineering this, and I was wondering if there isn't a better approach...?

*Is this what you are looking for?*

Yes, that's exactly it.

*Q2)* I know that, under the hood, Structured Streaming is actually using the micro-batch engine. If I switched to *Continuous Processing*, would it make any difference? Would it give me any "retry" mechanism out of the box?

*Q3)* I stumbled upon *Stateful Streaming* (https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming), but I have never used it before. Would that actually help with my case (retrying/replaying a given message)?

Thank you very, VERY much in advance!

Best regards

On Fri, Jul 9, 2021 at 6:36 PM Sebastian Piu <sebastian....@gmail.com> wrote:

> So in payment systems you have something similar I think
>
> You have an authorisation, then the actual transaction and maybe a refund some time in the future. You want to proceed with a transaction only if you've seen the auth but in an eventually consistent system this might not always happen.
>
> You are asking in the case of receiving the transaction before the auth how to retry later?
>
> Right now you are discarding those transactions that didn't match so you instead would need to persist them somewhere and either reinject them into the job that does lookup (say after x minutes)
>
> Is this what you are looking for?
>
> On Fri, 9 Jul 2021, 9:44 pm Bruno Oliveira, <bruno.ar...@gmail.com> wrote:
>
>> I'm terribly sorry, Mich. That was my mistake.
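To make the Q1 retry-topic flow above concrete, here is a minimal sketch against the Scala Structured Streaming API. This is not the actual job: the JDBC URL, credentials, table names, checkpoint path and message schema are all made up for illustration. Inside foreachBatch, the micro-batch is split into rows whose CREATED counterpart already exists in Postgres and rows that must wait; the latter are published to the proposed "transaction-processed-retry" topic.

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types.{LongType, StringType, StructType}

    object ProcessedWithRetrySketch {

      val spark: SparkSession = SparkSession.builder
        .appName("transactions-processed-sketch")
        .getOrCreate()

      // Hypothetical shape of the "transactions-processed" messages.
      val processedSchema: StructType = new StructType()
        .add("transaction_id", LongType)
        .add("timestamp", StringType)

      // Runs once per micro-batch: sink the rows whose CREATED counterpart is
      // already in Postgres, publish the rest to a retry topic.
      def sinkOrRetry(batchDF: DataFrame, batchId: Long): Unit = {
        batchDF.persist()

        // IDs already in status CREATED (a real job would push the batch's ids
        // down into the query instead of scanning the whole table).
        val created = spark.read
          .format("jdbc")
          .option("url", "jdbc:postgresql://db-host:5432/payments") // hypothetical
          .option("dbtable", "transactions")                        // hypothetical
          .option("user", "app")
          .option("password", "secret")
          .load()
          .filter(col("status") === "CREATED")
          .select("transaction_id")

        val matched   = batchDF.join(created, Seq("transaction_id"), "left_semi")
        val leftovers = batchDF.join(created, Seq("transaction_id"), "left_anti")

        // In the real job this would update the existing row's status; appending
        // to a staging table keeps the sketch simple.
        matched.write
          .format("jdbc")
          .option("url", "jdbc:postgresql://db-host:5432/payments")
          .option("dbtable", "transactions_processed_staging")      // hypothetical
          .option("user", "app")
          .option("password", "secret")
          .mode("append")
          .save()

        // Unmatched rows go back to Kafka so a later run can pick them up again.
        leftovers
          .selectExpr("CAST(transaction_id AS STRING) AS key", "to_json(struct(*)) AS value")
          .write
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")         // hypothetical
          .option("topic", "transaction-processed-retry")
          .save()

        batchDF.unpersist()
      }

      def main(args: Array[String]): Unit = {
        val processed = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")         // hypothetical
          .option("subscribe", "transactions-processed")
          .option("startingOffsets", "latest")
          .load()
          .select(from_json(col("value").cast("string"), processedSchema).alias("p"))
          .select("p.*")

        processed.writeStream
          .foreachBatch(sinkOrRetry _)
          .option("checkpointLocation", "/tmp/checkpoints/trx-processed") // hypothetical
          .start()
          .awaitTermination()
      }
    }

A small plain consumer (or even a scheduled batch job) can then move messages from "transaction-processed-retry" back onto "transactions-processed" after a delay, exactly as described in the bullet points above.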
>> The timestamps are not the same (I copy&pasted without realizing that, I'm really sorry for the confusion).
>>
>> Please assume NONE of the following transactions are in the database yet:
>>
>> *transactions-created:*
>> { "transaction_id": 1, "amount": 1000, "timestamp": "2020-04-04 11:01:00" }
>> { "transaction_id": 2, "amount": 2000, "timestamp": "2020-04-04 08:02:00" }
>>
>> *transactions-processed:*
>> { "transaction_id": 1, "timestamp": "2020-04-04 11:03:00" } // so it's processed 2 minutes after it was created
>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" } // so it's processed 4 hours after it was created
>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" } // cannot be persisted into the DB yet, because this "transaction_id 3" with the status "CREATED" does NOT exist in the DB
>>
>> *(...) Transactions-created are created at the same time (the same timestamp) but you have NOT received them and they don't yet exist in your DB (...)*
>>
>> - Not at the same timestamp, that was my mistake.
>> - Imagine two transactions with the same ID (neither of them is in any Kafka topic yet):
>>   - One with the status CREATED, and another with the status PROCESSED,
>>   - The one with the status PROCESSED will ALWAYS have a higher/greater timestamp than the one with the status CREATED
>> - Now, for whatever reason, this happens:
>>   - Step a) some producer *fails* to push the *created* one to the topic *transactions-created*; it will RETRY, and will eventually succeed, but that can take minutes, or hours
>>   - Step b) however, the producer *succeeds* in pushing the *'processed'* one to the topic *transactions-processed*
>>
>> *(...) because presumably your relational database is too slow to ingest them? (...)*
>>
>> - It's not that the DB is slow; it's that the message for transaction_id 3 hasn't arrived at the *transactions-created* topic yet, due to some error/failure in Step a), for example
>>
>> *(...) you do a query in Postgres for say transaction_id 3 but they don't exist yet? When are they expected to arrive? (...)*
>>
>> - That's correct. It could take minutes, maybe hours. But it is guaranteed that, at some point in the future, they will arrive. I just have to keep retrying until that transaction_id 3 with the status CREATED arrives in the database.
>>
>> Huge apologies for the confusion... Is it a bit clearer now?
>>
>> *PS:* This is a simplified scenario; in practice, there is yet another topic, "transactions-refunded", which cannot be sunk to the DB unless the same transaction_id with the status "PROCESSED" is there (and again, there can only be a transaction_id with PROCESSED if the same transaction_id with CREATED exists in the DB).
>>
>> On Fri, Jul 9, 2021 at 4:51 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> One second
>>>
>>> The topic called transactions_processed is streaming through Spark. Transactions-created are created at the same time (the same timestamp) but you have NOT received them and they don't yet exist in your DB, because presumably your relational database is too slow to ingest them? You do a query in Postgres for say transaction_id 3 but they don't exist yet? When are they expected to arrive?
>>>
>>> view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>> *Disclaimer:* Use it at your own risk.
>>> Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>
>>> On Fri, 9 Jul 2021 at 19:12, Bruno Oliveira <bruno.ar...@gmail.com> wrote:
>>>
>>>> Thanks for the quick reply!
>>>>
>>>> I'm not sure I got the idea correctly... but from what I understand, wouldn't that actually end up the same way? Because this is the current scenario:
>>>>
>>>> *transactions-processed:*
>>>> { "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" }
>>>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }
>>>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }
>>>> { "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" }
>>>>
>>>> *transactions-created:*
>>>> { "transaction_id": 1, "amount": 1000, "timestamp": "2020-04-04 11:01:00" }
>>>> { "transaction_id": 2, "amount": 2000, "timestamp": "2020-04-04 12:02:00" }
>>>>
>>>> - So, when I fetch ALL messages from both topics, there are still two transactions (ids "*3*" and "*4*") which do *not* exist in the topic "transactions-created" yet (and they aren't in Postgres either)
>>>> - But since they were already pulled by Structured Streaming, they'll effectively be marked as "processed" by the Spark Structured Streaming checkpoint anyway.
>>>>
>>>> And therefore, I can't replay/reprocess them again...
>>>>
>>>> Is my understanding correct? Am I missing something here?
>>>>
>>>> On Fri, Jul 9, 2021 at 2:02 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Thanks for the details.
>>>>>
>>>>> Can you read these in the same app? For example, this is PySpark, but it serves the purpose.
>>>>> Read topic "newtopic" in one micro-batch and the other topic "md" in another micro-batch:
>>>>>
>>>>>         try:
>>>>>             # process topic --> newtopic
>>>>>             streamingNewtopic = self.spark \
>>>>>                 .readStream \
>>>>>                 .format("kafka") \
>>>>>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
>>>>>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>>>>                 .option("group.id", config['common']['newtopic']) \
>>>>>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>>>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>>>>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>>>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>>>>                 .option("subscribe", config['MDVariables']['newtopic']) \
>>>>>                 .option("failOnDataLoss", "false") \
>>>>>                 .option("includeHeaders", "true") \
>>>>>                 .option("startingOffsets", "latest") \
>>>>>                 .load() \
>>>>>                 .select(from_json(col("value").cast("string"), newtopicSchema).alias("newtopic_value"))
>>>>>
>>>>>             # construct a streaming dataframe streamingDataFrame that subscribes to topic config['MDVariables']['topic'] -> md (market data)
>>>>>             streamingDataFrame = self.spark \
>>>>>                 .readStream \
>>>>>                 .format("kafka") \
>>>>>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
>>>>>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>>>>                 .option("group.id", config['common']['appName']) \
>>>>>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>>>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>>>>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>>>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>>>>                 .option("subscribe", config['MDVariables']['topic']) \
>>>>>                 .option("failOnDataLoss", "false") \
>>>>>                 .option("includeHeaders", "true") \
>>>>>                 .option("startingOffsets", "latest") \
>>>>>                 .load() \
>>>>>                 .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>>>>>
>>>>>             streamingNewtopic.printSchema()
>>>>>
>>>>>             # Now do a writeStream and call the relevant functions to process dataframes
>>>>>
>>>>>             newtopicResult = streamingNewtopic.select( \
>>>>>                       col("newtopic_value.uuid").alias("uuid") \
>>>>>                     , col("newtopic_value.timeissued").alias("timeissued") \
>>>>>                     , col("newtopic_value.queue").alias("queue") \
>>>>>                     , col("newtopic_value.status").alias("status")). \
>>>>>                     writeStream. \
>>>>>                     outputMode('append'). \
>>>>>                     option("truncate", "false"). \
>>>>>                     foreachBatch(sendToControl). \
>>>>>                     trigger(processingTime='2 seconds'). \
>>>>>                     queryName(config['MDVariables']['newtopic']). \
>>>>>                     start()
>>>>>
>>>>>             result = streamingDataFrame.select( \
>>>>>                       col("parsed_value.rowkey").alias("rowkey") \
>>>>>                     , col("parsed_value.ticker").alias("ticker") \
>>>>>                     , col("parsed_value.timeissued").alias("timeissued") \
>>>>>                     , col("parsed_value.price").alias("price")). \
>>>>>                     writeStream. \
>>>>>                     outputMode('append'). \
>>>>>                     option("truncate", "false"). \
>>>>>                     foreachBatch(sendToSink). \
>>>>>                     trigger(processingTime='30 seconds'). \
>>>>>                     option('checkpointLocation', checkpoint_path). \
>>>>>                     queryName(config['MDVariables']['topic']). \
>>>>>                     start()
>>>>>             print(result)
>>>>>
>>>>>         except Exception as e:
>>>>>                 print(f"""{e}, quitting""")
>>>>>                 sys.exit(1)
>>>>>
>>>>> Inside that function, say *sendToSink*, you can get the df and batchId:
>>>>>
>>>>>         def sendToSink(df, batchId):
>>>>>             if(len(df.take(1))) > 0:
>>>>>                 print(f"""md batchId is {batchId}""")
>>>>>                 df.show(100,False)
>>>>>                 df.persist()
>>>>>                 # write to BigQuery batch table
>>>>>                 s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>>>>>                 df.unpersist()
>>>>>                 print(f"""wrote to DB""")
>>>>>             else:
>>>>>                 print("DataFrame md is empty")
>>>>>
>>>>> And you have the DF created from the other topic, newtopic:
>>>>>
>>>>>         def sendToControl(dfnewtopic, batchId):
>>>>>             if(len(dfnewtopic.take(1))) > 0:
>>>>>                 ......
>>>>>
>>>>> Now you have the two dataframes *df* and *dfnewtopic* in the same session. Will you be able to join these two dataframes through a common key value?
>>>>>
>>>>> HTH
>>>>>
>>>>> view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>>>
>>>>> On Fri, 9 Jul 2021 at 17:41, Bruno Oliveira <bruno.ar...@gmail.com> wrote:
>>>>>
>>>>>> Hello! Sure thing!
>>>>>>
>>>>>> I'm reading them *separately*; both are apps written with Scala + Spark Structured Streaming.
>>>>>>
>>>>>> I feel like I missed some details in my original thread (sorry, it was past 4 AM and it was getting frustrating). Please let me try to clarify some points:
>>>>>>
>>>>>> *Transactions Created Consumer*
>>>>>> Kafka "trx-created-topic" <--- (Scala + Spark Structured Streaming) ConsumerApp ---> sinks to ---> Postgres DB table (Transactions)
>>>>>>
>>>>>> *Transactions Processed Consumer*
>>>>>> 1) (Scala + Spark Structured Streaming) AnotherConsumerApp fetches a Dataset from the Kafka "trx-processed-topic" (let's call it "a")
>>>>>> 2) Selects the ids
>>>>>> 3) Fetches the rows with the matching ids that have status 'created' from the Postgres Trx table (let's call it "b")
>>>>>> 4) Performs an intersection between "a" and "b", resulting in "b_that_needs_sinking" (but now there are some "b_leftovers" that fell outside the intersection)
>>>>>> 5) Sinks "b_that_needs_sinking" to the DB, which leaves the "b_leftovers" unprocessed (not persisted)
>>>>>> 6) However, those "b_leftovers" would, ultimately, be processed at some point (even if it takes 1-3 days) - when their corresponding transaction_ids are pushed to the "trx-created-topic" Kafka topic and are then handled by that first consumer
>>>>>>
>>>>>> So, what I'm trying to accomplish is to find a way to reprocess those "b_leftovers" *without* having to restart the app.
>>>>>> Does that make sense?
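On the stateful-streaming question (Q3) further up: one way to keep the "b_leftovers" inside the job, instead of bouncing them through a retry topic, is to buffer them in state with flatMapGroupsWithState until the matching "created" event shows up. This is a heavily simplified, hypothetical sketch, not the actual jobs: it assumes both topics are read and unioned in the same query (as suggested earlier in the thread), all field names, broker addresses and paths are made up, and refunds are omitted.

    import org.apache.spark.sql.{Dataset, SparkSession}
    import org.apache.spark.sql.functions.{col, from_json, lit}
    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
    import org.apache.spark.sql.types.{LongType, StringType, StructType}

    // Hypothetical unified event: both topics parsed into one shape.
    case class TxEvent(transactionId: Long, status: String, timestamp: String)
    // Per-transaction state: have we seen CREATED, and is a PROCESSED event waiting?
    case class TxState(createdSeen: Boolean, pendingProcessed: Option[TxEvent])

    object StatefulMatchSketch {

      // Emits PROCESSED events only once their CREATED counterpart has been seen.
      def matchEvents(id: Long,
                      events: Iterator[TxEvent],
                      state: GroupState[TxState]): Iterator[TxEvent] = {
        if (state.hasTimedOut) {
          // The matching CREATED never arrived within the window: drop or alert here.
          state.remove()
          Iterator.empty
        } else {
          var s = state.getOption.getOrElse(TxState(createdSeen = false, None))
          val out = scala.collection.mutable.ArrayBuffer.empty[TxEvent]
          events.foreach { e =>
            e.status match {
              case "CREATED" =>
                s.pendingProcessed.foreach(out += _)            // release a buffered PROCESSED event
                s = TxState(createdSeen = true, pendingProcessed = None)
              case "PROCESSED" if s.createdSeen => out += e     // safe to sink downstream
              case "PROCESSED"                  => s = s.copy(pendingProcessed = Some(e))
              case _                            => ()           // refunds etc. omitted in this sketch
            }
          }
          state.update(s)
          state.setTimeoutDuration("24 hours")                  // how long to keep waiting is a business choice
          out.iterator
        }
      }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.appName("stateful-match-sketch").getOrCreate()
        import spark.implicits._

        val schema = new StructType()
          .add("transaction_id", LongType)
          .add("timestamp", StringType)

        def readTopic(topic: String, status: String): Dataset[TxEvent] =
          spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")   // hypothetical
            .option("subscribe", topic)
            .load()
            .select(from_json(col("value").cast("string"), schema).alias("v"))
            .select(col("v.transaction_id").alias("transactionId"),
                    lit(status).alias("status"),
                    col("v.timestamp").alias("timestamp"))
            .as[TxEvent]

        val events = readTopic("transactions-created", "CREATED")
          .union(readTopic("transactions-processed", "PROCESSED"))

        val releasable = events
          .groupByKey(_.transactionId)
          .flatMapGroupsWithState[TxState, TxEvent](
            OutputMode.Append, GroupStateTimeout.ProcessingTimeTimeout)(matchEvents)

        // releasable now only contains PROCESSED events that are safe to sink,
        // e.g. via a foreachBatch + JDBC write like the one sketched earlier.
        releasable.writeStream
          .outputMode("append")
          .format("console")
          .option("checkpointLocation", "/tmp/checkpoints/stateful-sketch") // hypothetical
          .start()
          .awaitTermination()
      }
    }

Whether buffering hours or days of unmatched events in the state store is acceptable depends on volume and checkpoint size; if it is not, the retry-topic route discussed earlier in the thread is probably the more operable option.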
>>>>>> PS: It doesn't necessarily have to be real streaming; if micro-batching (legacy Spark Streaming) would allow such a thing, it would technically work (although I keep hearing it's not advisable).
>>>>>>
>>>>>> Thank you so much!
>>>>>>
>>>>>> Kind regards
>>>>>>
>>>>>> On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Can you please clarify if you are reading these two topics separately or within the same Scala or Python script in Spark Structured Streaming?
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>>>>>
>>>>>>> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira <bruno.ar...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello guys,
>>>>>>>>
>>>>>>>> I've been struggling with this for some days now, without success, so I would highly appreciate any enlightenment. The simplified scenario is the following:
>>>>>>>>
>>>>>>>> - I've got 2 topics in Kafka (it's already like that in production, I can't change it):
>>>>>>>>   - transactions-created
>>>>>>>>   - transactions-processed
>>>>>>>> - Even though the schema is not exactly the same, they both share a correlation_id, which is their "transaction_id"
>>>>>>>>
>>>>>>>> So, long story short, I've got 2 consumers, one for each topic, and all I want to do is sink them in a chained order. I'm writing them with Spark Structured Streaming, btw.
>>>>>>>>
>>>>>>>> So far so good; the caveats here are:
>>>>>>>>
>>>>>>>> - I cannot write a given "*processed*" transaction unless there is an entry for that same transaction with the status "*created*".
>>>>>>>> - There is *no* guarantee that any transaction in the topic "transactions-*processed*" has a match (same transaction_id) in "transactions-*created*" at the moment the messages are fetched.
>>>>>>>>
>>>>>>>> So the workflow so far is:
>>>>>>>>
>>>>>>>> - Msgs from "transactions-created" just get synced to Postgres, no questions asked
>>>>>>>> - As for "transactions-processed", it goes as follows:
>>>>>>>>   - a) Messages are fetched from the Kafka topic
>>>>>>>>   - b) Select the transaction_ids of those...
>>>>>>>>   - c) Fetch all the rows with the corresponding ids from a Postgres table AND that have the status "CREATED"
>>>>>>>>   - d) Then, I pretty much do an intersection between the two datasets, and keep only the "processed" ones that have a match from step c
>>>>>>>>   - e) Persist the resulting dataset
>>>>>>>>
>>>>>>>> But the rows (from the 'processed' side) that were not part of the intersection get lost afterwards...
>>>>>>>>
>>>>>>>> So my questions are:
>>>>>>>>
>>>>>>>> - Is there ANY way to reprocess/replay them at all WITHOUT restarting the app?
>>>>>>>> - For this scenario, should I fall back to Spark Streaming, instead of Structured Streaming?
>>>>>>>> PS: I was playing around with Spark Streaming (legacy) and managed to commit only the ones in the micro-batches that were fully successful (still failed to find a way to "poll" for the uncommitted ones without restarting, though).
>>>>>>>>
>>>>>>>> Thank you very much in advance!
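For reference, the pattern described in that PS (commit Kafka offsets only for micro-batches whose work fully succeeded, using the legacy DStream API) looks roughly like the sketch below. The broker address, group id, batch interval and the per-batch body are hypothetical; and, as noted in the PS, this still does not give in-job re-polling of skipped records, it only guarantees that uncommitted offsets are read again after a restart.

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

    object ManualCommitSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("manual-commit-sketch"), Seconds(30))

        val kafkaParams = Map[String, Object](
          "bootstrap.servers"  -> "broker:9092",             // hypothetical
          "key.deserializer"   -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id"           -> "trx-processed-consumer",  // hypothetical
          "enable.auto.commit" -> (false: java.lang.Boolean))

        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Seq("transactions-processed"), kafkaParams))

        stream.foreachRDD { rdd =>
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          // ... look up CREATED rows, sink the matches, park the leftovers ...
          // Commit only after the batch's work has succeeded.
          stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }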