I mean... I guess? But I don't really have Airflow here, and I didn't really wanted to fall back to a "batch"-kinda approach with Airflow
I'd rather use a Dead Letter Queue approach instead (like I mentioned another topic for the failed ones, which is later consumed and pumps the messages back to the original topic), or something with Spark+Delta Lake instead... I was just hoping I could somewhat just retry/replay these "orphaned" transactions somewhat easier... *Question) *Those features of "Stateful Streaming" or "Continuous Processing" mode wouldn't help solve my case, would they? On Fri, Jul 9, 2021 at 8:19 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote: > Well this is a matter of using journal entries. > > What you can do is that those "orphaned" transactions that you cannot pair > through transaction_id can be written to a journal table in your Postgres > DB. Then you can pair them with the entries in the relevant Postgres table. > If the essence is not time critical this can be done through a scheduling > job every x minutes through airflow or something similar on the database > alone. > > HTH > > > > > view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Fri, 9 Jul 2021 at 23:53, Bruno Oliveira <bruno.ar...@gmail.com> wrote: > >> That is exactly the case, Sebastian! >> >> - In practise, that "created means "*authorized*", but I still cannot >> deduct anything from the customer balance >> - the "processed" means I can safely deduct the transaction_amount from >> the customer balance, >> - and the "refunded" means I must give the transaction amount back to the >> customer balance >> >> So, technically, we cannot process something that is not "AUTHORIZED" >> (created) yet, nor can we process a refund for a transaction that has NOT >> been PROCESSED yet. >> >> >> *You have an authorisation, then the actual transaction and maybe a >>> refund some time in the future. You want to proceed with a transaction only >>> if you've seen the auth but in an eventually consistent system this might >>> not always happen.* >> >> >> That's absolutely the case! So, yes, That's correct. >> >> *You are asking in the case of receiving the transaction before the auth >>> how to retry later? * >> >> >> Yeah! I'm struggling for days on how to solve with Spark Structured >> Streaming... >> >> *Right now you are discarding those transactions that didn't match so you >>> instead would need to persist them somewhere and either reinject them into >>> the job that does lookup (say after x minutes) * >> >> >> >> *Right now, the best I could think of is: * >> >> - Say, I'm reading the messages w/ transaction_id [1, 2, 3] from >> Kafka (topic "transactions-processed") >> - Then I'm querying the database for these IDs that have the status >> "CREATED" (or "AUTHORIZED" to be more accurate), and it returns the >> transactions for IDs [1, 2] >> - So, while it'll work for the ones with ID [1. 2] , I would have to >> put that transaction_id 3 in another topic, say, " >> *transaction-processed-retry*" >> - And write yet another consumer, to fetch the messages from that >> "*transaction-processed-retry" >> *and put them back to the original topic (transactions-processed) >> - And do something similar for the transactions-refunded >> >> *Q1) *I think this approach may work, but I can't stop thinking I'm >> overengineering this, and was wondering if there isn't a better approach... >> ? >> >> *Is this what you are looking for?* >> >> >> Yes, that's exactly it. >> >> >> *Q2)* I know that, under the hood, Structured Streaming is actually >> using the micro-batch engine, >> if I switched to *Continuous Processing*, would it make any >> difference? Would it allow me any "retry" mechanism out of the box? >> >> *Q3)* I stumbled upon a *Stateful Streaming* ( >> https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming) >> , but I have never ever used it before, >> would that actually do something for my case (retrying/replaying >> a given message) ? >> >> >> Thank you very VERY in advance! >> Best regards >> >> >> On Fri, Jul 9, 2021 at 6:36 PM Sebastian Piu <sebastian....@gmail.com> >> wrote: >> >>> So in payment systems you have something similar I think >>> >>> You have an authorisation, then the actual transaction and maybe a >>> refund some time in the future. You want to proceed with a transaction only >>> if you've seen the auth but in an eventually consistent system this might >>> not always happen. >>> >>> You are asking in the case of receiving the transaction before the auth >>> how to retry later? >>> >>> Right now you are discarding those transactions that didn't match so you >>> instead would need to persist them somewhere and either reinject them into >>> the job that does lookup (say after x minutes) >>> >>> Is this what you are looking for? >>> >>> On Fri, 9 Jul 2021, 9:44 pm Bruno Oliveira, <bruno.ar...@gmail.com> >>> wrote: >>> >>>> I'm terribly sorry, Mich. That was my mistake. >>>> The timestamps are not the same (I copy&pasted without realizing that, >>>> I'm really sorry for the confusion) >>>> >>>> Please assume NONE of the following transactions are in the database yet >>>> >>>> *transactions-created:* >>>> { "transaction_id": 1, "amount": 1000, "timestamp": "2020-04-04 >>>> 11:01:00" } >>>> { "transaction_id": 2, "amount": 2000, "timestamp": "2020-04-04 >>>> 08:02:00" } >>>> >>>> *transactions-processed: * >>>> { "transaction_id": 1, "timestamp": "2020-04-04 11:03:00" } // so >>>> it's processed 2 minutes after it was created >>>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" } // so >>>> it's processed 4 hours after it was created >>>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" } // >>>> cannot be persisted into the DB yet, because this "transaction_id 3" with >>>> the status "CREATED" does NOT exist in the DB >>>> >>>> >>>> *(...) Transactions-created are created at the same time (the same >>>>> timestamp) but you have NOT received them and they don't yet exist in your >>>>> DB (...)* >>>> >>>> - Not at the same timestamp, that was my mistake. >>>> - Imagine two transactions with the same ID (neither of them are in any >>>> Kafka topic yet), >>>> >>>> - One with the status CREATED, and another with the status >>>> PROCESSED, >>>> - The one with the status PROCESSED will ALWAYS have a >>>> higher/greater timestamp than the one with the status CREATED >>>> - Now for whatever reason, this happens: >>>> - Step a) some producer *fails* to push the *created* one to the >>>> topic *transactions-created, it will RETRY, and will eventually >>>> succeed, but that can take minutes, or hours* >>>> - Step b) however, the producer *succeeds* in pushing the* >>>> 'processed' *one to the topic *transactions-processed * >>>> >>>> >>>> *(...) because presumably your relational database is too slow to >>>>> ingest them? (...)* >>>> >>>> >>>> - it's not like the DB was slow, it was because the message for >>>> transaction_id 3 didn't arrive at the *topic-created *yet, due to some >>>> error/failure in Step A, for example >>>> >>>> >>>> * you do a query in Postgres for say transaction_id 3 but they don't >>>>> exist yet? When are they expected to arrive?* >>>> >>>> >>>> - That's correct. It could take minutes, maybe hours. But it is >>>> guaranteed that at some point, in the future, they will arrive. I just have >>>> to keep trying until it works, this transaction_id 3 with the status >>>> CREATED arrives at the database >>>> >>>> >>>> Huge apologies for the confusion... Is it a bit more clear now? >>>> >>>> *PS:* This is a simplified scenario, in practise, there is yet another >>>> topic for "transactions-refunded". But which cannot be sinked to the DB, >>>> unless the same transaction_id with the status "PROCESSED" is there. (but >>>> again, there can only be a transaction_id PROCESSED, if the same >>>> transaction_id with CREATED exists in the DB) >>>> >>>> >>>> On Fri, Jul 9, 2021 at 4:51 PM Mich Talebzadeh < >>>> mich.talebza...@gmail.com> wrote: >>>> >>>>> One second >>>>> >>>>> The topic called transactions_processed is streaming through Spark. >>>>> Transactions-created are created at the same time (the same timestamp) but >>>>> you have NOT received them and they don't yet exist in your DB, >>>>> because presumably your relational database is too slow to ingest them? >>>>> you >>>>> do a query in Postgres for say transaction_id 3 but they don't exist yet? >>>>> When are they expected to arrive? >>>>> >>>>> >>>>> >>>>> >>>>> view my Linkedin profile >>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>>> >>>>> >>>>> >>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>>>> any loss, damage or destruction of data or any other property which may >>>>> arise from relying on this email's technical content is explicitly >>>>> disclaimed. The author will in no case be liable for any monetary damages >>>>> arising from such loss, damage or destruction. >>>>> >>>>> >>>>> >>>>> >>>>> On Fri, 9 Jul 2021 at 19:12, Bruno Oliveira <bruno.ar...@gmail.com> >>>>> wrote: >>>>> >>>>>> Thanks for the quick reply! >>>>>> >>>>>> I'm not sure I got the idea correctly... but from what I'm underding, >>>>>> wouldn't that actually end the same way? >>>>>> Because, this is the current scenario: >>>>>> >>>>>> *transactions-processed: * >>>>>> { "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" } >>>>>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" } >>>>>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" } >>>>>> { "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" } >>>>>> >>>>>> *transactions-created:* >>>>>> { "transaction_id": 1, "amount": 1000, "timestamp": "2020-04-04 >>>>>> 11:01:00" } >>>>>> { "transaction_id": 2, "amount": 2000, "timestamp": "2020-04-04 >>>>>> 12:02:00" } >>>>>> >>>>>> - So, when I fetch ALL messages from both topics, there are still 2x >>>>>> transactions (id: "*3*" and "*4*") which do *not* exist in the topic >>>>>> "transaction-created" yet (and they aren't in Postgres either) >>>>>> - But since they were pulled by "Structured Streaming" already, >>>>>> they'll be kinda marked as "processed" by Spark Structure Streaming >>>>>> checkpoint anyway. >>>>>> >>>>>> And therefore, I can't replay/reprocess them again... >>>>>> >>>>>> Is my understanding correct? Am I missing something here? >>>>>> >>>>>> On Fri, Jul 9, 2021 at 2:02 PM Mich Talebzadeh < >>>>>> mich.talebza...@gmail.com> wrote: >>>>>> >>>>>>> Thanks for the details. >>>>>>> >>>>>>> Can you read these in the same app. For example. This is PySpark but >>>>>>> it serves the purpose. >>>>>>> >>>>>>> Read topic "newtopic" in micro batch and the other topic "md" in >>>>>>> another microbatch >>>>>>> >>>>>>> try: >>>>>>> # process topic --> newtopic >>>>>>> streamingNewtopic = self.spark \ >>>>>>> .readStream \ >>>>>>> .format("kafka") \ >>>>>>> .option("kafka.bootstrap.servers", >>>>>>> config['MDVariables']['bootstrapServers'],) \ >>>>>>> .option("schema.registry.url", >>>>>>> config['MDVariables']['schemaRegistryURL']) \ >>>>>>> .option("group.id", config['common']['newtopic']) \ >>>>>>> .option("zookeeper.connection.timeout.ms", >>>>>>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \ >>>>>>> .option("rebalance.backoff.ms", >>>>>>> config['MDVariables']['rebalanceBackoffMS']) \ >>>>>>> .option("zookeeper.session.timeout.ms", >>>>>>> config['MDVariables']['zookeeperSessionTimeOutMs']) \ >>>>>>> .option("auto.commit.interval.ms", >>>>>>> config['MDVariables']['autoCommitIntervalMS']) \ >>>>>>> *.option("subscribe", >>>>>>> config['MDVariables']['newtopic']) \* >>>>>>> .option("failOnDataLoss", "false") \ >>>>>>> .option("includeHeaders", "true") \ >>>>>>> .option("startingOffsets", "latest") \ >>>>>>> .load() \ >>>>>>> .select(from_json(col("value").cast("string"), >>>>>>> newtopicSchema).alias("newtopic_value")) >>>>>>> >>>>>>> # construct a streaming dataframe streamingDataFrame >>>>>>> that subscribes to topic config['MDVariables']['topic']) -> md (market >>>>>>> data) >>>>>>> streamingDataFrame = self.spark \ >>>>>>> .readStream \ >>>>>>> .format("kafka") \ >>>>>>> .option("kafka.bootstrap.servers", >>>>>>> config['MDVariables']['bootstrapServers'],) \ >>>>>>> .option("schema.registry.url", >>>>>>> config['MDVariables']['schemaRegistryURL']) \ >>>>>>> .option("group.id", config['common']['appName']) \ >>>>>>> .option("zookeeper.connection.timeout.ms", >>>>>>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \ >>>>>>> .option("rebalance.backoff.ms", >>>>>>> config['MDVariables']['rebalanceBackoffMS']) \ >>>>>>> .option("zookeeper.session.timeout.ms", >>>>>>> config['MDVariables']['zookeeperSessionTimeOutMs']) \ >>>>>>> .option("auto.commit.interval.ms", >>>>>>> config['MDVariables']['autoCommitIntervalMS']) \ >>>>>>> *.option("subscribe", >>>>>>> config['MDVariables']['topic']) \* >>>>>>> .option("failOnDataLoss", "false") \ >>>>>>> .option("includeHeaders", "true") \ >>>>>>> .option("startingOffsets", "latest") \ >>>>>>> .load() \ >>>>>>> .select(from_json(col("value").cast("string"), >>>>>>> schema).alias("parsed_value")) >>>>>>> >>>>>>> >>>>>>> streamingNewtopic.printSchema() >>>>>>> >>>>>>> # Now do a writeStream and call the relevant functions >>>>>>> to process dataframes >>>>>>> >>>>>>> newtopicResult = streamingNewtopic.select( \ >>>>>>> col("newtopic_value.uuid").alias("uuid") \ >>>>>>> , >>>>>>> col("newtopic_value.timeissued").alias("timeissued") \ >>>>>>> , col("newtopic_value.queue").alias("queue") \ >>>>>>> , col("newtopic_value.status").alias("status")). \ >>>>>>> writeStream. \ >>>>>>> outputMode('append'). \ >>>>>>> option("truncate", "false"). \ >>>>>>> * foreachBatch(sendToControl). \* >>>>>>> trigger(processingTime='2 seconds'). \ >>>>>>> queryName(config['MDVariables']['newtopic']). \ >>>>>>> start() >>>>>>> >>>>>>> result = streamingDataFrame.select( \ >>>>>>> col("parsed_value.rowkey").alias("rowkey") \ >>>>>>> , col("parsed_value.ticker").alias("ticker") \ >>>>>>> , >>>>>>> col("parsed_value.timeissued").alias("timeissued") \ >>>>>>> , col("parsed_value.price").alias("price")). \ >>>>>>> writeStream. \ >>>>>>> outputMode('append'). \ >>>>>>> option("truncate", "false"). \ >>>>>>> *foreachBatch(sendToSink). \* >>>>>>> trigger(processingTime='30 seconds'). \ >>>>>>> option('checkpointLocation', checkpoint_path). \ >>>>>>> queryName(config['MDVariables']['topic']). \ >>>>>>> start() >>>>>>> print(result) >>>>>>> >>>>>>> except Exception as e: >>>>>>> print(f"""{e}, quitting""") >>>>>>> sys.exit(1) >>>>>>> >>>>>>> Inside that function say *sendToSink *you can get the df and batchId >>>>>>> >>>>>>> def sendToSink(df, batchId): >>>>>>> if(len(df.take(1))) > 0: >>>>>>> print(f"""md batchId is {batchId}""") >>>>>>> df.show(100,False) >>>>>>> df. persist() >>>>>>> # write to BigQuery batch table >>>>>>> s.writeTableToBQ(df, "append", >>>>>>> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable']) >>>>>>> df.unpersist() >>>>>>> print(f"""wrote to DB""") >>>>>>> else: >>>>>>> print("DataFrame md is empty") >>>>>>> >>>>>>> And you have created DF from the other topic newtopic >>>>>>> >>>>>>> def sendToControl(dfnewtopic, batchId): >>>>>>> if(len(dfnewtopic.take(1))) > 0: >>>>>>> ...... >>>>>>> >>>>>>> Now you have two dataframe* df* and *dfnewtopic* in the same >>>>>>> session. Will you be able to join these two dataframes through common >>>>>>> key >>>>>>> value? >>>>>>> >>>>>>> HTH >>>>>>> >>>>>>> >>>>>>> >>>>>>> view my Linkedin profile >>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>>>>> >>>>>>> >>>>>>> >>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility >>>>>>> for any loss, damage or destruction of data or any other property which >>>>>>> may >>>>>>> arise from relying on this email's technical content is explicitly >>>>>>> disclaimed. The author will in no case be liable for any monetary >>>>>>> damages >>>>>>> arising from such loss, damage or destruction. >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Fri, 9 Jul 2021 at 17:41, Bruno Oliveira <bruno.ar...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hello! Sure thing! >>>>>>>> >>>>>>>> I'm reading them *separately*, both are apps written with Scala + >>>>>>>> Spark Structured Streaming. >>>>>>>> >>>>>>>> I feel like I missed some details on my original thread (sorry it >>>>>>>> was past 4 AM) and it was getting frustrating >>>>>>>> Please let me try to clarify some points: >>>>>>>> >>>>>>>> *Transactions Created Consumer* >>>>>>>> ----------------------------------- >>>>>>>> | Kafka trx-created-topic | <--- (Scala + SparkStructured >>>>>>>> Streaming) ConsumerApp ---> Sinks to ---> Postgres DB Table >>>>>>>> (Transactions) >>>>>>>> ----------------------------------- >>>>>>>> >>>>>>>> *Transactions Processed Consumer* >>>>>>>> ------------------------------------- >>>>>>>> | Kafka trx-processed-topic | <--- 1) (Scala + SparkStructured >>>>>>>> Streaming) AnotherConsumerApp fetches a Dataset (let's call it "a") >>>>>>>> ------------------------------------- 2) Selects the Ids >>>>>>>> ------------------------------------- >>>>>>>> | Postgres / Trx table |. <--- 3) Fetches the rows w/ the >>>>>>>> matching ids that have status 'created (let's call it "b") >>>>>>>> ------------------------------------- 4) Performs an >>>>>>>> intersection between "a" and "b" resulting in a "b_that_needs_sinking" >>>>>>>> (but >>>>>>>> now there's some "b_leftovers" that were out of the intersection) >>>>>>>> 5) Sinks >>>>>>>> "b_that_needs_sinking" to DB, but that leaves the "b_leftovers" as >>>>>>>> unprocessed (not persisted) >>>>>>>> 6) However, >>>>>>>> those "b_leftovers" would, ultimately, be processed at some point >>>>>>>> (even if >>>>>>>> it takes like 1-3 days) - when their corresponding transaction_id are >>>>>>>> pushed to >>>>>>>> the "trx-created-topic" Kafka topic, and are then processed by that >>>>>>>> first >>>>>>>> consumer >>>>>>>> >>>>>>>> So, what I'm trying to accomplish is find a way to reprocess those >>>>>>>> "b_leftovers" *without *having to restart the app >>>>>>>> Does that make sense? >>>>>>>> >>>>>>>> PS: It doesn't necessarily have to be real streaming, if >>>>>>>> micro-batching (legacy Spark Streaming) would allow such a thing, it >>>>>>>> would >>>>>>>> technically work (although I keep hearing it's not advisable) >>>>>>>> >>>>>>>> Thank you so much! >>>>>>>> >>>>>>>> Kind regards >>>>>>>> >>>>>>>> On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh < >>>>>>>> mich.talebza...@gmail.com> wrote: >>>>>>>> >>>>>>>>> Can you please clarify if you are reading these two topics >>>>>>>>> separately or within the same scala or python script in Spark >>>>>>>>> Structured >>>>>>>>> Streaming? >>>>>>>>> >>>>>>>>> HTH >>>>>>>>> >>>>>>>>> >>>>>>>>> view my Linkedin profile >>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility >>>>>>>>> for any loss, damage or destruction of data or any other property >>>>>>>>> which may >>>>>>>>> arise from relying on this email's technical content is explicitly >>>>>>>>> disclaimed. The author will in no case be liable for any monetary >>>>>>>>> damages >>>>>>>>> arising from such loss, damage or destruction. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira <bruno.ar...@gmail.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hello guys, >>>>>>>>>> >>>>>>>>>> I've been struggling with this for some days now, without >>>>>>>>>> success, so I would highly appreciate any enlightenment. The >>>>>>>>>> simplified >>>>>>>>>> scenario is the following: >>>>>>>>>> >>>>>>>>>> - I've got 2 topics in Kafka (it's already like that in >>>>>>>>>> production, can't change it) >>>>>>>>>> - transactions-created, >>>>>>>>>> - transaction-processed >>>>>>>>>> - Even though the schema is not exactly the same, they all >>>>>>>>>> share a correlation_id, which is their "transaction_id" >>>>>>>>>> >>>>>>>>>> So, long story short, I've got 2 consumers, one for each topic, >>>>>>>>>> and all I wanna do is sink them in a chain order. I'm writing them >>>>>>>>>> w/ Spark >>>>>>>>>> Structured Streaming, btw >>>>>>>>>> >>>>>>>>>> So far so good, the caveat here is: >>>>>>>>>> >>>>>>>>>> - I cannot write a given "*processed" *transaction unless there >>>>>>>>>> is an entry of that same transaction with the status "*created*". >>>>>>>>>> >>>>>>>>>> - There is *no* guarantee that any transactions in the topic >>>>>>>>>> "transaction-*processed*" have a match (same transaction_id) in >>>>>>>>>> the "transaction-*created*" at the moment the messages are >>>>>>>>>> fetched. >>>>>>>>>> >>>>>>>>>> So the workflow so far is: >>>>>>>>>> - Msgs from the "transaction-created" just get synced to >>>>>>>>>> postgres, no questions asked >>>>>>>>>> >>>>>>>>>> - As for the "transaction-processed", it goes as follows: >>>>>>>>>> >>>>>>>>>> - a) Messages are fetched from the Kafka topic >>>>>>>>>> - b) Select the transaction_id of those... >>>>>>>>>> - c) Fetch all the rows w/ the corresponding id from a >>>>>>>>>> Postgres table AND that have the status "CREATED" >>>>>>>>>> - d) Then, a pretty much do a intersection between the two >>>>>>>>>> datasets, and sink only on "processed" ones that have with step c >>>>>>>>>> - e) Persist the resulting dataset >>>>>>>>>> >>>>>>>>>> But the rows (from the 'processed') that were not part of the >>>>>>>>>> intersection get lost afterwards... >>>>>>>>>> >>>>>>>>>> So my question is: >>>>>>>>>> - Is there ANY way to reprocess/replay them at all WITHOUT >>>>>>>>>> restarting the app? >>>>>>>>>> - For this scenario, should I fall back to Spark Streaming, >>>>>>>>>> instead of Structured Streaming? >>>>>>>>>> >>>>>>>>>> PS: I was playing around with Spark Streaming (legacy) and >>>>>>>>>> managed to commit only the ones in the microbatches that were fully >>>>>>>>>> successful (still failed to find a way to "poll" for the uncommitted >>>>>>>>>> ones >>>>>>>>>> without restarting, though). >>>>>>>>>> >>>>>>>>>> Thank you very much in advance! >>>>>>>>>> >>>>>>>>>>