Re: Proposed additional function to create fold_column for better integration of Spark data frames with H2O

2022-01-11 Thread Chester Gan
Another possible workaround, when creating an ML pipeline with PySpark and
Python's H2O API, would be to first convert the PySpark dataframes to H2O
dataframes and then do the following (see the sketch after the list):


   1. Create a new dataframe (call this group_df) from the feature dataframe
   using drop_duplicates, with the columns argument set to the list of columns
   that form the group:
   https://docs.h2o.ai/h2o/latest-stable/h2o-py/docs/frame.html#h2o.H2OFrame.drop_duplicates
   2. Create a new, single-column dataframe (call this kfold_df) from group_df
   using kfold_column:
   https://docs.h2o.ai/h2o/latest-stable/h2o-py/docs/frame.html#h2o.H2OFrame.kfold_column
   3. Column-bind `kfold_df` back onto `group_df` with cbind:
   https://docs.h2o.ai/h2o/latest-stable/h2o-py/docs/frame.html#h2o.H2OFrame.cbind
   4. Merge group_df back onto the feature dataframe on the common columns
   (the group columns specified in step 1) to create the feature dataset with
   the fold_column:
   https://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-munging/merging-data.html
   Observations with the same values in the group columns will have the same
   fold index in the fold_column.
   5. Configure the parameters of the H2O AutoML job, as per
   https://docs.h2o.ai/h2o/latest-stable/h2o-docs/automl.html, ensuring that
   the fold_column argument is set to the name of the column created by
   kfold_column
   (https://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/algo-params/fold_column.html),
   that class imbalance is rectified, and so on.
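
A rough sketch of those five steps in the H2O Python API (untested; it assumes
the PySpark dataframe has already been converted to an H2OFrame called
features_hf, e.g. via Sparkling Water's H2OContext, and the group, fold and
target column names below are hypothetical):

```
from h2o.automl import H2OAutoML

group_cols = ["user_account_id"]   # hypothetical columns that define a group
nfolds = 5

# 1. One row per group
group_df = features_hf.drop_duplicates(columns=group_cols, keep="first")[group_cols]

# 2. Single-column frame of fold indices, one per group
kfold_df = group_df.kfold_column(n_folds=nfolds, seed=42)
kfold_df.set_names(["fold"])

# 3. Attach the fold index to the group keys
group_df = group_df.cbind(kfold_df)

# 4. Merge back onto the feature frame; merge() joins on the common (group)
#    columns, so every row of a group gets the same fold index
features_hf = features_hf.merge(group_df)

# 5. Train AutoML with the custom fold column (balance_classes for imbalance)
aml = H2OAutoML(max_models=20, seed=42, balance_classes=True)
aml.train(y="target", training_frame=features_hf, fold_column="fold")
```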

Any suggestions? Are there already similar wrapper functions in PySpark or
H2O that assign fold indices based on groups of observations, rather than on
individual observations?
Should I propose this on the d...@spark.apache.org mailing list?

On Fri, Jan 7, 2022 at 1:04 PM Chester Gan  wrote:

> Idea: PySpark function to create fold indices (numbers from 0, ..., N-1,
> where N := number of folds needed for k-fold CV during auto ML training) on
> train & test datasets
>
> ```
> # train & test are PySpark dataframes of the train & test datasets respectively
> import pyspark.sql.functions as F
> from functools import reduce  # For Python 3.x
> from pyspark.sql import DataFrame
>
> def unionAll(*dfs):
>     return reduce(DataFrame.unionAll, dfs)
>
> def create_fold_column(input_df,
>                        list_of_fold_group_columns=list_of_fold_group_columns,
>                        fold_column=fold_column, nfolds=nfolds):
>
>     if fold_column:
>         # split into folds... we get a list of Spark dataframes of group ids
>         fold_group_ids_list_of_dataframes = (
>             input_df.drop_duplicates(subset=list_of_fold_group_columns)
>             [list_of_fold_group_columns]
>             .randomSplit(nfolds * [1 / nfolds], seed=42)
>         )
>
>         for index in range(0, len(fold_group_ids_list_of_dataframes)):
>             fold_group_ids_list_of_dataframes[index] = (
>                 fold_group_ids_list_of_dataframes[index]
>                 .withColumn(fold_column, F.lit(index))
>             )
>
>         fold_groups_ids_dataframes_union = unionAll(*fold_group_ids_list_of_dataframes)
>         input_df = input_df.join(fold_groups_ids_dataframes_union,
>                                  on=list_of_fold_group_columns)
>
>     return input_df
>
> train = train.transform(create_fold_column)
>
> # Dummy fold_column with a single number, nfolds (typically 5), to prevent
> # H2O from erroring out
> if fold_column:
>     test = test.withColumn(fold_column, F.lit(nfolds))
> ```
>
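
A simpler alternative sketch (not part of the proposal above, and not an exact
equal-size split): derive the fold index directly from a hash of the group
columns, so that all rows of a group land in the same fold without the
randomSplit/union round trip. The group columns and fold settings below are
hypothetical:

```
import pyspark.sql.functions as F

nfolds = 5
fold_column = "fold"
list_of_fold_group_columns = ["user_account_id"]  # hypothetical group key

# Hash the group key and take it modulo nfolds; every row of a group gets the
# same fold index. Folds are only approximately equal in size.
fold_expr = F.abs(F.hash(*[F.col(c) for c in list_of_fold_group_columns])) % nfolds
train = train.withColumn(fold_column, fold_expr.cast("int"))
test = test.withColumn(fold_column, F.lit(nfolds))  # dummy fold, as above
```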


Re: Proposed additional function to create fold_column for better integration of Spark data frames with H2O

2022-01-11 Thread Chester Gan
Has anyone else used PySpark dataframes in conjunction with H2O for ML
pipelining, and had to use custom folds to keep rows/observations of the
same group (e.g. user account, vehicle, city or whatever) in the same
validation fold, so as to prevent data leakage during cross-validation?



Re: pyspark loop optimization

2022-01-11 Thread Gourav Sengupta
Hi,

I am not sure what you are trying to achieve here; are cume_dist and
percent_rank not different?

If I am able to follow your question correctly, you are looking to filter
out NULLs before applying the function to the dataframe, and I think it
will be fine if you just create another dataframe first with the non-null
values and then apply the function to that dataframe.

It would be of much help if you could explain what you are trying to achieve
here. Applying loops to a dataframe, as you have done, is surely not
recommended at all; please see the explain plan of the dataframe in each
iteration to understand the effect of your loops on the plan, as that should
give some details.
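
That said, if the goal is just to avoid reassigning the dataframe inside a
Python loop, one rough alternative (a sketch, untested) is to build all of the
window expressions up front and apply them in a single select. The rescaling
below assumes Spark's default NULLS FIRST ordering for ascending sorts, so
NULL rows can be kept out of the denominator without a filter-and-union per
column; only two of your column pairs are shown:

```
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.functions import col

column_pairs = [
    ("event_duration", "percentile_rank_evt_duration"),
    ("alarm_cnt", "percentile_rank_alarm_cnt"),
    # ... remaining pairs from the original loop
]

part_cols = ["p_customer_name", "p_site_name", "year_month"]
part_win = Window.partitionBy(*part_cols)

def pct_col(column_name, new_col):
    order_win = Window.partitionBy(*part_cols).orderBy(col(column_name))
    n_total = F.count(F.lit(1)).over(part_win)
    n_null = F.count(F.when(col(column_name).isNull(), 1)).over(part_win)
    # cume_dist counts the NULL rows (they sort first), so remove them from
    # both the numerator and the denominator
    le_incl_nulls = F.cume_dist().over(order_win) * n_total
    pct = (le_incl_nulls - n_null) / (n_total - n_null)
    return (
        F.when(col(column_name).isNull(), F.lit(None))
        .otherwise(F.round(pct * 100).cast("integer"))
        .alias(new_col)
    )

df = df.select("*", *[pct_col(c, n) for c, n in column_pairs])
```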


Regards,
Gourav Sengupta

On Mon, Jan 10, 2022 at 10:49 PM Ramesh Natarajan 
wrote:

> I want to compute cume_dist on a bunch of columns in a spark dataframe,
> but want to remove NULL values before doing so.
>
> I have this loop in pyspark. While this works, I see the driver runs at
> 100% while the executors are idle for the most part. I am reading that
> running a loop is an anti-pattern and should be avoided. Any pointers on
> how to optimize this section of pyspark code?
>
> I am running this on  the AWS Glue 3.0 environment.
>
> for column_name, new_col in [
>     ("event_duration", "percentile_rank_evt_duration"),
>     ("event_duration_pred", "percentile_pred_evt_duration"),
>     ("alarm_cnt", "percentile_rank_alarm_cnt"),
>     ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
>     ("event_duration_adj", "percentile_rank_evt_duration_adj"),
>     ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
>     ("encounter_time", "percentile_rank_encounter_time"),
>     ("encounter_time_pred", "percentile_pred_encounter_time"),
>     ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
>     ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
> ]:
>     win = (
>         Window().partitionBy(["p_customer_name", "p_site_name", "year_month"])
>                 .orderBy(col(column_name))
>     )
>     df1 = df.filter(F.col(column_name).isNull())
>     df2 = df.filter(F.col(column_name).isNotNull()).withColumn(
>         new_col, F.round(F.cume_dist().over(win) * lit(100)).cast("integer")
>     )
>     df = df2.unionByName(df1, allowMissingColumns=True)
>
> For some reason this code seems to work faster, but it doesn't remove
> NULLs prior to computing the cume_dist. Not sure if this is also a proper
> way to do this :(
>
> for column_name, new_col in [
>     ("event_duration", "percentile_rank_evt_duration"),
>     ("event_duration_pred", "percentile_pred_evt_duration"),
>     ("alarm_cnt", "percentile_rank_alarm_cnt"),
>     ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
>     ("event_duration_adj", "percentile_rank_evt_duration_adj"),
>     ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
>     ("encounter_time", "percentile_rank_encounter_time"),
>     ("encounter_time_pred", "percentile_pred_encounter_time"),
>     ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
>     ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
> ]:
>     win = (
>         Window().partitionBy(["p_customer_name", "p_site_name", "year_month"])
>                 .orderBy(col(column_name))
>     )
>     df = df.withColumn(
>         new_col,
>         F.when(F.col(column_name).isNull(), F.lit(None)).otherwise(
>             F.round(F.percent_rank().over(win) * lit(100)).cast("integer")
>         ),
>     )
>
> Appreciate if anyone has any pointers on how to go about this..
>
> thanks
> Ramesh
>


Re: How to add a row number column with out reordering my data frame

2022-01-11 Thread Andrew Davidson
Thanks!

I will take a look

Andy


[Spark ML Pipeline]: Error Loading Pipeline Model with Custom Transformer

2022-01-11 Thread Alana Young
I am experimenting with creating and persisting ML pipelines using custom 
transformers (I am using Spark 3.1.2). I was able to create a transformer class 
(for testing purposes, I modeled the code off the SQLTransformer class) and 
save the pipeline model. When I attempt to load the saved pipeline model, I am 
running into the following error: 

java.lang.NullPointerException
  at java.base/java.lang.reflect.Method.invoke(Method.java:559)
  at org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstanceReader(ReadWrite.scala:631)
  at org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$load$4(Pipeline.scala:276)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$load$3(Pipeline.scala:274)
  at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
  at scala.util.Try$.apply(Try.scala:213)
  at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
  at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:268)
  at org.apache.spark.ml.PipelineModel$PipelineModelReader.$anonfun$load$7(Pipeline.scala:356)
  at org.apache.spark.ml.MLEvents.withLoadInstanceEvent(events.scala:160)
  at org.apache.spark.ml.MLEvents.withLoadInstanceEvent$(events.scala:155)
  at org.apache.spark.ml.util.Instrumentation.withLoadInstanceEvent(Instrumentation.scala:42)
  at org.apache.spark.ml.PipelineModel$PipelineModelReader.$anonfun$load$6(Pipeline.scala:355)
  at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
  at scala.util.Try$.apply(Try.scala:213)
  at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
  at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:355)
  at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:349)
  ... 38 elided


Here is a gist containing the relevant code. Any feedback and advice would be
appreciated. Thank you.

Re: How to add a row number column with out reordering my data frame

2022-01-11 Thread Gourav Sengupta
Hi,
I do not think we need to do any of that. Please try repartitionByRange;
Spark 3 has adaptive query execution with configurations to handle skew as
well.
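
For example (a rough sketch; the partition count and column name are made up):

```
# Adaptive query execution and skew-join handling (Spark 3.x)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Range-partition by the join key instead of pulling everything into one partition
df = df.repartitionByRange(200, "Name")
```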

Regards,
Gourav


Re: How to add a row number column with out reordering my data frame

2022-01-11 Thread Andrew Davidson
Hi Gourav

When I join I get OOM. To address this, my thought was to split my tables into
small batches of rows, join each batch together, and then use union. My
assumption is that the union is a narrow transform and as such requires fewer
resources. Let's say I have 5 data frames I want to join together and each has
300 rows.

I want to create 15 data frames.

Set1 = {11, 12, 13, 14, 15}

Set2 = {21, 22, 23, 24, 25}

Set3 = {31, 32, 33, 34, 35}

Then I joined the batches:
S1joinDF = 11.join(12).join(13).join(14).join(15)

S2joinDF = 21.join(22).join(23).join(24).join(25)

S3joinDF = 31.join(32).join(33).join(34).join(35)

resultDF = S1joinDF.union( S2joinDF ) .union( S3joinDF )

The way I originally wrote my code is as follows. Based on my unit test, it
turns out I need to call orderBy on every iteration of the loop. I tried
sorting outside of the while loop; that did not resolve the problem. Given the
size of my dataframes, that is going to crush performance. My unit test works;
I never ran it on my real data set.

# Create a copy of the original dataframe
copyDF = df.orderBy("Name")
# copyDF.show()

i = 0
while i < numberOfSplits:
    self.logger.warn("i:{}".format(i))
    # Get the top `numRows` number of rows
    # note take() is an action
    # limit() is a transformation
    topDF = copyDF.limit( numRows )

    # Truncate the `copy_df` to remove
    # the contents fetched for `temp_df`
    # original quant.sf files are sorted by name however
    # we must use order by, else the row names between
    # GTEx samples will not be the same
    # we can not simply sort or orderBy once. we have to
    # do this on every iteration
    copyDF = copyDF.subtract(topDF).orderBy( "Name" )

    retList[i] = topDF

    # Increment the split number
    i += 1

if remainingRows > 0 :
    self.logger.info("AEDWIP writing last i:{} len(retList):{}".format(i, len(retList)))
    retList[i] = copyDF
    #copyDF.show()
    #retList[i].show()


Okay, so that's the background. Rather than use orderBy, I thought that if I
could add a row number I could easily split up my data frames. My code would
look a lot like what I would write in pandas or R:

while i < numBatches:
    start = i * numRows
    end = start + numRows
    print("\ni:{} start:{} end:{}".format(i, start, end))
    df = trainDF.iloc[ start:end ]

There does not seem to be an easy way to do this.
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html
The generated ID is guaranteed to be monotonically increasing and unique, but 
not consecutive.
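
One possible workaround (a rough sketch, not tested at scale; it assumes an
active `spark` session): zip the underlying RDD with an index. zipWithIndex()
numbers the rows in their current order and the ids are consecutive, so no
window or global sort is needed, and the frame can then be sliced much like
iloc:

```
from pyspark.sql.types import LongType, StructField, StructType

def with_row_id(df, spark, id_col="row_id"):
    # zipWithIndex() assigns consecutive 0-based ids in the frame's current order
    schema = StructType(df.schema.fields + [StructField(id_col, LongType(), False)])
    indexed = df.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1],))
    return spark.createDataFrame(indexed, schema)

df_id = with_row_id(trainDF, spark)
batch_i = df_id.filter(
    (df_id.row_id >= i * numRows) & (df_id.row_id < (i + 1) * numRows)
)
```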


Comments and suggestions appreciated

Andy


From: Gourav Sengupta 
Date: Monday, January 10, 2022 at 11:03 AM
To: Andrew Davidson 
Cc: "user @spark" 
Subject: Re: How to add a row number column with out reordering my data frame

Hi,

I am a bit confused here; it is not entirely clear to me why you are creating
the row numbers, and how creating the row numbers helps you with the joins.

Can you please explain with some sample data?


Regards,
Gourav

On Fri, Jan 7, 2022 at 1:14 AM Andrew Davidson  
wrote:
Hi

I am trying to work through an OOM error. I have 10411 files. I want to select
a single column from each file and then join them into a single table.

The files have a unique row id. However, it is a very long string. The data
file with just the name and the column of interest is about 470 M. The column
of interest alone is 21 M; it is a column of over 5 million real numbers.

So I thought I would save a lot of memory if I can join over row numbers.

# create dummy variable to orderBy https://www.py4u.net/discuss/1840945
w = Window().orderBy(lit('A'))
sampleDF = sampleDF.select( ["NumReads"] )\
    .withColumnRenamed( "NumReads", sampleName )\
    .withColumn( "tid", row_number().over(w) )


This code seems pretty complicated to someone coming from pandas and R
dataframes. My unit test works; however, it generates the following warning.



WARN WindowExec: No Partition Defined for Window operation! Moving all data to 
a single partition, this can cause serious performance degradation.


Is there a better way to create a row number without reordering my data? The
order is important.

Kind regards

Andy


Re: Difference in behavior for Spark 3.0 vs Spark 3.1 "create database "

2022-01-11 Thread Wenchen Fan
Hopefully, this StackOverflow answer can solve your problem:
https://stackoverflow.com/questions/47523037/how-do-i-configure-pyspark-to-write-to-hdfs-by-default

Spark doesn't control the behavior of qualifying paths. It's decided by
certain configs and/or config files.
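
For example, pinning the location (or the warehouse dir) to an explicit scheme
avoids relying on the default filesystem resolution; a sketch with hypothetical
paths:

```
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
    .enableHiveSupport()
    .getOrCreate()
)

spark.sql("CREATE DATABASE test LOCATION 'hdfs:///user/hive'")
```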

On Tue, Jan 11, 2022 at 3:03 AM Pablo Langa Blanco  wrote:

> Hi Pralabh,
>
> If it helps, it is probably related to this change
> https://github.com/apache/spark/pull/28527
>
> Regards
>
> On Mon, Jan 10, 2022 at 10:42 AM Pralabh Kumar 
> wrote:
>
>> Hi Spark Team
>>
>> When creating a database via Spark 3.0 on Hive
>>
>> 1) spark.sql("create database test location '/user/hive'"). It creates
>> the database location on HDFS, as expected.
>>
>> 2) When running the same command on 3.1, the database is created on the
>> local file system by default. I have to prefix the location with the hdfs
>> scheme to create the db on HDFS.
>>
>> Why is there a difference in the behavior? Can you please point me to the
>> JIRA which caused this change.
>>
>> Note : spark.sql.warehouse.dir and hive.metastore.warehouse.dir both are
>> having default values(not explicitly set)
>>
>> Regards
>> Pralabh Kumar
>>
>