I'm terribly sorry, Mich. That was my mistake. The timestamps are not the same (I copied and pasted without realizing it; I'm really sorry for the confusion).
Please assume NONE of the following transactions are in the database yet.

*transactions-created:*

{ "transaction_id": 1, "amount": 1000, "timestamp": "2020-04-04 11:01:00" }
{ "transaction_id": 2, "amount": 2000, "timestamp": "2020-04-04 08:02:00" }

*transactions-processed:*

{ "transaction_id": 1, "timestamp": "2020-04-04 11:03:00" } // processed 2 minutes after it was created
{ "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" } // processed 4 hours after it was created
{ "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" } // cannot be persisted into the DB yet, because transaction_id 3 with the status "CREATED" does NOT exist in the DB

*(...) Transactions-created are created at the same time (the same timestamp) but you have NOT received them and they don't yet exist in your DB (...)*

- Not at the same timestamp; that was my mistake.
- Imagine two transactions with the same ID (neither of them in any Kafka topic yet),
- one with the status CREATED and another with the status PROCESSED.
- The one with the status PROCESSED will ALWAYS have a greater timestamp than the one with the status CREATED.
- Now, for whatever reason, this happens:
- Step a) some producer *fails* to push the *created* one to the topic *transactions-created*; it will RETRY and will eventually succeed, but that can take minutes, or even hours
- Step b) however, the producer *succeeds* in pushing the *processed* one to the topic *transactions-processed*

*(...) because presumably your relational database is too slow to ingest them? (...)*

- It's not that the DB is too slow; it's that the message for transaction_id 3 hasn't arrived at the topic *transactions-created* yet, due to some error/failure in Step a, for example.

*(...) you do a query in Postgres for say transaction_id 3 but they don't exist yet? When are they expected to arrive? (...)*

- That's correct. It could take minutes, maybe hours, but it is guaranteed that they will arrive at some point in the future. I just have to keep trying until it works, i.e. until transaction_id 3 with the status CREATED arrives in the database (see the sketch below).

Huge apologies for the confusion... Is it a bit clearer now?

*PS:* This is a simplified scenario; in practice there is yet another topic, "transactions-refunded", which cannot be sunk to the DB unless the same transaction_id with the status "PROCESSED" is there (and again, there can only be a transaction_id PROCESSED if the same transaction_id with CREATED exists in the DB).
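To make the retry concrete, here is a minimal sketch of the check that has to keep being retried, assuming a Postgres table named "transactions" with "transaction_id" and "status" columns (illustrative names, not necessarily the real schema):

    import psycopg2  # assumed client library; any Postgres driver would do

    def created_row_exists(conn, transaction_id):
        # A PROCESSED event may only be persisted once the matching
        # CREATED row is already in the database.
        with conn.cursor() as cur:
            cur.execute(
                "SELECT 1 FROM transactions"
                " WHERE transaction_id = %s AND status = 'CREATED'",
                (transaction_id,))
            return cur.fetchone() is not None

For transaction_id 3 above, this returns False until the delayed CREATED message finally lands, so the PROCESSED message must be retried later rather than dropped.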
On Fri, Jul 9, 2021 at 4:51 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> One second.
>
> The topic called transactions-processed is streaming through Spark.
> Transactions-created are created at the same time (the same timestamp) but
> you have NOT received them and they don't yet exist in your DB, because
> presumably your relational database is too slow to ingest them? You do a
> query in Postgres for, say, transaction_id 3 but they don't exist yet?
> When are they expected to arrive?
>
> On Fri, 9 Jul 2021 at 19:12, Bruno Oliveira <bruno.ar...@gmail.com> wrote:
>
>> Thanks for the quick reply!
>>
>> I'm not sure I got the idea correctly... but from what I'm understanding,
>> wouldn't that actually end the same way?
>> Because this is the current scenario:
>>
>> *transactions-processed:*
>> { "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" }
>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }
>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }
>> { "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" }
>>
>> *transactions-created:*
>> { "transaction_id": 1, "amount": 1000, "timestamp": "2020-04-04 11:01:00" }
>> { "transaction_id": 2, "amount": 2000, "timestamp": "2020-04-04 12:02:00" }
>>
>> - So, when I fetch ALL messages from both topics, there are still two
>> transactions (ids "*3*" and "*4*") which do *not* exist in the topic
>> "transactions-created" yet (and they aren't in Postgres either).
>> - But since they were already pulled by Structured Streaming, they'll
>> effectively be marked as "processed" by the Spark Structured Streaming
>> checkpoint anyway.
>>
>> And therefore, I can't replay/reprocess them again...
>>
>> Is my understanding correct? Am I missing something here?
>>
>> On Fri, Jul 9, 2021 at 2:02 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Thanks for the details.
>>>
>>> Can you read these in the same app? For example, this is PySpark, but it
>>> serves the purpose.
>>>
>>> Read topic "newtopic" in one micro-batch and the other topic "md" in
>>> another micro-batch:
>>>
>>> try:
>>>     # process topic --> newtopic
>>>     streamingNewtopic = self.spark \
>>>         .readStream \
>>>         .format("kafka") \
>>>         .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>>>         .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>>         .option("group.id", config['common']['newtopic']) \
>>>         .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>         .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>>         .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>         .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>>         .option("subscribe", config['MDVariables']['newtopic']) \
>>>         .option("failOnDataLoss", "false") \
>>>         .option("includeHeaders", "true") \
>>>         .option("startingOffsets", "latest") \
>>>         .load() \
>>>         .select(from_json(col("value").cast("string"), newtopicSchema).alias("newtopic_value"))
>>>
>>>     # construct a streaming dataframe streamingDataFrame that subscribes
>>>     # to topic config['MDVariables']['topic'] -> md (market data)
>>>     streamingDataFrame = self.spark \
>>>         .readStream \
>>>         .format("kafka") \
>>>         .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>>>         .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>>         .option("group.id", config['common']['appName']) \
>>>         .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>         .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>>         .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>         .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>>         .option("subscribe", config['MDVariables']['topic']) \
"false") \ >>> .option("includeHeaders", "true") \ >>> .option("startingOffsets", "latest") \ >>> .load() \ >>> .select(from_json(col("value").cast("string"), >>> schema).alias("parsed_value")) >>> >>> >>> streamingNewtopic.printSchema() >>> >>> # Now do a writeStream and call the relevant functions to >>> process dataframes >>> >>> newtopicResult = streamingNewtopic.select( \ >>> col("newtopic_value.uuid").alias("uuid") \ >>> , >>> col("newtopic_value.timeissued").alias("timeissued") \ >>> , col("newtopic_value.queue").alias("queue") \ >>> , col("newtopic_value.status").alias("status")). \ >>> writeStream. \ >>> outputMode('append'). \ >>> option("truncate", "false"). \ >>> * foreachBatch(sendToControl). \* >>> trigger(processingTime='2 seconds'). \ >>> queryName(config['MDVariables']['newtopic']). \ >>> start() >>> >>> result = streamingDataFrame.select( \ >>> col("parsed_value.rowkey").alias("rowkey") \ >>> , col("parsed_value.ticker").alias("ticker") \ >>> , col("parsed_value.timeissued").alias("timeissued") \ >>> , col("parsed_value.price").alias("price")). \ >>> writeStream. \ >>> outputMode('append'). \ >>> option("truncate", "false"). \ >>> *foreachBatch(sendToSink). \* >>> trigger(processingTime='30 seconds'). \ >>> option('checkpointLocation', checkpoint_path). \ >>> queryName(config['MDVariables']['topic']). \ >>> start() >>> print(result) >>> >>> except Exception as e: >>> print(f"""{e}, quitting""") >>> sys.exit(1) >>> >>> Inside that function say *sendToSink *you can get the df and batchId >>> >>> def sendToSink(df, batchId): >>> if(len(df.take(1))) > 0: >>> print(f"""md batchId is {batchId}""") >>> df.show(100,False) >>> df. persist() >>> # write to BigQuery batch table >>> s.writeTableToBQ(df, "append", >>> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable']) >>> df.unpersist() >>> print(f"""wrote to DB""") >>> else: >>> print("DataFrame md is empty") >>> >>> And you have created DF from the other topic newtopic >>> >>> def sendToControl(dfnewtopic, batchId): >>> if(len(dfnewtopic.take(1))) > 0: >>> ...... >>> >>> Now you have two dataframe* df* and *dfnewtopic* in the same session. >>> Will you be able to join these two dataframes through common key value? >>> >>> HTH >>> >>> >>> >>> view my Linkedin profile >>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>> >>> >>> >>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>> any loss, damage or destruction of data or any other property which may >>> arise from relying on this email's technical content is explicitly >>> disclaimed. The author will in no case be liable for any monetary damages >>> arising from such loss, damage or destruction. >>> >>> >>> >>> >>> On Fri, 9 Jul 2021 at 17:41, Bruno Oliveira <bruno.ar...@gmail.com> >>> wrote: >>> >>>> Hello! Sure thing! >>>> >>>> I'm reading them *separately*, both are apps written with Scala + >>>> Spark Structured Streaming. 
On Fri, 9 Jul 2021 at 17:41, Bruno Oliveira <bruno.ar...@gmail.com> wrote:

>>>> Hello! Sure thing!
>>>>
>>>> I'm reading them *separately*; both are apps written with Scala +
>>>> Spark Structured Streaming.
>>>>
>>>> I feel like I missed some details in my original thread (sorry, it was
>>>> past 4 AM and it was getting frustrating).
>>>> Please let me try to clarify some points:
>>>>
>>>> *Transactions Created Consumer*
>>>> -----------------------------------
>>>> | Kafka trx-created-topic    |  <--- (Scala + Spark Structured Streaming)
>>>> -----------------------------------   ConsumerApp ---> sinks to ---> Postgres DB table (Transactions)
>>>>
>>>> *Transactions Processed Consumer*
>>>> -------------------------------------
>>>> | Kafka trx-processed-topic   |  <--- 1) (Scala + Spark Structured Streaming)
>>>> -------------------------------------    AnotherConsumerApp fetches a Dataset (let's call it "a")
>>>>                                        2) Selects the ids
>>>> -------------------------------------
>>>> | Postgres / Trx table        |  <--- 3) Fetches the rows with the matching ids
>>>> -------------------------------------    that have the status 'CREATED' (let's call it "b")
>>>>                                        4) Performs an intersection between "a" and "b", resulting in
>>>>                                           "b_that_needs_sinking" (but now there are some "b_leftovers"
>>>>                                           that were left out of the intersection)
>>>>                                        5) Sinks "b_that_needs_sinking" to the DB, which leaves the
>>>>                                           "b_leftovers" unprocessed (not persisted)
>>>>                                        6) However, those "b_leftovers" would ultimately be processed at
>>>>                                           some point (even if it takes 1-3 days), when their corresponding
>>>>                                           transaction_ids are pushed to the "trx-created-topic" Kafka topic
>>>>                                           and are then processed by that first consumer
>>>>
>>>> So, what I'm trying to accomplish is to find a way to reprocess those
>>>> "b_leftovers" *without* having to restart the app.
>>>> Does that make sense?
>>>>
>>>> PS: It doesn't necessarily have to be real streaming; if micro-batching
>>>> (legacy Spark Streaming) would allow such a thing, it would technically
>>>> work (although I keep hearing it's not advisable).
>>>>
>>>> Thank you so much!
>>>>
>>>> Kind regards
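One way to keep step 5 from silently dropping the "b_leftovers" is to split each micro-batch inside a foreachBatch handler and park the unmatched rows somewhere durable. A rough PySpark sketch (the spark session and jdbc_url variables, and the transactions / transactions_processed_pending table names, are assumptions):

    # Sketch of a foreachBatch handler for the processed-topic stream.
    def sink_processed(df, batch_id):
        # "b": ids that already have a CREATED row in Postgres
        created_ids = (spark.read.format("jdbc")
            .option("url", jdbc_url)
            .option("dbtable", "transactions")
            .load()
            .where("status = 'CREATED'")
            .select("transaction_id"))

        # step 4: the "intersection" -> rows that can be sunk now
        matched = df.join(created_ids, "transaction_id", "inner")
        # the "b_leftovers": no CREATED row yet
        leftovers = df.join(created_ids, "transaction_id", "left_anti")

        # step 5: persist `matched` to Postgres (writer omitted), and instead
        # of letting the checkpoint swallow the leftovers, park them in a
        # staging table so they can be retried later:
        (leftovers.write.format("jdbc")
            .option("url", jdbc_url)
            .option("dbtable", "transactions_processed_pending")
            .mode("append")
            .save())

Because the leftovers are persisted outside the checkpoint, the stream can keep committing offsets without losing them.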
I'm writing them w/ Spark >>>>>> Structured Streaming, btw >>>>>> >>>>>> So far so good, the caveat here is: >>>>>> >>>>>> - I cannot write a given "*processed" *transaction unless there is >>>>>> an entry of that same transaction with the status "*created*". >>>>>> >>>>>> - There is *no* guarantee that any transactions in the topic >>>>>> "transaction-*processed*" have a match (same transaction_id) in the >>>>>> "transaction-*created*" at the moment the messages are fetched. >>>>>> >>>>>> So the workflow so far is: >>>>>> - Msgs from the "transaction-created" just get synced to postgres, no >>>>>> questions asked >>>>>> >>>>>> - As for the "transaction-processed", it goes as follows: >>>>>> >>>>>> - a) Messages are fetched from the Kafka topic >>>>>> - b) Select the transaction_id of those... >>>>>> - c) Fetch all the rows w/ the corresponding id from a Postgres >>>>>> table AND that have the status "CREATED" >>>>>> - d) Then, a pretty much do a intersection between the two >>>>>> datasets, and sink only on "processed" ones that have with step c >>>>>> - e) Persist the resulting dataset >>>>>> >>>>>> But the rows (from the 'processed') that were not part of the >>>>>> intersection get lost afterwards... >>>>>> >>>>>> So my question is: >>>>>> - Is there ANY way to reprocess/replay them at all WITHOUT restarting >>>>>> the app? >>>>>> - For this scenario, should I fall back to Spark Streaming, instead >>>>>> of Structured Streaming? >>>>>> >>>>>> PS: I was playing around with Spark Streaming (legacy) and managed to >>>>>> commit only the ones in the microbatches that were fully successful >>>>>> (still >>>>>> failed to find a way to "poll" for the uncommitted ones without >>>>>> restarting, >>>>>> though). >>>>>> >>>>>> Thank you very much in advance! >>>>>> >>>>>>