Re: [RNG]: How does Spark handle RNGs?

2021-10-04 Thread Sean Owen
No, it isn't making up new PRNGs. For some function that needs randomness
(e.g. sampling), a few things are important: the generation has to be done
independently within each task, it shouldn't (almost surely) produce the same
sequence across tasks, and it needs to be reproducible. You'll find if you
look in the source code that operations like this will generally pick and
store a seed, create an RNG with that seed, and use it locally. Different
tasks would have different seeds.
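
For concreteness, a minimal sketch of that per-task pattern (illustrative only,
not Spark's actual sampling code; the rdd value, the 10% sample rate and the
seed-plus-partition-index scheme are assumptions):

import scala.util.Random

// One base seed is picked (and stored) once, so re-running the job
// reproduces the same output.
val baseSeed = new Random().nextLong()

val sampled = rdd.mapPartitionsWithIndex { (partitionIndex, iter) =>
  // Each task derives its own seed from the stored base seed and its
  // partition index, so tasks are reproducible but (almost surely) do not
  // generate identical sequences.
  val rng = new Random(baseSeed + partitionIndex)
  iter.filter(_ => rng.nextDouble() < 0.1)
}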

On Mon, Oct 4, 2021 at 3:42 PM Benjamin Du  wrote:

> "Operations on the executor will generally calculate and store a seed once"
>
> Can you elaborate more on this? Does Spark try to seed RNGs to ensure the
> overall quality of random number generation? To give an extreme example, if
> all workers used the same seed, then the RNGs would repeat the same numbers
> on each worker, which is obviously a poor choice.
>
>
> Best,
>
> 
>
> Ben Du
>
> Personal Blog  | GitHub
>  | Bitbucket 
> | Docker Hub 
>
> --
> *From:* Sean Owen 
> *Sent:* Monday, October 4, 2021 1:00 PM
> *To:* Benjamin Du 
> *Cc:* user@spark.apache.org 
> *Subject:* Re: [RNG]: How does Spark handle RNGs?
>
> The 2nd approach. Spark doesn't work in the 1st way in any context - the
> driver and executor processes do not cooperate during execution.
> Operations on the executor will generally calculate and store a seed once,
> and use that in RNGs, to make its computation reproducible.
>
> On Mon, Oct 4, 2021 at 2:20 PM Benjamin Du 
> wrote:
>
> Hi everyone,
>
> I'd like to ask how Spark (or, more generally, distributed computing
> engines) handles RNGs? At a high level, there are two ways:
>
>    1. Use a single RNG on the driver, and random number generation on
>    each worker makes requests to that single RNG on the driver.
>    2. Use a separate RNG on each worker.
>
> If the 2nd approach is used, may I ask how Spark seeds the RNGs on
> different workers to ensure the overall quality of random number generation?
>
>
> Best,
>
> 
>
> Ben Du
>
> Personal Blog  | GitHub
>  | Bitbucket 
> | Docker Hub 
>
>


Re: [RNG]: How does Spark handle RNGs?

2021-10-04 Thread Benjamin Du
"Operations on the executor will generally calculate and store a seed once"

Can you elaborate more on this? Does Spark try to seed RNGs to ensure the
overall quality of random number generation? To give an extreme example, if all
workers used the same seed, then the RNGs would repeat the same numbers on each
worker, which is obviously a poor choice.


Best,



Ben Du

Personal Blog | GitHub | 
Bitbucket | Docker 
Hub


From: Sean Owen 
Sent: Monday, October 4, 2021 1:00 PM
To: Benjamin Du 
Cc: user@spark.apache.org 
Subject: Re: [RNG]: How does Spark handle RNGs?

The 2nd approach. Spark doesn't work in the 1st way in any context - the driver 
and executor processes do not cooperate during execution.
Operations on the executor will generally calculate and store a seed once, and 
use that in RNGs, to make its computation reproducible.

On Mon, Oct 4, 2021 at 2:20 PM Benjamin Du 
mailto:legendu@outlook.com>> wrote:
Hi everyone,

I'd like to ask how Spark (or, more generally, distributed computing
engines) handles RNGs? At a high level, there are two ways:

  1.  Use a single RNG on the driver, and random number generation on each
worker makes requests to that single RNG on the driver.
  2.  Use a separate RNG on each worker.

If the 2nd approach is used, may I ask how Spark seeds the RNGs on
different workers to ensure the overall quality of random number generation?


Best,



Ben Du

Personal Blog | GitHub | 
Bitbucket | Docker 
Hub


Re: [RNG]: How does Spark handle RNGs?

2021-10-04 Thread Sean Owen
The 2nd approach. Spark doesn't work in the 1st way in any context - the
driver and executor processes do not cooperate during execution.
Operations on the executor will generally calculate and store a seed once,
and use that in RNGs, to make its computation reproducible.

On Mon, Oct 4, 2021 at 2:20 PM Benjamin Du  wrote:

> Hi everyone,
>
> I'd like to ask how Spark (or, more generally, distributed computing
> engines) handles RNGs? At a high level, there are two ways:
>
>    1. Use a single RNG on the driver, and random number generation on
>    each worker makes requests to that single RNG on the driver.
>    2. Use a separate RNG on each worker.
>
> If the 2nd approach is used, may I ask how Spark seeds the RNGs on
> different workers to ensure the overall quality of random number generation?
>
>
> Best,
>
> 
>
> Ben Du
>
> Personal Blog  | GitHub
>  | Bitbucket 
> | Docker Hub 
>


[RNG]: How does Spark handle RNGs?

2021-10-04 Thread Benjamin Du
Hi everyone,

I'd like to ask how Spark (or, more generally, distributed computing
engines) handles RNGs? At a high level, there are two ways:

  1.  Use a single RNG on the driver, and random number generation on each
worker makes requests to that single RNG on the driver.
  2.  Use a separate RNG on each worker.

If the 2nd approach is used, may I ask how Spark seeds the RNGs on
different workers to ensure the overall quality of random number generation?


Best,



Ben Du

Personal Blog | GitHub | 
Bitbucket | Docker 
Hub


Re: [Spark] Optimize spark join on different keys for same data frame

2021-10-04 Thread Amit Joshi
Hi spark users,

Can anyone please provide any views on the topic.


Regards
Amit Joshi

On Sunday, October 3, 2021, Amit Joshi  wrote:

> Hi Spark-Users,
>
> Hope you are doing good.
>
> I have been working on cases where a dataframe is joined with more than
> one other dataframe separately, on different columns, and quite frequently.
> I was wondering how to optimize these joins to make them faster.
> We can consider the datasets to be large, so broadcast joins are not an
> option.
>
> For example:
>
> val schema_df1 = new StructType()
>   .add(StructField("key1", StringType, true))
>   .add(StructField("key2", StringType, true))
>   .add(StructField("val", DoubleType, true))
>
>
> val schema_df2 = new StructType()
>   .add(StructField("key1", StringType, true))
>   .add(StructField("val", DoubleType, true))
>
>
> val schema_df3 = new StructType()
>   .add(StructField("key2", StringType, true))
>   .add(StructField("val", DoubleType, true))
>
> Now we want to join:
> val join1 = df1.join(df2, "key1")
> val join2 = df1.join(df3, "key2")
>
> I was thinking of bucketing as a solution to speed up the joins. But if I
> bucket df1 on key1, then join2 may not benefit, and vice versa (if I
> bucket df1 on key2).
>
> Or should we bucket df1 twice, once on key1 and once on key2?
> Is there a strategy to make both joins faster?
>
>
> Regards
> Amit Joshi
>
>
>
>
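
A table can only be physically bucketed one way, so speeding up both joins
generally means bucketing the other sides to match and, for df1, materialising
it twice. A minimal sketch of the "bucket df1 twice" option (the table names
and the bucket count of 32 are illustrative assumptions, not a recommendation):

// Persist df1 twice, each copy bucketed (and sorted) on the join key it will
// be used with, plus matching bucketed copies of df2 and df3.
df1.write.bucketBy(32, "key1").sortBy("key1").saveAsTable("df1_by_key1")
df1.write.bucketBy(32, "key2").sortBy("key2").saveAsTable("df1_by_key2")
df2.write.bucketBy(32, "key1").sortBy("key1").saveAsTable("df2_by_key1")
df3.write.bucketBy(32, "key2").sortBy("key2").saveAsTable("df3_by_key2")

// Joins on matching bucketed tables can avoid shuffling either side,
// at the cost of storing df1 twice.
val join1 = spark.table("df1_by_key1").join(spark.table("df2_by_key1"), "key1")
val join2 = spark.table("df1_by_key2").join(spark.table("df3_by_key2"), "key2")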


Re: [Spark-Core] Spark Dry Run

2021-10-04 Thread Ali Behjati
Hey Ramiro,

Thank you for your detailed answer.
We also have a similar framework that does the same thing, and I saw very good
results. However, pipelines written as normal Spark apps have to be changed to
adopt such a framework, and that takes a lot of effort. This is why I'm
suggesting adding it to Spark core, to make it available to everyone out of the
box.

-
Ali

On Mon, Oct 4, 2021 at 1:35 PM Ramiro Laso  wrote:

> Hello Ali! I've implemented a dry run in my data pipeline using a schema
> repository. My pipeline takes a "dataset descriptor", which is a JSON
> describing the dataset you want to build, loads some "entities", applies
> some transformations and then writes the final dataset.
> It is in the "dataset descriptor" that users can make mistakes, or when
> they reimplement some steps inside the pipeline. So, to perform a dry
> run, we first separated the actions from the transformations. Each step
> inside the pipeline has "input", "transform" and "write" methods. So, when
> we want to "dry run" a pipeline, we obtain the schemas of the entities and
> build "empty RDDs" that we use as input to the pipeline. Finally, we just
> trigger an action to test that all selected columns and queries in the
> "dataset descriptor" are ok.
> This is how you can create an empty dataset:
>
> emp_RDD: RDD = spark.sparkContext.emptyRDD()
> df = spark.createDataFrame(emp_RDD, schema)
>
> Ramiro.
>
> On Thu, Sep 30, 2021 at 11:48 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Ok thanks.
>>
>> What is your experience of VS Code (in terms of capabilities ) as it is
>> becoming a standard tool available in Cloud workspaces like Amazon
>> workspace?
>>
>> Mich
>>
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 30 Sept 2021 at 15:43, Ali Behjati  wrote:
>>
>>> Not anything specific in my mind. Any IDE which is open to plugins can
>>> use it (e.g: VS Code and Jetbrains) to validate execution plans in the
>>> background and mark syntax errors based on the result.
>>>
>>> On Thu, Sep 30, 2021 at 4:40 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 What IDEs do you have in mind?



view my Linkedin profile
 



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.




 On Thu, 30 Sept 2021 at 15:20, Ali Behjati  wrote:

> Yeah, it doesn't remove the need for testing on sample data. It would be
> more of a syntax check than a test. I have witnessed that syntax errors
> occur a lot.
>
> Maybe after having dry-run we will be able to create some automation
> around basic syntax checking for IDEs too.
>
> On Thu, Sep 30, 2021 at 4:15 PM Sean Owen  wrote:
>
>> If testing, wouldn't you actually want to execute things? even if at
>> a small scale, on a sample of data?
>>
>> On Thu, Sep 30, 2021 at 9:07 AM Ali Behjati 
>> wrote:
>>
>>> Hey everyone,
>>>
>>>
>>> By dry run I mean ability to validate the execution plan but not
>>> executing it within the code. I was wondering whether this exists in 
>>> spark
>>> or not. I couldn't find it anywhere.
>>>
>>> If it doesn't exist I want to propose adding such a feature in
>>> spark.
>>>
>>> Why is it useful?
>>> 1. Faster testing: When using pyspark or spark on scala/java without
>>> DataSet we are prone to typos and mistakes about column names and other
>>> logical problems. Unfortunately IDEs won't help much and when dealing 
>>> with
>>> Big Data, testing by running the code takes a lot of time. So this way 
>>> we
>>> can understand typos very fast.
>>>
>>> 2. (Continuous) Integrity checks: When there are upstream and
>>> downstream pipelines, we can understand breaking changes much faster by
>>> running downstream pipelines in "dry run" mode.
>>>
>>> I believe it is not so hard to implement and I volunteer to work on
>>> it if the community approves this feature request.
>>>
>>> It can be tackled in different ways. I have two Ideas for
>>> implementation:
>>> 1. Noop (No Op) executor engine
>>> 2. On reads just infer schema and replace it with 

Re: Trying to hash cross features with mllib

2021-10-04 Thread David Diebold
Hello Sean,

Thank you for the heads-up !
Interaction transform won't help for my use case as it returns a vector
that I won't be able to hash.
I will definitely dig further into custom transformations though.

Thanks !
David

On Fri, Oct 1, 2021 at 3:49 PM Sean Owen  wrote:

> Are you looking for
> https://spark.apache.org/docs/latest/ml-features.html#interaction ?
> That's the closest built in thing I can think of.  Otherwise you can make
> custom transformations.
>
> On Fri, Oct 1, 2021, 8:44 AM David Diebold 
> wrote:
>
>> Hello everyone,
>>
>> In MLLib, I’m trying to rely essentially on pipelines to create features
>> out of the Titanic dataset, and show-case the power of feature hashing. I
>> want to:
>>
>> -  Apply bucketization on some columns (QuantileDiscretizer is
>> fine)
>>
>> -  Then I want to cross all my columns with each other to have
>> cross features.
>>
>> -  Then I would like to hash all of these cross features into a
>> vector.
>>
>> -  Then give it to a logistic regression.
>>
>> Looking at the documentation, it looks like the only way to hash features
>> is the *FeatureHasher* transformation. It takes multiple columns as
>> input, type can be numeric, bool, string (but no vector/array).
>>
>> But now I’m left wondering how I can create my cross-feature columns. I’m
>> looking for a transformation that could take two columns as input and
>> return a numeric, bool, or string. I didn't manage to find anything that
>> does the job. There are multiple transformations, such as VectorAssembler,
>> that operate on vectors, but that is not a type accepted by the FeatureHasher.
>>
>> Of course, I could try to combine columns directly in my dataframe
>> (before the pipeline kicks in), but then I would not be able to benefit
>> anymore from QuantileDiscretizer and other cool functions.
>>
>>
>> Am I missing something in the transformation api ? Or is my approach to
>> hashing wrong ? Or should we consider to extend the api somehow ?
>>
>>
>>
>> Thank you, kind regards,
>>
>> David
>>
>
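
One way to stay inside the pipeline is to build each cross feature as a string
column with SQLTransformer and hash that. A minimal sketch (Titanic-style
column names age, fare and survived are assumed, and the multi-column
QuantileDiscretizer setters need Spark 3.x):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{FeatureHasher, QuantileDiscretizer, SQLTransformer}

// Bucketize two numeric columns.
val discretizer = new QuantileDiscretizer()
  .setInputCols(Array("age", "fare"))
  .setOutputCols(Array("age_bin", "fare_bin"))
  .setNumBuckets(10)

// Build a string-valued cross feature inside the pipeline, after the
// discretizer; __THIS__ is SQLTransformer's placeholder for the input dataset.
val cross = new SQLTransformer()
  .setStatement(
    "SELECT *, concat_ws('_', age_bin, fare_bin) AS age_x_fare FROM __THIS__")

// Hash the string-typed (categorical) cross feature into a fixed-size vector.
val hasher = new FeatureHasher()
  .setInputCols(Array("age_x_fare"))
  .setOutputCol("features")
  .setNumFeatures(1 << 18)

val lr = new LogisticRegression().setLabelCol("survived")

val pipeline = new Pipeline().setStages(Array(discretizer, cross, hasher, lr))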


Re: [Spark-Core] Spark Dry Run

2021-10-04 Thread Ramiro Laso
Hello Ali! I've implemented a dry run in my data pipeline using a schema
repository. My pipeline takes a "dataset descriptor", which is a JSON
describing the dataset you want to build, loads some "entities", applies
some transformations and then writes the final dataset.
It is in the "dataset descriptor" that users can make mistakes, or when
they reimplement some steps inside the pipeline. So, to perform a dry
run, we first separated the actions from the transformations. Each step
inside the pipeline has "input", "transform" and "write" methods. So, when
we want to "dry run" a pipeline, we obtain the schemas of the entities and
build "empty RDDs" that we use as input to the pipeline. Finally, we just
trigger an action to test that all selected columns and queries in the
"dataset descriptor" are ok.
This is how you can create an empty dataset:

emp_RDD: RDD = spark.sparkContext.emptyRDD()
df = spark.createDataFrame(emp_RDD, schema)

Ramiro.

On Thu, Sep 30, 2021 at 11:48 AM Mich Talebzadeh 
wrote:

> Ok thanks.
>
> What is your experience of VS Code (in terms of capabilities ) as it is
> becoming a standard tool available in Cloud workspaces like Amazon
> workspace?
>
> Mich
>
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 30 Sept 2021 at 15:43, Ali Behjati  wrote:
>
>> Not anything specific in my mind. Any IDE which is open to plugins can
>> use it (e.g: VS Code and Jetbrains) to validate execution plans in the
>> background and mark syntax errors based on the result.
>>
>> On Thu, Sep 30, 2021 at 4:40 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> What IDEs do you have in mind?
>>>
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 30 Sept 2021 at 15:20, Ali Behjati  wrote:
>>>
 Yeah, it doesn't remove the need for testing on sample data. It would be
 more of a syntax check than a test. I have witnessed that syntax errors
 occur a lot.

 Maybe after having dry-run we will be able to create some automation
 around basic syntax checking for IDEs too.

 On Thu, Sep 30, 2021 at 4:15 PM Sean Owen  wrote:

> If testing, wouldn't you actually want to execute things? even if at a
> small scale, on a sample of data?
>
> On Thu, Sep 30, 2021 at 9:07 AM Ali Behjati 
> wrote:
>
>> Hey everyone,
>>
>>
>> By dry run I mean ability to validate the execution plan but not
>> executing it within the code. I was wondering whether this exists in 
>> spark
>> or not. I couldn't find it anywhere.
>>
>> If it doesn't exist I want to propose adding such a feature in spark.
>>
>> Why is it useful?
>> 1. Faster testing: When using pyspark or spark on scala/java without
>> DataSet we are prone to typos and mistakes about column names and other
>> logical problems. Unfortunately IDEs won't help much and when dealing 
>> with
>> Big Data, testing by running the code takes a lot of time. So this way we
>> can understand typos very fast.
>>
>> 2. (Continuous) Integrity checks: When there are upstream and
>> downstream pipelines, we can understand breaking changes much faster by
>> running downstream pipelines in "dry run" mode.
>>
>> I believe it is not so hard to implement and I volunteer to work on
>> it if the community approves this feature request.
>>
>> It can be tackled in different ways. I have two Ideas for
>> implementation:
>> 1. Noop (No Op) executor engine
>> 2. On reads just infer schema and replace it with empty table with
>> same schema
>>
>> Thanks,
>> Ali
>>
>
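
For anyone wanting to approximate those two ideas with what already ships,
here is a rough sketch (the parquet path and the transform function are
placeholders; the built-in "noop" write format is available in Spark 3.x):

import org.apache.spark.sql.Row

// Idea 2: read only the schema, then substitute an empty DataFrame with the
// same schema, so running the pipeline validates column names and expressions
// without touching the data.
val schema = spark.read.parquet("/path/to/input").schema
val emptyInput = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

// Idea 1: a no-op sink; the plan is fully analyzed and executed, but nothing
// is written anywhere.
val result = transform(emptyInput)   // placeholder for your pipeline logic
result.write.format("noop").mode("overwrite").save()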


Current state of dataset api

2021-10-04 Thread Magnus Nilsson
Hi,

I tried using the (typed) Dataset API about three years ago. At the time
there were limitations with predicate pushdown, serialization overhead,
and maybe more things I've forgotten. Ultimately we chose the
DataFrame API as the sweet spot.

Does anyone know of a good overview of the current state of the
Dataset API, pros/cons as of Spark 3?

Is it fully usable, do you get the advantages of a strongly typed
dataframe? Any known limitations or drawbacks to take into account?

br,

Magnus
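
For anyone weighing the same choice, the practical difference is roughly the
one sketched below (a minimal illustration with an assumed Passenger case
class and file path; it shows the compile-time checking you gain, not the
serialization or pushdown behaviour being asked about):

import org.apache.spark.sql.functions.col
import spark.implicits._  // assumes a SparkSession named spark, as in spark-shell

case class Passenger(name: String, age: Int)

// Typed Dataset: field names and types are checked at compile time.
val ds = spark.read.parquet("/path/to/passengers").as[Passenger]
val adultsTyped = ds.filter(_.age >= 18)

// Untyped DataFrame: a misspelled or wrongly typed "age" only fails at
// analysis time, i.e. when the query runs.
val adultsUntyped = ds.toDF().filter(col("age") >= 18)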




[spark streaming] how to connect to rabbitmq with spark streaming.

2021-10-04 Thread Joris Billen
Hi,
I am looking for someone who has made a Spark Streaming job that connects to
RabbitMQ.
There is a lot of documentation on how to make a connection with the Java API
(like here: https://www.rabbitmq.com/api-guide.html#connecting), but I am
looking for a recent working example for Spark Streaming (which will save the
incoming data in a DStream).
Tried so far: this looks close, but throws errors:
https://github.com/Stratio/spark-rabbitmq/issues. Also tried with MQTT (we
get an error that the protocol is amqp:// and not tcp:// or ssl://; also
noticed that for MQTT the plugin needs to be enabled on the RabbitMQ server's
side, and since it is third-party we don't control this).


Thanks for any input!
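
Not a tested, production-ready answer, but one route is a custom Spark
Streaming receiver wrapping the plain RabbitMQ Java client. A rough sketch
(host, queue name, batch interval and the local master are placeholders;
acknowledgements, error handling and reconnection are omitted, and the
SAM-style callbacks assume Scala 2.12+):

import com.rabbitmq.client.{CancelCallback, ConnectionFactory, DeliverCallback}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

class RabbitMQReceiver(host: String, queue: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // onStart must not block, so consume on a background thread.
  def onStart(): Unit = new Thread("rabbitmq-receiver") {
    override def run(): Unit = consume()
  }.start()

  def onStop(): Unit = {}  // the consume loop below exits once isStopped()

  private def consume(): Unit = {
    val factory = new ConnectionFactory()
    factory.setHost(host)
    val connection = factory.newConnection()
    val channel = connection.createChannel()

    // Push each delivered message body into Spark as a String.
    val onDeliver: DeliverCallback =
      (_, delivery) => store(new String(delivery.getBody, "UTF-8"))
    val onCancel: CancelCallback = _ => ()
    channel.basicConsume(queue, true, onDeliver, onCancel)

    while (!isStopped()) Thread.sleep(100)
    channel.close()
    connection.close()
  }
}

// Usage: messages arrive as a DStream[String].
val conf = new SparkConf().setAppName("rabbitmq-demo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.receiverStream(new RabbitMQReceiver("localhost", "my-queue"))
lines.print()
ssc.start()
ssc.awaitTermination()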