RE: Spark Kubernetes Operator

2023-04-26 Thread Aldo Culquicondor
We are welcoming contributors, as announced in the Kubernetes WG Batch
https://docs.google.com/document/d/1XOeUN-K0aKmJJNq7H07r74n-mGgSFyiEDQ3ecwsGhec/edit#bookmark=id.gfgjt0nmbgjl

If you are interested, you can find us on slack.k8s.io in #wg-batch, or
ping @mwielgus on GitHub/Slack.

Thanks

On 2023/04/14 16:41:36 Yuval Itzchakov wrote:
> Hi,
>
> At the moment, the most widely used option for a Spark operator seems to be
> the one provided by Google:
> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
>
> Unfortunately, it doesn't seem actively maintained. Are there any plans to
> support an official, Apache Spark community-driven operator?
>


Re: What is the best way to organize a join within a foreach?

2023-04-26 Thread Mich Talebzadeh
Again, one try is worth many opinions. Try it, gather metrics from the Spark
UI, and see how it performs.

Re: What is the best way to organize a join within a foreach?

2023-04-26 Thread Marco Costantini
Thanks team,
Email was just an example; the point was to illustrate that some actions can
be chained using Spark's foreach. In reality, this is an S3 write and a
Kafka message production, which I think is quite reasonable for Spark to do.
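
A minimal sketch of that pattern (the kafka-python client is assumed to be
installed on the executors; the broker address, topic name, and the
results_df frame are all placeholders):

def send_partition(rows):
    # Imports inside the function so they resolve on the executors.
    import json
    from kafka import KafkaProducer
    # One producer per partition, not per row, to amortize connection cost.
    producer = KafkaProducer(
        bootstrap_servers="broker:9092",  # placeholder
        value_serializer=lambda v: json.dumps(v).encode("utf-8"))
    for row in rows:
        producer.send("user-order-statements", row.asDict())  # placeholder topic
    producer.flush()

results_df.foreachPartition(send_partition)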

To answer Ayan's first question: yes, all of a user's orders, prepared for
each and every user.

Other than the remarks that email transmission is unwise (which, as I've
noted, is beside the point here), I am not seeing an alternative to using
Spark's foreach. Unless your proposal is for the Spark job to target one
user and just run the job thousands of times, taking the user_id as input.
That doesn't sound attractive.

Also, while we say that foreach is not optimal, I cannot find any evidence
of that, either here or online. If there are any docs about the inner
workings of this functionality, please pass them to me; I continue to
search for them, even late last night!

Thanks for your help team,
Marco.

Re: What is the best way to organize a join within a foreach?

2023-04-26 Thread Mich Talebzadeh
Indeed very valid points by Ayan. How is email going to handle 1000s of
records? As a solution architect I tend to replace users by customers, and
for each order there must be products, a many-to-many relationship of sorts.
If I were a customer I would also be interested in the product details.
Sending via email sounds like a Jurassic Park solution 😗

Re: What is the best way to organize a join within a foreach?

2023-04-26 Thread ayan guha
Adding to what Mich said,

1. Are you trying to send statements of all orders to all users, or the
latest order only?

2. Sending email is not a good use of Spark. Instead, I suggest using a
notification service or function; Spark should write to a queue (Kafka,
SQS... pick your choice here), as sketched below.
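
As a sketch of that hand-off (assuming the spark-sql-kafka-0-10 package is
on the classpath; the broker, topic, and the statements_df frame are all
placeholders), Spark's built-in Kafka sink writes the prepared records as a
set, with no foreach involved:

(statements_df
    .selectExpr("CAST(user_id AS STRING) AS key",
                "to_json(struct(*)) AS value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
    .option("topic", "user-order-statements")          # placeholder
    .save())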

Best regards
Ayan

Re: What is the best way to organize a join within a foreach?

2023-04-26 Thread Mich Talebzadeh
Well OK, in a nutshell you want the result set for every user prepared and
emailed to that user, right?

This is a form of ETL where those result sets need to be posted somewhere.
Say you create a table based on the result set prepared for each user; you
may have many raw target tables at the end of the first ETL. How does this
differ from using foreach? Performance-wise, foreach may not be optimal.
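
A minimal sketch of that first ETL leg (the output path is a placeholder,
and result_df stands in for the joined per-user result set): partitionBy
lands each user's rows under its own directory, instead of many raw target
tables:

# result_df: hypothetical DataFrame of joined user/order rows with a user_id column
(result_df
    .write
    .mode("overwrite")
    .partitionBy("user_id")
    .json("hdfs://rhes75:9000/data/stg/test/statements_by_user"))  # placeholder path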

Can you take the sample tables and try your method?

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 26 Apr 2023 at 04:10, Marco Costantini <
marco.costant...@rocketfncl.com> wrote:

> Hi Mich,
> First, thank you for that. Great effort put into helping.
>
> Second, I don't think this tackles the technical challenge here. I
> understand the windowing as it serves those ranks you created, but I don't
> see how the ranks contribute to the solution.
> Third, the core of the challenge is about performing this kind of
> 'statement' but for all users. In this example we target Mich, but that
> reduces the complexity by a lot! In fact, a simple join and filter would
> solve that one.
>
> Any thoughts on that? For me, the foreach is desirable because I can have
> the workers chain other actions to each iteration (send email, send HTTP
> request, etc).
>
> Thanks Mich,
> Marco.
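
A minimal sketch of the chained-action pattern described above (the requests
library is assumed on the executors; the endpoint and the result_df frame
are placeholders):

def notify_partition(rows):
    # Import inside the function so it resolves on the executors.
    import requests
    # One HTTP session per partition rather than one per row.
    session = requests.Session()
    for row in rows:
        session.post("https://notify.example.com/statement",  # placeholder
                     json=row.asDict())

result_df.foreachPartition(notify_partition)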
>
> On Tue, Apr 25, 2023 at 6:06 PM Mich Talebzadeh 
> wrote:
>
>> Hi Marco,
>>
>> First thoughts.
>>
>> foreach() is an action operation that iterates/loops over each element in
>> the dataset, i.e. it is cursor-based. That is different from operating
>> over the dataset as a set, which is far more efficient.
>>
>> So in your case, if I understand correctly, you want to get the orders for
>> each user (say Mich), convert the result set to JSON, and send it to Mich
>> via email.
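
In miniature, using the orders_df frame built in the example that follows
(the lambda is only illustrative):

# Cursor-like: the function runs once per Row, on the executors.
orders_df.foreach(lambda row: print(row.user_id))

# Set-based: one aggregation expressed over the whole dataset.
from pyspark.sql import functions as F
orders_df.groupBy("user_id").agg(F.count("id").alias("n_orders")).show()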
>>
>> Let us try this based on sample data
>>
>> Put your csv files into HDFS directory
>>
>> hdfs dfs -put users.csv /data/stg/test
>> hdfs dfs -put orders.csv /data/stg/test
>>
>> Then create dataframes from the csv files, create temp views, and do a
>> join on the result sets with some slicing and dicing on the orders table
>>
>> #! /usr/bin/env python3
>> from __future__ import print_function
>> import findspark
>> findspark.init()
>> from pyspark.sql import SparkSession
>>
>> def spark_session(appName):
>>     return SparkSession.builder \
>>         .appName(appName) \
>>         .enableHiveSupport() \
>>         .getOrCreate()
>>
>> def main():
>>     appName = "ORDERS"
>>     spark = spark_session(appName)
>>     # sample data locations
>>     users_file = "hdfs://rhes75:9000/data/stg/test/users.csv"
>>     orders_file = "hdfs://rhes75:9000/data/stg/test/orders.csv"
>>     users_df = spark.read.format("csv") \
>>         .option("inferSchema", "true") \
>>         .option("header", "true") \
>>         .load(users_file)
>>     users_df.printSchema()
>>     """
>>     root
>>      |-- id: integer (nullable = true)
>>      |-- name: string (nullable = true)
>>     """
>>     print(f"""\n Reading from {users_file}\n""")
>>     users_df.show(5, False)
>>     orders_df = spark.read.format("csv") \
>>         .option("inferSchema", "true") \
>>         .option("header", "true") \
>>         .load(orders_file)
>>     orders_df.printSchema()
>>     """
>>     root
>>      |-- id: integer (nullable = true)
>>      |-- description: string (nullable = true)
>>      |-- amount: double (nullable = true)
>>      |-- user_id: integer (nullable = true)
>>     """
>>     print(f"""\n Reading from {orders_file}\n""")
>>     orders_df.show(50, False)
>>     users_df.createOrReplaceTempView("users")
>>     orders_df.createOrReplaceTempView("orders")
>>     # Create a list of orders for each user
>>     print(f"""\n Doing a join on two temp views\n""")
>>     sqltext = """
>>     SELECT u.name, t.order_id, t.description, t.amount, t.maxorders
>>     FROM
>>     (
>>         SELECT
>>             user_id AS user_id
>>         ,   id AS order_id
>>         ,   description AS description
>>         ,   amount AS amount
>>         ,   DENSE_RANK() OVER (PARTITION BY user_id ORDER BY amount) AS rank
>>         ,   MAX(amount) OVER (PARTITION BY user_id ORDER BY id) AS maxorders
>>         FROM orders
>>     ) t
>>     INNER JOIN users u ON t.user_id = u.id
>>     AND u.name = 'Mich'
>>     ORDER BY t.order_id
>>     """
>>     spark.sql(sqltext).show(50)
>>
>> if __name__ == '__main__':
>>     main()
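
For comparison, a minimal set-based sketch of the end goal (one JSON
statement per user) built on the users_df and orders_df frames above;
collect_list gathers each user's orders into a single row, so no foreach is
needed until the final send:

from pyspark.sql import functions as F

o = orders_df.alias("o")
u = users_df.alias("u")

# One row per user: orders gathered into an array, then rendered as JSON.
statements = (o.join(u, F.col("o.user_id") == F.col("u.id"))
    .groupBy("o.user_id", "u.name")
    .agg(F.collect_list(
        F.struct(F.col("o.id").alias("order_id"), "o.description", "o.amount")
    ).alias("orders"))
    .withColumn("statement_json",
                F.to_json(F.struct("user_id", "name", "orders"))))

statements.select("name", "statement_json").show(5, False)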