Re: Spark joins using row id

2016-11-13 Thread Yan Facai
A pairRDD can use its (hash) partitioner information to optimize a join (matching keys are already co-located, so no shuffle is needed), while I am not sure whether a Dataset can do the same.
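
As a minimal sketch of what I mean, in Java and assuming an existing JavaSparkContext jsc:

import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

// Partition both sides with the same partitioner up front (this step
// shuffles once, but later joins on these RDDs will not).
JavaPairRDD<Long, String> left = jsc.parallelizePairs(Arrays.asList(
        new Tuple2<>(1L, "A"), new Tuple2<>(2L, "B"), new Tuple2<>(3L, "C")))
    .partitionBy(new HashPartitioner(8));
JavaPairRDD<Long, String> right = jsc.parallelizePairs(Arrays.asList(
        new Tuple2<>(1L, "X"), new Tuple2<>(2L, "Y")))
    .partitionBy(new HashPartitioner(8));

// Both sides report the same partitioner, so the left outer join is a
// narrow dependency: matching keys already live on the same partition
// and no shuffle is planned.
JavaPairRDD<Long, Tuple2<String, Optional<String>>> joined =
    left.leftOuterJoin(right);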

On Sat, Nov 12, 2016 at 7:11 PM, Rohit Verma wrote:

> For datasets structured as
>
> ds1
> rowN col1
> 1   A
> 2   B
> 3   C
> 4   C
> …
>
> and
>
> ds2
> rowN col2
> 1   X
> 2   Y
> 3   Z
> …
>
> I want to do a left join
>
> Dataset<Row> joined = ds1.join(ds2, ds1.col("rowN").equalTo(ds2.col("rowN")), "left_outer");
>
> I read somewhere, on Stack Overflow or on this mailing list, that if Spark is
> aware that the datasets are sorted, it will use some optimizations for joins.
> Is it possible to make this join more efficient/faster?
>
> Rohit


Re: Spark joins using row id

2016-11-12 Thread Rohit Verma
The result of explain() is as follows:

*BroadcastHashJoin [rowN#0], [rowN#39], LeftOuter, BuildRight
:- *Project [rowN#0, informer_code#22]
:  +- Window [rownumber() windowspecdefinition(informer_code#22 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rowN#0], [informer_code#22 ASC]
:     +- *Sort [informer_code#22 ASC], false, 0
:        +- Exchange SinglePartition
:           +- *HashAggregate(keys=[informer_code#22], functions=[])
:              +- Exchange hashpartitioning(informer_code#22, 200)
:                 +- *HashAggregate(keys=[informer_code#22], functions=[])
:                    +- *BatchedScan parquet [INFORMER_CODE#22] Format: ParquetFormat, InputPaths: hdfs://192.168.0.102:8020/user/rohit/data/5/78/ORCL.CRA.CUSTOMERS.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
   +- *Project [rowN#39, customer_type#64]
      +- Window [rownumber() windowspecdefinition(customer_type#64 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rowN#39], [customer_type#64 ASC]
         +- *Sort [customer_type#64 ASC], false, 0
            +- Exchange SinglePartition
               +- *HashAggregate(keys=[customer_type#64], functions=[])
                  +- Exchange hashpartitioning(customer_type#64, 200)
                     +- *HashAggregate(keys=[customer_type#64], functions=[])
                        +- *BatchedScan parquet [CUSTOMER_TYPE#64] Format: ParquetFormat, InputPaths: hdfs://192.168.0.102:8020/user/rohit/data/5/78/ORCL.CRA.CUSTOMERS.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct

I believe this isn’t the intended behavior.
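
For context, the rowN columns were generated roughly like this (a simplified sketch, not the exact code; assume the parquet source above is loaded as customers):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;

// The two HashAggregates in the plan come from distinct(); the
// Exchange SinglePartition comes from row_number() over a window with
// no partitionBy, which needs every row in one partition to assign a
// global row number.
WindowSpec byCode = Window.orderBy(col("informer_code"));
Dataset<Row> ds1 = customers.select("informer_code").distinct()
        .withColumn("rowN", row_number().over(byCode));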

Rohit
On Nov 12, 2016, at 6:15 PM, Stuart White <stuart.whi...@gmail.com> wrote:

The Spark Catalyst optimizer is responsible for determining what steps Spark
needs to execute to satisfy your query. Given what it knows about your
datasets, it attempts to choose the optimal set of steps. On any Dataset you
can call the .explain() method to print the steps Spark will execute to
satisfy your query.
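
For example, with your datasets (the exact plan will depend on your data sources, so treat this as the pattern rather than the output you'll see):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> joined =
    ds1.join(ds2, ds1.col("rowN").equalTo(ds2.col("rowN")), "left_outer");
joined.explain();      // prints the physical plan only
joined.explain(true);  // also prints the parsed, analyzed and optimized logical plans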

This site explains how all this works:

http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/
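
As a sketch of the kind of setup that post works toward (the table names are made up, and this assumes a SparkSession named spark with table support so saveAsTable works):

// Persist both sides bucketed and sorted by the join key; a later
// sort-merge join between the two tables can then avoid re-shuffling
// either side.
ds1.write().bucketBy(200, "rowN").sortBy("rowN").saveAsTable("ds1_by_rown");
ds2.write().bucketBy(200, "rowN").sortBy("rowN").saveAsTable("ds2_by_rown");

Dataset<Row> left = spark.table("ds1_by_rown");
Dataset<Row> right = spark.table("ds2_by_rown");
left.join(right, left.col("rowN").equalTo(right.col("rowN")), "left_outer")
    .explain();  // verify there is no Exchange above either scan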


On Sat, Nov 12, 2016 at 5:11 AM, Rohit Verma <rohit.ve...@rokittech.com> wrote:
For datasets structured as

ds1
rowN col1
1   A
2   B
3   C
4   C
…

and

ds2
rowN col2
1   X
2   Y
3   Z
…

I want to do a left join

Dataset<Row> joined = ds1.join(ds2, ds1.col("rowN").equalTo(ds2.col("rowN")), "left_outer");

I read somewhere, on Stack Overflow or on this mailing list, that if Spark is
aware that the datasets are sorted, it will use some optimizations for joins.
Is it possible to make this join more efficient/faster?

Rohit




Spark joins using row id

2016-11-12 Thread Rohit Verma
For datasets structured as 

ds1
rowN col1
1   A
2   B
3   C
4   C
…

and

ds2
rowN col2
1   X
2   Y
3   Z
…

I want to do a left join 

Dataset<Row> joined = ds1.join(ds2, ds1.col("rowN").equalTo(ds2.col("rowN")), "left_outer");

I read somewhere, on Stack Overflow or on this mailing list, that if Spark is
aware that the datasets are sorted, it will use some optimizations for joins.
Is it possible to make this join more efficient/faster?
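
For example, would explicitly laying both sides out by the join key help? Something like this is what I have in mind (just a sketch of the idea):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.apache.spark.sql.functions.col;

// Co-partition and sort both sides by rowN up front, hoping the
// planner notices the matching layout and picks a sort-merge join
// without an extra Exchange on either side.
Dataset<Row> left = ds1.repartition(col("rowN")).sortWithinPartitions("rowN");
Dataset<Row> right = ds2.repartition(col("rowN")).sortWithinPartitions("rowN");
Dataset<Row> joined =
    left.join(right, left.col("rowN").equalTo(right.col("rowN")), "left_outer");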

Rohit