That is exactly the case, Sebastian!

- In practise, that "created  means "*authorized*", but I still cannot
deduct anything from the customer balance
- the "processed" means I can safely deduct the transaction_amount  from
the customer balance,
- and the "refunded" means I must give the transaction amount back to the
customer balance

So, technically, we cannot process something that is not "AUTHORIZED"
(created) yet, nor can we process a refund for a transaction that has NOT
been PROCESSED yet.


*You have an authorisation, then the actual transaction and maybe a refund
> some time in the future. You want to proceed with a transaction only if
> you've seen the auth but in an eventually consistent system this might not
> always happen.*


That's absolutely the case! So, yes, That's correct.

*You are asking in the case of receiving the transaction before the auth
> how to retry later? *


Yeah! I'm struggling for days on how to solve with Spark Structured
Streaming...

*Right now you are discarding those transactions that didn't match so you
> instead would need to persist them somewhere and either reinject them into
> the job that does lookup (say after x minutes) *



*Right now, the best I could think of is: *

   - Say, I'm reading the messages w/ transaction_id [1, 2, 3] from Kafka
   (topic "transactions-processed")
   - Then I'm querying the database for these IDs that have the status
   "CREATED" (or "AUTHORIZED" to be more accurate), and it returns the
   transactions for IDs [1, 2]
   - So, while it'll work for the ones with ID [1. 2] , I would have to put
   that transaction_id 3 in another topic, say, "
   *transaction-processed-retry*"
   - And write yet another consumer, to fetch the messages from that
"*transaction-processed-retry"
   *and put them back to the original topic (transactions-processed)
   - And do something similar for the transactions-refunded

*Q1) *I think this approach may work, but I can't stop thinking I'm
overengineering this, and was wondering if there isn't a better approach...
?

*Is this what you are looking for?*


Yes, that's exactly it.


*Q2)* I know that, under the hood, Structured Streaming is actually using
the micro-batch engine,
         if I switched to *Continuous Processing*, would it make any
difference? Would it allow me any "retry" mechanism out of the box?

*Q3)* I stumbled upon a *Stateful Streaming* (
https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming)
, but I have never ever used it before,
        would that actually do something for my case (retrying/replaying a
given message) ?


Thank you very VERY in advance!
Best regards


On Fri, Jul 9, 2021 at 6:36 PM Sebastian Piu <sebastian....@gmail.com>
wrote:

> So in payment systems you have something similar I think
>
> You have an authorisation, then the actual transaction and maybe a refund
> some time in the future. You want to proceed with a transaction only if
> you've seen the auth but in an eventually consistent system this might not
> always happen.
>
> You are asking in the case of receiving the transaction before the auth
> how to retry later?
>
> Right now you are discarding those transactions that didn't match so you
> instead would need to persist them somewhere and either reinject them into
> the job that does lookup (say after x minutes)
>
> Is this what you are looking for?
>
> On Fri, 9 Jul 2021, 9:44 pm Bruno Oliveira, <bruno.ar...@gmail.com> wrote:
>
>> I'm terribly sorry, Mich. That was my mistake.
>> The timestamps are not the same (I copy&pasted without realizing that,
>> I'm really sorry for the confusion)
>>
>> Please assume NONE of the following transactions are in the database yet
>>
>> *transactions-created:*
>> { "transaction_id": 1, "amount":  1000, "timestamp": "2020-04-04
>> 11:01:00" }
>> { "transaction_id": 2, "amount":  2000, "timestamp": "2020-04-04
>> 08:02:00" }
>>
>> *transactions-processed: *
>> { "transaction_id": 1, "timestamp": "2020-04-04 11:03:00" }     // so
>> it's processed 2 minutes after it was created
>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }     // so
>> it's processed 4 hours after it was created
>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }    // cannot
>> be persisted into the DB yet, because this "transaction_id 3" with the
>> status "CREATED" does NOT exist in the DB
>>
>>
>> *(...) Transactions-created are created at the same time (the same
>>> timestamp) but you have NOT received them and they don't yet exist in your
>>> DB (...)*
>>
>> - Not at the same timestamp, that was my mistake.
>> - Imagine two transactions with the same ID (neither of them are in any
>> Kafka topic yet),
>>
>>    - One with the status CREATED, and another with the status PROCESSED,
>>    - The one with the status PROCESSED will ALWAYS have a higher/greater
>>    timestamp than the one with the status CREATED
>>    - Now for whatever reason, this happens:
>>       - Step a) some producer *fails* to push the *created* one to the
>>       topic  *transactions-created, it will RETRY, and will eventually
>>       succeed, but that can take minutes, or hours*
>>       - Step b) however, the producer *succeeds* in pushing the*
>>       'processed' *one to the topic *transactions-processed *
>>
>>
>> *(...) because presumably your relational database is too slow to ingest
>>> them? (...)*
>>
>>
>> - it's not like the DB was slow, it was because the message for
>> transaction_id 3 didn't arrive at the *topic-created *yet, due to some
>> error/failure in Step A, for example
>>
>>
>> * you do a query in Postgres for say transaction_id 3 but they don't
>>> exist yet? When are they expected to arrive?*
>>
>>
>> - That's correct. It could take minutes, maybe hours. But it is
>> guaranteed that at some point, in the future, they will arrive. I just have
>> to keep trying until it works, this transaction_id 3 with the status
>> CREATED arrives at the database
>>
>>
>> Huge apologies for the confusion... Is it a bit more clear now?
>>
>> *PS:* This is a simplified scenario, in practise, there is yet another
>> topic for "transactions-refunded". But which cannot be sinked to the DB,
>> unless the same transaction_id with the status "PROCESSED" is there. (but
>> again, there can only be a transaction_id PROCESSED, if the same
>> transaction_id with CREATED exists in the DB)
>>
>>
>> On Fri, Jul 9, 2021 at 4:51 PM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> One second
>>>
>>> The topic called transactions_processed is streaming through Spark.
>>> Transactions-created are created at the same time (the same timestamp) but
>>> you have NOT received them and they don't yet exist in your DB,
>>> because presumably your relational database is too slow to ingest them? you
>>> do a query in Postgres for say transaction_id 3 but they don't exist yet?
>>> When are they expected to arrive?
>>>
>>>
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 9 Jul 2021 at 19:12, Bruno Oliveira <bruno.ar...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for the quick reply!
>>>>
>>>> I'm not sure I got the idea correctly... but from what I'm underding,
>>>> wouldn't that actually end the same way?
>>>> Because, this is the current scenario:
>>>>
>>>> *transactions-processed: *
>>>> { "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" }
>>>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }
>>>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }
>>>> { "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" }
>>>>
>>>> *transactions-created:*
>>>> { "transaction_id": 1, "amount":  1000, "timestamp": "2020-04-04
>>>> 11:01:00" }
>>>> { "transaction_id": 2, "amount":  2000, "timestamp": "2020-04-04
>>>> 12:02:00" }
>>>>
>>>> - So, when I fetch ALL messages from both topics, there are still 2x
>>>> transactions (id: "*3*" and "*4*") which do *not* exist in the topic
>>>> "transaction-created" yet (and they aren't in Postgres either)
>>>> - But since they were pulled by "Structured Streaming" already, they'll
>>>> be kinda marked as "processed" by Spark Structure Streaming checkpoint
>>>> anyway.
>>>>
>>>> And therefore, I can't replay/reprocess them again...
>>>>
>>>> Is my understanding correct? Am I missing something here?
>>>>
>>>> On Fri, Jul 9, 2021 at 2:02 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Thanks for the details.
>>>>>
>>>>> Can you read these in the same app. For example. This is PySpark but
>>>>> it serves the purpose.
>>>>>
>>>>> Read topic "newtopic" in micro batch and the other topic "md" in
>>>>> another microbatch
>>>>>
>>>>>         try:
>>>>>             # process topic --> newtopic
>>>>>             streamingNewtopic = self.spark \
>>>>>                 .readStream \
>>>>>                 .format("kafka") \
>>>>>                 .option("kafka.bootstrap.servers",
>>>>> config['MDVariables']['bootstrapServers'],) \
>>>>>                 .option("schema.registry.url",
>>>>> config['MDVariables']['schemaRegistryURL']) \
>>>>>                 .option("group.id", config['common']['newtopic']) \
>>>>>                 .option("zookeeper.connection.timeout.ms",
>>>>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>>>                 .option("rebalance.backoff.ms",
>>>>> config['MDVariables']['rebalanceBackoffMS']) \
>>>>>                 .option("zookeeper.session.timeout.ms",
>>>>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>>>                 .option("auto.commit.interval.ms",
>>>>> config['MDVariables']['autoCommitIntervalMS']) \
>>>>>                 *.option("subscribe",
>>>>> config['MDVariables']['newtopic']) \*
>>>>>                 .option("failOnDataLoss", "false") \
>>>>>                 .option("includeHeaders", "true") \
>>>>>                 .option("startingOffsets", "latest") \
>>>>>                 .load() \
>>>>>                 .select(from_json(col("value").cast("string"),
>>>>> newtopicSchema).alias("newtopic_value"))
>>>>>
>>>>>             # construct a streaming dataframe streamingDataFrame that
>>>>> subscribes to topic config['MDVariables']['topic']) -> md (market data)
>>>>>             streamingDataFrame = self.spark \
>>>>>                 .readStream \
>>>>>                 .format("kafka") \
>>>>>                 .option("kafka.bootstrap.servers",
>>>>> config['MDVariables']['bootstrapServers'],) \
>>>>>                 .option("schema.registry.url",
>>>>> config['MDVariables']['schemaRegistryURL']) \
>>>>>                 .option("group.id", config['common']['appName']) \
>>>>>                 .option("zookeeper.connection.timeout.ms",
>>>>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>>>                 .option("rebalance.backoff.ms",
>>>>> config['MDVariables']['rebalanceBackoffMS']) \
>>>>>                 .option("zookeeper.session.timeout.ms",
>>>>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>>>                 .option("auto.commit.interval.ms",
>>>>> config['MDVariables']['autoCommitIntervalMS']) \
>>>>>                 *.option("subscribe", config['MDVariables']['topic'])
>>>>> \*
>>>>>                 .option("failOnDataLoss", "false") \
>>>>>                 .option("includeHeaders", "true") \
>>>>>                 .option("startingOffsets", "latest") \
>>>>>                 .load() \
>>>>>                 .select(from_json(col("value").cast("string"),
>>>>> schema).alias("parsed_value"))
>>>>>
>>>>>
>>>>>             streamingNewtopic.printSchema()
>>>>>
>>>>>             # Now do a writeStream and call the relevant functions to
>>>>> process dataframes
>>>>>
>>>>>             newtopicResult = streamingNewtopic.select( \
>>>>>                      col("newtopic_value.uuid").alias("uuid") \
>>>>>                    ,
>>>>> col("newtopic_value.timeissued").alias("timeissued") \
>>>>>                    , col("newtopic_value.queue").alias("queue") \
>>>>>                    , col("newtopic_value.status").alias("status")). \
>>>>>                      writeStream. \
>>>>>                      outputMode('append'). \
>>>>>                      option("truncate", "false"). \
>>>>>   *                   foreachBatch(sendToControl). \*
>>>>>                      trigger(processingTime='2 seconds'). \
>>>>>                      queryName(config['MDVariables']['newtopic']). \
>>>>>                      start()
>>>>>
>>>>>             result = streamingDataFrame.select( \
>>>>>                      col("parsed_value.rowkey").alias("rowkey") \
>>>>>                    , col("parsed_value.ticker").alias("ticker") \
>>>>>                    ,
>>>>> col("parsed_value.timeissued").alias("timeissued") \
>>>>>                    , col("parsed_value.price").alias("price")). \
>>>>>                      writeStream. \
>>>>>                      outputMode('append'). \
>>>>>                      option("truncate", "false"). \
>>>>>                      *foreachBatch(sendToSink). \*
>>>>>                      trigger(processingTime='30 seconds'). \
>>>>>                      option('checkpointLocation', checkpoint_path). \
>>>>>                      queryName(config['MDVariables']['topic']). \
>>>>>                      start()
>>>>>             print(result)
>>>>>
>>>>>         except Exception as e:
>>>>>                 print(f"""{e}, quitting""")
>>>>>                 sys.exit(1)
>>>>>
>>>>> Inside that function say *sendToSink *you can get the df and batchId
>>>>>
>>>>> def sendToSink(df, batchId):
>>>>>     if(len(df.take(1))) > 0:
>>>>>         print(f"""md batchId is {batchId}""")
>>>>>         df.show(100,False)
>>>>>         df. persist()
>>>>>         # write to BigQuery batch table
>>>>>         s.writeTableToBQ(df, "append",
>>>>> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>>>>>         df.unpersist()
>>>>>         print(f"""wrote to DB""")
>>>>>     else:
>>>>>         print("DataFrame md is empty")
>>>>>
>>>>> And you have created DF from the other topic newtopic
>>>>>
>>>>> def sendToControl(dfnewtopic, batchId):
>>>>>     if(len(dfnewtopic.take(1))) > 0:
>>>>>         ......
>>>>>
>>>>> Now you have  two dataframe* df* and *dfnewtopic* in the same
>>>>> session. Will you be able to join these two dataframes through common key
>>>>> value?
>>>>>
>>>>> HTH
>>>>>
>>>>>
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, 9 Jul 2021 at 17:41, Bruno Oliveira <bruno.ar...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello! Sure thing!
>>>>>>
>>>>>> I'm reading them *separately*, both are apps written with Scala +
>>>>>> Spark Structured Streaming.
>>>>>>
>>>>>> I feel like I missed some details on my original thread (sorry it was
>>>>>> past 4 AM) and it was getting frustrating
>>>>>> Please let me try to clarify some points:
>>>>>>
>>>>>> *Transactions Created Consumer*
>>>>>> -----------------------------------
>>>>>> | Kafka trx-created-topic   |   <--- (Scala + SparkStructured
>>>>>> Streaming) ConsumerApp --->  Sinks to ---> Postgres DB Table
>>>>>> (Transactions)
>>>>>> -----------------------------------
>>>>>>
>>>>>> *Transactions Processed Consumer*
>>>>>> -------------------------------------
>>>>>> | Kafka trx-processed-topic |  <---   1) (Scala + SparkStructured
>>>>>> Streaming) AnotherConsumerApp fetches a Dataset (let's call it "a")
>>>>>> -------------------------------------           2) Selects the Ids
>>>>>> -------------------------------------
>>>>>> |   Postgres / Trx table         |. <--- 3) Fetches the rows w/ the
>>>>>> matching ids that have status 'created (let's call it "b")
>>>>>> -------------------------------------         4)  Performs an
>>>>>> intersection between "a" and "b" resulting in a "b_that_needs_sinking" 
>>>>>> (but
>>>>>> now there's some "b_leftovers" that were out of the intersection)
>>>>>>                                                      5)  Sinks
>>>>>> "b_that_needs_sinking" to DB, but that leaves the "b_leftovers" as
>>>>>> unprocessed (not persisted)
>>>>>>                                                      6) However,
>>>>>> those "b_leftovers" would, ultimately, be processed at some point (even 
>>>>>> if
>>>>>> it takes like 1-3 days) - when their corresponding transaction_id are
>>>>>>                                                          pushed to
>>>>>> the "trx-created-topic" Kafka topic, and are then processed by that first
>>>>>> consumer
>>>>>>
>>>>>> So, what I'm trying to accomplish is find a way to reprocess those
>>>>>> "b_leftovers" *without *having to restart the app
>>>>>> Does that make sense?
>>>>>>
>>>>>> PS: It doesn't necessarily have to be real streaming, if
>>>>>> micro-batching (legacy Spark Streaming) would allow such a thing, it 
>>>>>> would
>>>>>> technically work (although I keep hearing it's not advisable)
>>>>>>
>>>>>> Thank you so much!
>>>>>>
>>>>>> Kind regards
>>>>>>
>>>>>> On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Can you please clarify if you are reading these two topics
>>>>>>> separately or within the same scala or python script in Spark Structured
>>>>>>> Streaming?
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>>
>>>>>>>    view my Linkedin profile
>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>>> may
>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>> damages
>>>>>>> arising from such loss, damage or destruction.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira <bruno.ar...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello guys,
>>>>>>>>
>>>>>>>> I've been struggling with this for some days now, without success,
>>>>>>>> so I would highly appreciate any enlightenment. The simplified 
>>>>>>>> scenario is
>>>>>>>> the following:
>>>>>>>>
>>>>>>>>    - I've got 2 topics in Kafka (it's already like that in
>>>>>>>>    production, can't change it)
>>>>>>>>       - transactions-created,
>>>>>>>>       - transaction-processed
>>>>>>>>    - Even though the schema is not exactly the same, they all
>>>>>>>>    share a correlation_id, which is their "transaction_id"
>>>>>>>>
>>>>>>>> So, long story short, I've got 2 consumers, one for each topic, and
>>>>>>>> all I wanna do is sink them in a chain order. I'm writing them w/ Spark
>>>>>>>> Structured Streaming, btw
>>>>>>>>
>>>>>>>> So far so good, the caveat here is:
>>>>>>>>
>>>>>>>> - I cannot write a given "*processed" *transaction unless there is
>>>>>>>> an entry of that same transaction with the status "*created*".
>>>>>>>>
>>>>>>>> - There is *no* guarantee that any transactions in the topic
>>>>>>>> "transaction-*processed*" have a match (same transaction_id) in
>>>>>>>> the "transaction-*created*" at the moment the messages are fetched.
>>>>>>>>
>>>>>>>> So the workflow so far is:
>>>>>>>> - Msgs from the "transaction-created" just get synced to postgres,
>>>>>>>> no questions asked
>>>>>>>>
>>>>>>>> - As for the "transaction-processed", it goes as follows:
>>>>>>>>
>>>>>>>>    - a) Messages are fetched from the Kafka topic
>>>>>>>>    - b) Select the transaction_id of those...
>>>>>>>>    - c) Fetch all the rows w/ the corresponding id from a Postgres
>>>>>>>>    table AND that have the status "CREATED"
>>>>>>>>    - d) Then, a pretty much do a intersection between the two
>>>>>>>>    datasets, and sink only on "processed" ones that have with step c
>>>>>>>>    - e) Persist the resulting dataset
>>>>>>>>
>>>>>>>> But the rows (from the 'processed') that were not part of the
>>>>>>>> intersection get lost afterwards...
>>>>>>>>
>>>>>>>> So my question is:
>>>>>>>> - Is there ANY way to reprocess/replay them at all WITHOUT
>>>>>>>> restarting the app?
>>>>>>>> - For this scenario, should I fall back to Spark Streaming, instead
>>>>>>>> of Structured Streaming?
>>>>>>>>
>>>>>>>> PS: I was playing around with Spark Streaming (legacy) and managed
>>>>>>>> to commit only the ones in the microbatches that were fully successful
>>>>>>>> (still failed to find a way to "poll" for the uncommitted ones without
>>>>>>>> restarting, though).
>>>>>>>>
>>>>>>>> Thank you very much in advance!
>>>>>>>>
>>>>>>>>

Reply via email to