Hi,

Including the query plans:
DataFrame:

== Physical Plan ==
SortBasedAggregate(key=[agreement#23],
functions=[(MaxVectorAggFunction(values#3),mode=Final,isDistinct=false)],
output=[agreement#23,maxvalues#27])
+- ConvertToSafe
   +- Sort [agreement#23 ASC], false, 0
      +- TungstenExchange hashpartitioning(agreement#23,48), None
         +- ConvertToUnsafe
            +- SortBasedAggregate(key=[agreement#23],
functions=[(MaxVectorAggFunction(values#3),mode=Partial,isDistinct=false)],
output=[agreement#23,values#26])
               +- ConvertToSafe
                  +- Sort [agreement#23 ASC], false, 0
                     +- Project [agreement#23,values#3]
                        +- BroadcastHashJoin
[tradeId#0,tradeVersion#1], [tradeId#4,tradeVersion#5], BuildRight
                           :- Scan
ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00000-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00001-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00002-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00003-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet]
PushedFilter: [] [tradeId#0,tradeVersion#1,values#3]
                           +-
TungstenAggregate(key=[tradeId#4,tradeVersion#5,agreement#23],
functions=[], output=[tradeId#4,tradeVersion#5,agreement#23])
                              +- TungstenExchange
hashpartitioning(tradeId#4,tradeVersion#5,agreement#23,48), None
                                 +-
TungstenAggregate(key=[tradeId#4,tradeVersion#5,agreement#23],
functions=[], output=[tradeId#4,tradeVersion#5,agreement#23])
                                    +- Scan ParquetRelation[hdfs://



//1. MapGroups
== Physical Plan ==
!MapGroups <function2>, class[tradeId[0]: string, tradeVersion[0]:
string, agreement[0]: string], class[_1[0]:
struct<tradeId:string,tradeVersion:string,values:array<double>>,
_2[0]: struct<tradeId:string,tradeVersion:string,agreement:string>],
class[_1[0]: string, _2[0]: string, _3[0]: string, _4[0]:
array<double>], [tradeId#79,tradeVersion#80,agreement#81],
[_1#88,_2#89,_3#90,_4#91]
+- ConvertToSafe
   +- Sort [tradeId#79 ASC,tradeVersion#80 ASC,agreement#81 ASC], false, 0
      +- TungstenExchange
hashpartitioning(tradeId#79,tradeVersion#80,agreement#81,48), None
         +- ConvertToUnsafe
            +- !AppendColumns <function1>, class[_1[0]:
struct<tradeId:string,tradeVersion:string,values:array<double>>,
_2[0]: struct<tradeId:string,tradeVersion:string,agreement:string>],
class[tradeId[0]: string, tradeVersion[0]: string, agreement[0]:
string], [tradeId#79,tradeVersion#80,agreement#81]
               +- Project
[struct(tradeId#38,tradeVersion#39,values#40) AS
_1#73,struct(tradeId#67,tradeVersion#68,agreement#69) AS _2#74]
                  +- BroadcastHashJoin [tradeId#38,tradeVersion#39],
[tradeId#67,tradeVersion#68], BuildRight
                     :- ConvertToUnsafe
                     :  +- !MapPartitions <function1>,
class[tradeId[0]: string, tradeVersion[0]: string, resultType[0]: int,
values[0]: array<double>], class[tradeId[0]: string, tradeVersion[0]:
string, values[0]: array<double>],
[tradeId#38,tradeVersion#39,values#40]
                     :     +- ConvertToSafe
                     :        +- Scan
ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00000-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00001-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00002-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00003-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet]
PushedFilter: [] [tradeId#0,tradeVersion#1,resultType#2,values#3]
                     +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
                        +- TungstenExchange
hashpartitioning(tradeId#67,tradeVersion#68,agreement#69,48), None
                           +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
                              +- !MapPartitions <function1>,
class[tradeId[0]: string, tradeVersion[0]: string, tradeType[0]:
string, notional[0]: decimal(38,18), currency[0]: string, asset[0]:
string, trader[0]: string, productCode[0]: string, counterParty[0]:
string, counterPartyAccronym[0]: string, tradeStatus[0]: string,
portfolio[0]: string, internalPortfolio[0]: string, ptsBook[0]:
string, validFrom[0]: string, validTill[0]: string, tradeDate[0]:
string, maturity[0]: string, buySellIndicator[0]: string,
agreement[0]: string], class[tradeId[0]: string, tradeVersion[0]:
string, agreement[0]: string],
[tradeId#67,tradeVersion#68,agreement#69]
                                 +- ConvertToSafe
                                    +- Scan ParquetRelation[hdfs://na...


//2. Reduce

== Physical Plan ==
!MapPartitions <function1>, class[_1[0]:
struct<tradeId:string,tradeVersion:string,agreement:string>, _2[0]:
struct<_1:struct<tradeId:string,tradeVersion:string,values:array<double>>,_2:struct<tradeId:string,tradeVersion:string,agreement:string>>],
class[_1[0]: string, _2[0]: string, _3[0]: string, _4[0]:
array<double>], [_1#110,_2#111,_3#112,_4#113]
+- !MapGroups <function2>, class[tradeId[0]: string, tradeVersion[0]:
string, agreement[0]: string], class[_1[0]:
struct<tradeId:string,tradeVersion:string,values:array<double>>,
_2[0]: struct<tradeId:string,tradeVersion:string,agreement:string>],
class[_1[0]: struct<tradeId:string,tradeVersion:string,agreement:string>,
_2[0]: 
struct<_1:struct<tradeId:string,tradeVersion:string,values:array<double>>,_2:struct<tradeId:string,tradeVersion:string,agreement:string>>],
[tradeId#98,tradeVersion#99,agreement#100], [_1#103,_2#104]
   +- ConvertToSafe
      +- Sort [tradeId#98 ASC,tradeVersion#99 ASC,agreement#100 ASC], false, 0
         +- TungstenExchange
hashpartitioning(tradeId#98,tradeVersion#99,agreement#100,48), None
            +- ConvertToUnsafe
               +- !AppendColumns <function1>, class[_1[0]:
struct<tradeId:string,tradeVersion:string,values:array<double>>,
_2[0]: struct<tradeId:string,tradeVersion:string,agreement:string>],
class[tradeId[0]: string, tradeVersion[0]: string, agreement[0]:
string], [tradeId#98,tradeVersion#99,agreement#100]
                  +- Project
[struct(tradeId#38,tradeVersion#39,values#40) AS
_1#73,struct(tradeId#67,tradeVersion#68,agreement#69) AS _2#74]
                     +- BroadcastHashJoin
[tradeId#38,tradeVersion#39], [tradeId#67,tradeVersion#68], BuildRight
                        :- ConvertToUnsafe
                        :  +- !MapPartitions <function1>,
class[tradeId[0]: string, tradeVersion[0]: string, resultType[0]: int,
values[0]: array<double>], class[tradeId[0]: string, tradeVersion[0]:
string, values[0]: array<double>],
[tradeId#38,tradeVersion#39,values#40]
                        :     +- ConvertToSafe
                        :        +- Scan
ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00000-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00001-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00002-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00003-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet]
PushedFilter: [] [tradeId#0,tradeVersion#1,resultType#2,values#3]
                        +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
                           +- TungstenExchange
hashpartitioning(tradeId#67,tradeVersion#68,agreement#69,48), None
                              +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
                                 +- !MapPartitions <function1>,
class[tradeId[0]: string, tradeVersion[0]: string, tradeType[0]:
string, notional[0]: decimal(38,18), currency[0]: string, asset[0]:
string, trader[0]: string, productCode[0]: string, counterParty[0]:
string, counterPartyAccronym[0]: string, tradeStatus[0]: string,
portfolio[0]: string, internalPortfolio[0]: string, ptsBook[0]:
string, validFrom[0]: string, validTill[0]: string, tradeDate[0]:
string, maturity[0]: string, buySellIndicator[0]: string,
agreement[0]: string], class[tradeId[0]: string, tradeVersion[0]:
string, agreement[0]: string],
[tradeId#67,tradeVersion#68,agreement#69]
                                    +- ConvertToSafe
                                       +- Scan ParquetRelation[hdfs://n....


//3. Cogroup

== Physical Plan ==
!CoGroup <function3>, class[_1[0]: string, _2[0]: string],
class[tradeId[0]: string, tradeVersion[0]: string, values[0]:
array<double>], class[tradeId[0]: string, tradeVersion[0]: string,
agreement[0]: string], class[_1[0]: string, _2[0]: string, _3[0]:
string, _4[0]: array<double>], [_1#133,_2#134,_3#135,_4#136],
[_1#119,_2#120], [_1#125,_2#126]
:- ConvertToSafe
:  +- Sort [_1#119 ASC,_2#120 ASC], false, 0
:     +- TungstenExchange hashpartitioning(_1#119,_2#120,48), None
:        +- ConvertToUnsafe
:           +- !AppendColumns <function1>, class[tradeId[0]: string,
tradeVersion[0]: string, values[0]: array<double>], class[_1[0]:
string, _2[0]: string], [_1#119,_2#120]
:              +- ConvertToUnsafe
:                 +- !MapPartitions <function1>, class[tradeId[0]:
string, tradeVersion[0]: string, resultType[0]: int, values[0]:
array<double>], class[tradeId[0]: string, tradeVersion[0]: string,
values[0]: array<double>], [tradeId#38,tradeVersion#39,values#40]
:                    +- ConvertToSafe
:                       +- Scan
ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00000-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00001-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00002-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00003-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet]
PushedFilter: [] [tradeId#0,tradeVersion#1,resultType#2,values#3]
+- ConvertToSafe
   +- Sort [_1#125 ASC,_2#126 ASC], false, 0
      +- TungstenExchange hashpartitioning(_1#125,_2#126,48), None
         +- ConvertToUnsafe
            +- !AppendColumns <function1>, class[tradeId[0]: string,
tradeVersion[0]: string, agreement[0]: string], class[_1[0]: string,
_2[0]: string], [_1#125,_2#126]
               +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
                  +- TungstenExchange
hashpartitioning(tradeId#67,tradeVersion#68,agreement#69,48), None
                     +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
                        +- !MapPartitions <function1>,
class[tradeId[0]: string, tradeVersion[0]: string, tradeType[0]:
string, notional[0]: decimal(38,18), currency[0]: string, asset[0]:
string, trader[0]: string, productCode[0]: string, counterParty[0]:
string, counterPartyAccronym[0]: string, tradeStatus[0]: string,
portfolio[0]: string, internalPortfolio[0]: string, ptsBook[0]:
string, validFrom[0]: string, validTill[0]: string, tradeDate[0]:
string, maturity[0]: string, buySellIndicator[0]: string,
agreement[0]: string], class[tradeId[0]: string, tradeVersion[0]:
string, agreement[0]: string],
[tradeId#67,tradeVersion#68,agreement#69]
                           +- ConvertToSafe
                              +- Scan ParquetRelation[hdfs://na ...]
PushedFilter: []
[tradeId#4,tradeVersion#5,tradeType#6,notional#7L,currency#8,asset#9,trader#10,productCode#11,counterParty#12,counterPartyAccronym#13,tradeStatus#14,portfolio#15,internalPortfolio#16,ptsBook#17,validFrom#18,validTill#19,tradeDate#20,maturity#21,buySellIndicator#22,agreement#23,date#24]

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to