Adding to what Mich said:

1. Are you trying to send statements of all orders to all users, or the latest order only?

2. Sending email is not a good use of Spark. Instead, I suggest using a notification service or function: Spark should write to a queue (Kafka, SQS... pick your choice here) and let that service do the sending. A rough sketch of that hand-off follows below.
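A minimal sketch of the queue hand-off (all names here are placeholders: it assumes a DataFrame statements_df that already holds one pre-built JSON statement per user, a Kafka topic called "user-statements", and the spark-sql-kafka connector on the classpath; a downstream notification service consumes the topic and sends the emails):

```
# Sketch only, under the assumptions stated above.
(statements_df
    .selectExpr("CAST(user_id AS STRING) AS key",   # key by user for Kafka partitioning
                "statement_json AS value")          # message body is the per-user JSON statement
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")  # placeholder broker address
    .option("topic", "user-statements")
    .save())
```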
Best regards,
Ayan

On Wed, 26 Apr 2023 at 7:01 pm, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Well, OK. In a nutshell, you want the result set for every user prepared and emailed to that user, right?

This is a form of ETL where those result sets need to be posted somewhere. Say you create a table based on the result set prepared for each user; you may have many raw target tables at the end of the first ETL. How does this differ from using forEach? Performance-wise, forEach may not be optimal.

Can you take the sample tables and try your method?

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Wed, 26 Apr 2023 at 04:10, Marco Costantini <marco.costant...@rocketfncl.com> wrote:

Hi Mich,

First, thank you for that. Great effort put into helping.

Second, I don't think this tackles the technical challenge here. I understand the windowing as it serves those ranks you created, but I don't see how the ranks contribute to the solution.

Third, the core of the challenge is performing this kind of 'statement' for all users. In this example we target Mich, but that reduces the complexity by a lot! In fact, a simple join and filter would solve that one.

Any thoughts on that? For me, the foreach is desirable because I can have the workers chain other actions to each iteration (send email, send HTTP request, etc.).

Thanks Mich,
Marco.

On Tue, Apr 25, 2023 at 6:06 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi Marco,

First thoughts.

foreach() is an action operation that iterates/loops over each element in the dataset, meaning it is cursor based. That is different from operating over the dataset as a set, which is far more efficient.

So in your case, if I understand it correctly, you want to get the orders for each user (say Mich), convert the result set to JSON and send it to that user via email.

Let us try this based on sample data.

Put your csv files into an HDFS directory:

hdfs dfs -put users.csv /data/stg/test
hdfs dfs -put orders.csv /data/stg/test

Then create dataframes from the csv files, create temp views and do a join on the result sets, with some slicing and dicing on the orders table:
#!/usr/bin/env python3
from __future__ import print_function
import sys
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext
from pyspark.sql.window import Window

def spark_session(appName):
    return SparkSession.builder \
        .appName(appName) \
        .enableHiveSupport() \
        .getOrCreate()

def main():
    appName = "ORDERS"
    spark = spark_session(appName)
    # get the sample
    users_file = "hdfs://rhes75:9000/data/stg/test/users.csv"
    orders_file = "hdfs://rhes75:9000/data/stg/test/orders.csv"
    users_df = spark.read.format("com.databricks.spark.csv") \
        .option("inferSchema", "true") \
        .option("header", "true") \
        .load(users_file)
    users_df.printSchema()
    """
    root
     |-- id: integer (nullable = true)
     |-- name: string (nullable = true)
    """
    print(f"""\n Reading from {users_file}\n""")
    users_df.show(5, False)
    orders_df = spark.read.format("com.databricks.spark.csv") \
        .option("inferSchema", "true") \
        .option("header", "true") \
        .load(orders_file)
    orders_df.printSchema()
    """
    root
     |-- id: integer (nullable = true)
     |-- description: string (nullable = true)
     |-- amount: double (nullable = true)
     |-- user_id: integer (nullable = true)
    """
    print(f"""\n Reading from {orders_file}\n""")
    orders_df.show(50, False)
    users_df.createOrReplaceTempView("users")
    orders_df.createOrReplaceTempView("orders")
    # Create a list of orders for each user
    print(f"""\n Doing a join on two temp views\n""")
    sqltext = """
    SELECT u.name, t.order_id, t.description, t.amount, t.maxorders
    FROM
    (
        SELECT
              user_id AS user_id
            , id AS order_id
            , description AS description
            , amount AS amount
            , DENSE_RANK() OVER (PARTITION BY user_id ORDER BY amount) AS RANK
            , MAX(amount) OVER (PARTITION BY user_id ORDER BY id) AS maxorders
        FROM orders
    ) t
    INNER JOIN users u ON t.user_id = u.id
    AND u.name = 'Mich'
    ORDER BY t.order_id
    """
    spark.sql(sqltext).show(50)

if __name__ == '__main__':
    main()

Final outcome displaying orders for user Mich:

 Doing a join on two temp views

+----+--------+-----------------+------+---------+
|name|order_id|      description|amount|maxorders|
+----+--------+-----------------+------+---------+
|Mich|   50001| Mich's 1st order|101.11|   101.11|
|Mich|   50002| Mich's 2nd order|102.11|   102.11|
|Mich|   50003| Mich's 3rd order|103.11|   103.11|
|Mich|   50004| Mich's 4th order|104.11|   104.11|
|Mich|   50005| Mich's 5th order|105.11|   105.11|
|Mich|   50006| Mich's 6th order|106.11|   106.11|
|Mich|   50007| Mich's 7th order|107.11|   107.11|
|Mich|   50008| Mich's 8th order|108.11|   108.11|
|Mich|   50009| Mich's 9th order|109.11|   109.11|
|Mich|   50010|Mich's 10th order|210.11|   210.11|
+----+--------+-----------------+------+---------+

You can start on this. Happy coding!
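To extend this sample from the single user 'Mich' to every user (the point Marco raises above), one possible follow-on, sketched here as an illustration rather than as part of the original script, is to drop the filter on u.name, group the joined rows per user, and write one JSON statement per user that a downstream mailer or queue job can pick up. Column names follow the sample above; the "statements" output path is a placeholder:

```
from pyspark.sql.functions import struct, to_json, collect_list

# Same join as above, but without the filter on u.name and keeping user_id
sqltext_all = """
SELECT u.id AS user_id, u.name, t.order_id, t.description, t.amount
FROM
(
    SELECT user_id, id AS order_id, description, amount
    FROM orders
) t
INNER JOIN users u ON t.user_id = u.id
"""
all_orders_df = spark.sql(sqltext_all)

# One row per user, with that user's orders collected into a single JSON document
statements_df = (all_orders_df
    .groupBy("user_id", "name")
    .agg(to_json(collect_list(struct("order_id", "description", "amount")))
         .alias("statement_json")))

# One directory per user; a notification job (or the Kafka hand-off above) can consume these
statements_df.write.mode("overwrite").partitionBy("user_id") \
    .json("hdfs://rhes75:9000/data/stg/test/statements")
```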
On Tue, 25 Apr 2023 at 18:50, Marco Costantini <marco.costant...@rocketfncl.com> wrote:

Thanks Mich,

Great idea. I have done it. Those files are attached. I'm interested to know your thoughts. Let's imagine this same structure, but with huge amounts of data as well.

Please and thank you,
Marco.

On Tue, Apr 25, 2023 at 12:12 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi Marco,

Let us start simple.

Provide a csv file of 5 rows for the users table. Each row has a unique user_id and one or two other columns, like a fictitious email etc.

Also, for each user_id, provide 10 rows of the orders table, meaning that the orders table has 5 x 10 rows in total.

Both as comma-separated csv files.

HTH

On Tue, 25 Apr 2023 at 14:07, Marco Costantini <marco.costant...@rocketfncl.com> wrote:

Thanks Mich,
I have not, but I will certainly read up on this today.

To your point that all of the essential data is in the 'orders' table: I agree! That distills the problem nicely. Yet, I still have some questions on which someone may be able to shed some light.

1) If my 'orders' table is very large and needs to be aggregated by 'user_id', how will Spark intelligently optimize on that constraint (only read data for the relevant 'user_id's)? Is that something I have to instruct Spark to do?

2) Without #1, even with windowing, am I asking each partition to search too much?

Please, if you have any links to documentation I can read on *how* Spark works under the hood for these operations, I would appreciate it if you shared them. Spark has become a pillar on my team and knowing it in more detail is warranted.

Slightly pivoting the subject here: I have tried something. It was a suggestion by an AI chat bot and it seemed reasonable. In my main Spark script I now have the line:

```
grouped_orders_df = orders_df.groupBy('user_id').agg(collect_list(to_json(struct('user_id', 'timestamp', 'total', 'description'))).alias('orders'))
```
(json is ultimately needed)

This actually achieves my goal by putting all of the 'orders' in a single Array column. Now my worry is: will this column become too large if there are a great many orders? Is there a limit? I have searched for documentation on such a limit but could not find any.

I truly appreciate your help, Mich and team,
Marco.
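A side note on question 1: how much Spark actually reads depends mostly on how the orders data is stored; for example, Parquet partitioned by user_id allows partition pruning, and Parquet or JDBC sources can push simple filters down to the scan. One generic way to check is to look at the physical plan. A small, hypothetical illustration (made-up path and filter, not Marco's actual data):

```
from pyspark.sql.functions import col

# Hypothetical example: orders stored as Parquet, partitioned by user_id
orders_df = spark.read.parquet("hdfs:///data/stg/test/orders_parquet")

subset = orders_df.where(col("user_id").isin(1, 2, 3))

# Inspect the plans; look for PartitionFilters / PushedFilters on the scan node
# to confirm Spark will prune what it reads rather than scanning everything.
subset.groupBy("user_id").count().explain(True)
```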
On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Have you thought of using windowing functions
<https://sparkbyexamples.com/spark/spark-sql-window-functions/> to achieve this?

Effectively, all your information is in the orders table.

HTH

On Tue, 25 Apr 2023 at 00:15, Marco Costantini <marco.costant...@rocketfncl.com> wrote:

I have two tables: {users, orders}. In this example, let's say that for each 1 User in the users table, there are 100000 Orders in the orders table.

I have to use pyspark to generate a statement of Orders for each User. So, a single user will need his/her own list of Orders. Additionally, I need to send this statement to the real-world user via email (for example).

My first intuition was to apply a DataFrame.foreach() on the users DataFrame. This way, I can rely on the Spark workers to handle the email sending individually. However, I now do not know the best way to get each User's Orders.

I will soon try the following (pseudo-code):

```
users_df = <my entire users DataFrame>
orders_df = <my entire orders DataFrame>

# this is poorly named for max understandability in this context
def foreach_function(row):
    user_id = row.user_id
    user_orders_df = orders_df.where(f'user_id = {user_id}')

    # here, I'd get any User info from 'row'
    # then, I'd convert all 'user_orders' to JSON
    # then, I'd prepare the email and send it

users_df.foreach(foreach_function)
```

It is my understanding that if I do my user-specific work in the foreach function, I will capitalize on Spark's scalability when doing that work. However, I am worried about two things:

If I take all Orders up front...
- Will that work?
- Will I be taking too much? Will I be taking Orders on partitions that won't handle them (different User)?

If I create the orders_df (filtered) within the foreach function...
- Will it work?
- Will that be too much IO to the DB?

The question ultimately is: how can I achieve this goal efficiently?

I have not yet tried anything here. I am doing so as we speak, but am suffering from choice-paralysis.

Please and thank you.
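For reference, one way the ideas in the replies above fit together: aggregate once per user (set-based) and only then perform the per-user side effect, either by writing the statements to storage or a queue as suggested earlier, or with foreachPartition if the workers should still carry out an action per user (send email, HTTP request, ...). A sketch using the column names from Marco's snippet; send_statement is a hypothetical helper, not something from the thread:

```
from pyspark.sql.functions import struct, to_json, collect_list

# One row per user, with all of that user's orders as a single JSON string
statements_df = (orders_df
    .groupBy("user_id")
    .agg(to_json(collect_list(struct("user_id", "timestamp", "total", "description")))
         .alias("statement_json")))

def send_partition(rows):
    # Runs on the executors; open one connection (SMTP client, HTTP session, ...) per partition
    for row in rows:
        send_statement(row.user_id, row.statement_json)  # hypothetical helper

statements_df.foreachPartition(send_partition)
```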
--
Best Regards,
Ayan Guha