Thanks for the quick reply! I'm not sure I got the idea correctly... but from what I'm understanding, wouldn't that actually end up the same way? Because this is the current scenario:
*transactions-processed:*
{ "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" }
{ "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }
{ "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }
{ "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" }

*transactions-created:*
{ "transaction_id": 1, "amount": 1000, "timestamp": "2020-04-04 11:01:00" }
{ "transaction_id": 2, "amount": 2000, "timestamp": "2020-04-04 12:02:00" }

- So, when I fetch ALL messages from both topics, there are still 2 transactions (ids "*3*" and "*4*") which do *not* exist in the topic "transaction-created" yet (and they aren't in Postgres either)
- But since they were already pulled by Structured Streaming, they'll effectively be marked as "processed" by the Spark Structured Streaming checkpoint anyway, and therefore I can't replay/reprocess them again...

Is my understanding correct? Am I missing something here?
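For what it's worth, the only way I can think of to touch those two records again without restarting the streaming query would be a separate, one-off *batch* read pinned to explicit offsets, outside the streaming checkpoint. Just to illustrate what I mean - the broker address, partition and offsets below are made-up placeholders, and it assumes an existing SparkSession called spark:

    # One-off batch read of an explicit offset range, independent of any
    # Structured Streaming checkpoint. Broker, topic, partition and offsets
    # here are placeholders, not the real values.
    replay_df = (spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "transactions-processed")
        .option("startingOffsets", """{"transactions-processed":{"0":2}}""")
        .option("endingOffsets", """{"transactions-processed":{"0":-1}}""")  # -1 = latest
        .load()
        .selectExpr("CAST(value AS STRING) AS json_value"))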
On Fri, Jul 9, 2021 at 2:02 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Thanks for the details.
>
> Can you read these in the same app? For example - this is PySpark, but it serves the purpose.
>
> Read topic "newtopic" in one micro-batch and the other topic "md" in another micro-batch:
>
>     try:
>         # process topic --> newtopic
>         streamingNewtopic = self.spark \
>             .readStream \
>             .format("kafka") \
>             .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
>             .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>             .option("group.id", config['common']['newtopic']) \
>             .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>             .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>             .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>             .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>             .option("subscribe", config['MDVariables']['newtopic']) \
>             .option("failOnDataLoss", "false") \
>             .option("includeHeaders", "true") \
>             .option("startingOffsets", "latest") \
>             .load() \
>             .select(from_json(col("value").cast("string"), newtopicSchema).alias("newtopic_value"))
>
>         # construct a streaming dataframe streamingDataFrame that subscribes to topic config['MDVariables']['topic'] -> md (market data)
>         streamingDataFrame = self.spark \
>             .readStream \
>             .format("kafka") \
>             .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
>             .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>             .option("group.id", config['common']['appName']) \
>             .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>             .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>             .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>             .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>             .option("subscribe", config['MDVariables']['topic']) \
>             .option("failOnDataLoss", "false") \
>             .option("includeHeaders", "true") \
>             .option("startingOffsets", "latest") \
>             .load() \
>             .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>
>         streamingNewtopic.printSchema()
>
>         # Now do a writeStream and call the relevant functions to process the dataframes
>         newtopicResult = streamingNewtopic.select( \
>               col("newtopic_value.uuid").alias("uuid") \
>             , col("newtopic_value.timeissued").alias("timeissued") \
>             , col("newtopic_value.queue").alias("queue") \
>             , col("newtopic_value.status").alias("status")). \
>             writeStream. \
>             outputMode('append'). \
>             option("truncate", "false"). \
>             foreachBatch(sendToControl). \
>             trigger(processingTime='2 seconds'). \
>             queryName(config['MDVariables']['newtopic']). \
>             start()
>
>         result = streamingDataFrame.select( \
>               col("parsed_value.rowkey").alias("rowkey") \
>             , col("parsed_value.ticker").alias("ticker") \
>             , col("parsed_value.timeissued").alias("timeissued") \
>             , col("parsed_value.price").alias("price")). \
>             writeStream. \
>             outputMode('append'). \
>             option("truncate", "false"). \
>             foreachBatch(sendToSink). \
>             trigger(processingTime='30 seconds'). \
>             option('checkpointLocation', checkpoint_path). \
>             queryName(config['MDVariables']['topic']). \
>             start()
>         print(result)
>
>     except Exception as e:
>         print(f"""{e}, quitting""")
>         sys.exit(1)
>
> Inside that function, say *sendToSink*, you can get the df and batchId:
>
>     def sendToSink(df, batchId):
>         if(len(df.take(1))) > 0:
>             print(f"""md batchId is {batchId}""")
>             df.show(100, False)
>             df.persist()
>             # write to BigQuery batch table
>             s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
>             df.unpersist()
>             print(f"""wrote to DB""")
>         else:
>             print("DataFrame md is empty")
>
> And you have created a DF from the other topic, newtopic:
>
>     def sendToControl(dfnewtopic, batchId):
>         if(len(dfnewtopic.take(1))) > 0:
>             ......
>
> Now you have two dataframes, *df* and *dfnewtopic*, in the same session. Will you be able to join these two dataframes through a common key value?
>
> HTH
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
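(To make the join idea above concrete: a stream-stream join on the shared key would look roughly like the sketch below. It reuses the streamingDataFrame / streamingNewtopic names from the quoted example; the column names, the watermark durations, the one-hour join window and the assumption that timeissued is an event-time timestamp are illustrative only, not taken from the actual apps.)

    # Illustrative sketch only: join the two streaming dataframes from the
    # example above on a common key. Assumes "timeissued" is a timestamp
    # column in both parsed schemas; watermark/window durations are arbitrary.
    from pyspark.sql.functions import col, expr

    md = streamingDataFrame.select(
        col("parsed_value.rowkey").alias("rowkey"),
        col("parsed_value.timeissued").alias("md_time")
    ).withWatermark("md_time", "1 hour")

    ctl = streamingNewtopic.select(
        col("newtopic_value.uuid").alias("uuid"),
        col("newtopic_value.timeissued").alias("ctl_time")
    ).withWatermark("ctl_time", "1 hour")

    # inner stream-stream join; the time-range condition lets Spark clean up old state
    joined = md.join(
        ctl,
        expr("rowkey = uuid AND ctl_time BETWEEN md_time AND md_time + interval 1 hour"),
        "inner")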
> On Fri, 9 Jul 2021 at 17:41, Bruno Oliveira <bruno.ar...@gmail.com> wrote:
>
>> Hello! Sure thing!
>>
>> I'm reading them *separately*; both are apps written with Scala + Spark Structured Streaming.
>>
>> I feel like I missed some details on my original thread (sorry, it was past 4 AM) and it was getting frustrating. Please let me try to clarify some points:
>>
>> *Transactions Created Consumer*
>> -----------------------------------
>> | Kafka trx-created-topic |    <--- (Scala + Spark Structured Streaming) ConsumerApp ---> Sinks to ---> Postgres DB Table (Transactions)
>> -----------------------------------
>>
>> *Transactions Processed Consumer*
>> -------------------------------------
>> | Kafka trx-processed-topic |  <--- 1) (Scala + Spark Structured Streaming) AnotherConsumerApp fetches a Dataset (let's call it "a")
>> -------------------------------------   2) Selects the ids
>> -------------------------------------
>> | Postgres / Trx table |       <--- 3) Fetches the rows w/ the matching ids that have status 'created' (let's call it "b")
>> -------------------------------------   4) Performs an intersection between "a" and "b", resulting in "b_that_needs_sinking" (but now there are some "b_leftovers" that were left out of the intersection)
>>                                         5) Sinks "b_that_needs_sinking" to the DB, which leaves the "b_leftovers" unprocessed (not persisted)
>>                                         6) However, those "b_leftovers" would ultimately be processed at some point (even if it takes 1-3 days), when their corresponding transaction_ids are pushed to the "trx-created-topic" Kafka topic and are then processed by that first consumer
>>
>> So, what I'm trying to accomplish is to find a way to reprocess those "b_leftovers" *without* having to restart the app (steps 1-5 are sketched in code right after this message). Does that make sense?
>>
>> PS: It doesn't necessarily have to be real streaming; if micro-batching (legacy Spark Streaming) would allow such a thing, it would technically work (although I keep hearing it's not advisable).
>>
>> Thank you so much!
>>
>> Kind regards
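(A rough PySpark sketch of steps 1-5 described in the message above. The real apps are Scala; the JDBC URL, credentials, table and column names are simplified placeholders, and an existing SparkSession named spark is assumed.)

    # Hypothetical foreachBatch handler for the "transactions-processed" stream,
    # sketching steps 1-5 above. All names and connection details are placeholders.
    def sink_processed(batch_df, batch_id):
        # 1)-2) the micro-batch "a" and its transaction ids
        a = batch_df.select("transaction_id", "timestamp")

        # 3) rows already persisted by the "created" consumer, with status CREATED
        b = (spark.read
             .format("jdbc")
             .option("url", "jdbc:postgresql://db-host:5432/appdb")
             .option("dbtable", "transactions")
             .option("user", "db_user")
             .option("password", "db_password")
             .load()
             .where("status = 'CREATED'")
             .select("transaction_id"))

        # 4) intersection: processed records that already have a CREATED row...
        b_that_needs_sinking = a.join(b, "transaction_id", "left_semi")
        # ...and the leftovers, which are simply dropped once this batch's
        # offsets are checkpointed (this is the problem being discussed)
        b_leftovers = a.join(b, "transaction_id", "left_anti")

        # 5) persist only the matches
        (b_that_needs_sinking.write
            .format("jdbc")
            .option("url", "jdbc:postgresql://db-host:5432/appdb")
            .option("dbtable", "transactions_processed")
            .option("user", "db_user")
            .option("password", "db_password")
            .mode("append")
            .save())

    # wired up roughly as:
    # processed_stream.writeStream.foreachBatch(sink_processed) \
    #     .option("checkpointLocation", "...").start()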
>> On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Can you please clarify if you are reading these two topics separately or within the same Scala or Python script in Spark Structured Streaming?
>>>
>>> HTH
>>>
>>> view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>
>>> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira <bruno.ar...@gmail.com> wrote:
>>>
>>>> Hello guys,
>>>>
>>>> I've been struggling with this for some days now, without success, so I would highly appreciate any enlightenment. The simplified scenario is the following:
>>>>
>>>> - I've got 2 topics in Kafka (it's already like that in production, can't change it):
>>>>   - transactions-created
>>>>   - transaction-processed
>>>> - Even though the schema is not exactly the same, they all share a correlation_id, which is their "transaction_id"
>>>>
>>>> So, long story short, I've got 2 consumers, one for each topic, and all I wanna do is sink them in a chained order. I'm writing them w/ Spark Structured Streaming, btw.
>>>>
>>>> So far so good; the caveats here are:
>>>>
>>>> - I cannot write a given "*processed*" transaction unless there is an entry of that same transaction with the status "*created*".
>>>> - There is *no* guarantee that any transactions in the topic "transaction-*processed*" have a match (same transaction_id) in "transaction-*created*" at the moment the messages are fetched.
>>>>
>>>> So the workflow so far is:
>>>>
>>>> - Msgs from "transaction-created" just get synced to Postgres, no questions asked
>>>> - As for "transaction-processed", it goes as follows:
>>>>   - a) Messages are fetched from the Kafka topic
>>>>   - b) Select the transaction_id of those
>>>>   - c) Fetch all the rows w/ the corresponding ids from a Postgres table AND that have the status "CREATED"
>>>>   - d) Then I pretty much do an intersection between the two datasets, and keep only the "processed" ones that have a match from step c
>>>>   - e) Persist the resulting dataset
>>>>
>>>> But the rows (from the 'processed' side) that were not part of the intersection get lost afterwards...
>>>>
>>>> So my questions are:
>>>> - Is there ANY way to reprocess/replay them at all WITHOUT restarting the app?
>>>> - For this scenario, should I fall back to Spark Streaming instead of Structured Streaming?
>>>>
>>>> PS: I was playing around with Spark Streaming (legacy) and managed to commit only the ones in the micro-batches that were fully successful (still failed to find a way to "poll" for the uncommitted ones without restarting, though).
>>>>
>>>> Thank you very much in advance!