Java Spark to Python Spark integration

2018-10-30 Thread Manohar Rao
I would like to know if it's possible to invoke Python (PySpark) code from Java.

I have a Java-based framework where a SparkSession is created and some
DataFrames are passed as arguments to an API.

Transformation.java

import java.util.Set;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public interface Transformation {
    Dataset<Row> transform(Set<Dataset<Row>> inputDatasets, SparkSession spark);
}

A user of this framework can then implement a Transformation, and the
framework can then use this custom transformation along with the rest of the
standard transformations. This then integrates into a larger data pipeline.
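
For illustration, a user implementation might look like this (the class name
and column name are hypothetical):

UppercaseTransformation.java

import java.util.Set;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.upper;

public class UppercaseTransformation implements Transformation {
    @Override
    public Dataset<Row> transform(Set<Dataset<Row>> inputDatasets, SparkSession spark) {
        // Take the single input dataset and upper-case one of its string columns.
        Dataset<Row> input = inputDatasets.iterator().next();
        return input.withColumn("customer_name", upper(col("customer_name")));
    }
}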

Question:

Some users would like to write their business logic in Python (PySpark).

Is it possible to pass this Java Dataset (or RDD) via the framework to Python
code, and then retrieve the resulting Python RDD/Dataset back as output to the
Java framework?
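
For concreteness, the only approach I can think of so far is an indirect,
file-based handoff, where the Python step runs as its own spark-submit job.
A rough sketch (the exchange paths and the script name are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PythonHandoff {
    public static Dataset<Row> runPythonTransform(SparkSession spark, Dataset<Row> input)
            throws Exception {
        // Placeholder exchange locations and script name.
        String inPath = "/tmp/handoff/input";
        String outPath = "/tmp/handoff/output";
        input.write().mode("overwrite").parquet(inPath);

        // Run the user's PySpark script as a separate Spark application
        // (assumes spark-submit is on the PATH).
        Process p = new ProcessBuilder("spark-submit", "pyspark_transform.py",
                inPath, outPath).inheritIO().start();
        if (p.waitFor() != 0) {
            throw new RuntimeException("Python transformation failed");
        }
        return spark.read().parquet(outPath);
    }
}

This round-trips through storage and loses the shared SparkSession, though, so
I am hoping there is a more direct way.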

Any references to code snippets around this, or to a more direct approach, would be helpful.

Thanks

Manohar


Pivot Column ordering in Spark

2018-09-26 Thread Manohar Rao
I am doing a pivot transformation on an input dataset.

Following input schema:
===
 |-- c_salutation: string (nullable = true)
 |-- c_preferred_cust_flag: string (nullable = true)
 |-- integer_type_col: integer (nullable = false)
 |-- long_type_col: long (nullable = false)
 |-- string_type_col: string (nullable = true)
 |-- decimal_type_col: decimal(38,0) (nullable = true)

My pivot column is c_preferred_cust_flag, the pivot values are "Y", "N", "R",
and the group-by column is c_salutation.

I am using the API pivot(String pivotColumn, java.util.List<Object> values)
on RelationalGroupedDataset.



My aggregation functions after this pivot are
===
count(`string_type_col`) ,sum(`string_type_col`) ,sum(`integer_type_col`)
,avg(`integer_type_col`)
,sum(`long_type_col`) ,avg(`long_type_col`) ,avg(`decimal_type_col`)

===
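
Putting it together, the call looks roughly like this (input is the dataset
with the schema above; the functions are static imports from
org.apache.spark.sql.functions):

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;

Dataset<Row> pivoted = input
    .groupBy("c_salutation")
    .pivot("c_preferred_cust_flag", Arrays.<Object>asList("Y", "N", "R"))
    .agg(count(col("string_type_col")),
         sum(col("string_type_col")),
         sum(col("integer_type_col")),
         avg(col("integer_type_col")),
         sum(col("long_type_col")),
         avg(col("long_type_col")),
         avg(col("decimal_type_col")));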
My output dataset schema after the groupBy().pivot().agg() is:

 |-- c_salutation: string (nullable = true)
 |-- Y_count(`string_type_col`): long (nullable = true)
 |-- Y_sum(CAST(`string_type_col` AS DOUBLE)): double (nullable = true)
 |-- Y_sum(CAST(`integer_type_col` AS BIGINT)): long (nullable = true)
 |-- Y_avg(CAST(`integer_type_col` AS BIGINT)): double (nullable = true)
 |-- Y_sum(`long_type_col`): long (nullable = true)
 |-- Y_avg(`long_type_col`): double (nullable = true)
 |-- Y_avg(`decimal_type_col`): decimal(38,4) (nullable = true)
 |-- N_count(`string_type_col`): long (nullable = true)
 |-- N_sum(CAST(`string_type_col` AS DOUBLE)): double (nullable = true)
 |-- N_sum(CAST(`integer_type_col` AS BIGINT)): long (nullable = true)
 |-- N_avg(CAST(`integer_type_col` AS BIGINT)): double (nullable = true)
 |-- N_sum(`long_type_col`): long (nullable = true)
 |-- N_avg(`long_type_col`): double (nullable = true)
 |-- N_avg(`decimal_type_col`): decimal(38,4) (nullable = true)
 |-- R_count(`string_type_col`): long (nullable = true)
 |-- R_sum(CAST(`string_type_col` AS DOUBLE)): double (nullable = true)
 |-- R_sum(CAST(`integer_type_col` AS BIGINT)): long (nullable = true)
 |-- R_avg(CAST(`integer_type_col` AS BIGINT)): double (nullable = true)
 |-- R_sum(`long_type_col`): long (nullable = true)
 |-- R_avg(`long_type_col`): double (nullable = true)
 |-- R_avg(`decimal_type_col`): decimal(38,4) (nullable = true)

==
My requirement is:
==
To rename the system-generated column names, such as
Y_count(`string_type_col`), N_avg(`decimal_type_col`), etc., to user-defined
names based on a mapping. I need to be able to do this programmatically given
a mapping of the form:
(pivot value + aggregation function) --> (required column name)
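
For example, applying such a mapping might look like this (the map entries are
illustrative, copied from the schema above, and I am assuming that
withColumnRenamed matches the generated names, backticks included, literally):

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Map<String, String> renames = new HashMap<>();
renames.put("Y_count(`string_type_col`)", "y_string_count");  // illustrative targets
renames.put("N_avg(`decimal_type_col`)", "n_decimal_avg");

Dataset<Row> renamed = pivoted;
for (Map.Entry<String, String> e : renames.entrySet()) {
    renamed = renamed.withColumnRenamed(e.getKey(), e.getValue());
}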

===
My question is:
===
Can I rely on the order of the output columns generated?
The order appears to conform to this pattern:

PivotValue1-aggregationfunction1
PivotValue1-aggregationfunction2
...
PivotValue1-aggregationfunctionN

PivotValue2-aggregationfunction1
PivotValue2-aggregationfunction2
...

Is this order standard across Spark 2.x versions?
Is it subject to change, and therefore not reliable from a user's point of view?

If not reliable, is there another way by which I can
logically/programmatically identify that a column such as
R_sum(CAST(`integer_type_col` AS BIGINT)) corresponds to the input pivot
value "R" and the aggregation function sum(`integer_type_col`)?
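
One workaround I can think of is to alias each aggregation expression up
front, on the assumption that the alias propagates into the pivoted column
names; a sketch:

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;

Dataset<Row> aliased = input
    .groupBy("c_salutation")
    .pivot("c_preferred_cust_flag", Arrays.<Object>asList("Y", "N", "R"))
    .agg(sum(col("integer_type_col")).alias("int_sum"),
         avg(col("decimal_type_col")).alias("dec_avg"));
// If the alias carries through, the columns come out as Y_int_sum, Y_dec_avg,
// N_int_sum, ... so each name identifies its pivot value and aggregation
// without depending on column order.

But I would still like to know whether either the ordering or this naming
behaviour is something I can rely on.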


Thanks

Manohar