Hi, I am including the query plans below. DataFrame:
== Physical Plan == SortBasedAggregate(key=[agreement#23], functions=[(MaxVectorAggFunction(values#3),mode=Final,isDistinct=false)], output=[agreement#23,maxvalues#27]) +- ConvertToSafe +- Sort [agreement#23 ASC], false, 0 +- TungstenExchange hashpartitioning(agreement#23,48), None +- ConvertToUnsafe +- SortBasedAggregate(key=[agreement#23], functions=[(MaxVectorAggFunction(values#3),mode=Partial,isDistinct=false)], output=[agreement#23,values#26]) +- ConvertToSafe +- Sort [agreement#23 ASC], false, 0 +- Project [agreement#23,values#3] +- BroadcastHashJoin [tradeId#0,tradeVersion#1], [tradeId#4,tradeVersion#5], BuildRight :- Scan ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00000-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00001-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00002-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00003-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet] PushedFilter: [] [tradeId#0,tradeVersion#1,values#3] +- TungstenAggregate(key=[tradeId#4,tradeVersion#5,agreement#23], functions=[], output=[tradeId#4,tradeVersion#5,agreement#23]) +- TungstenExchange hashpartitioning(tradeId#4,tradeVersion#5,agreement#23,48), None +- TungstenAggregate(key=[tradeId#4,tradeVersion#5,agreement#23], functions=[], output=[tradeId#4,tradeVersion#5,agreement#23]) +- Scan ParquetRelation[hdfs:// //1. 
MapGrouped == Physical Plan == !MapGroups <function2>, class[tradeId[0]: string, tradeVersion[0]: string, agreement[0]: string], class[_1[0]: struct<tradeId:string,tradeVersion:string,values:array<double>>, _2[0]: struct<tradeId:string,tradeVersion:string,agreement:string>], class[_1[0]: string, _2[0]: string, _3[0]: string, _4[0]: array<double>], [tradeId#79,tradeVersion#80,agreement#81], [_1#88,_2#89,_3#90,_4#91] +- ConvertToSafe +- Sort [tradeId#79 ASC,tradeVersion#80 ASC,agreement#81 ASC], false, 0 +- TungstenExchange hashpartitioning(tradeId#79,tradeVersion#80,agreement#81,48), None +- ConvertToUnsafe +- !AppendColumns <function1>, class[_1[0]: struct<tradeId:string,tradeVersion:string,values:array<double>>, _2[0]: struct<tradeId:string,tradeVersion:string,agreement:string>], class[tradeId[0]: string, tradeVersion[0]: string, agreement[0]: string], [tradeId#79,tradeVersion#80,agreement#81] +- Project [struct(tradeId#38,tradeVersion#39,values#40) AS _1#73,struct(tradeId#67,tradeVersion#68,agreement#69) AS _2#74] +- BroadcastHashJoin [tradeId#38,tradeVersion#39], [tradeId#67,tradeVersion#68], BuildRight :- ConvertToUnsafe : +- !MapPartitions <function1>, class[tradeId[0]: string, tradeVersion[0]: string, resultType[0]: int, values[0]: array<double>], class[tradeId[0]: string, tradeVersion[0]: string, values[0]: array<double>], [tradeId#38,tradeVersion#39,values#40] : +- ConvertToSafe : +- Scan 
ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00000-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00001-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00002-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00003-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet] PushedFilter: [] [tradeId#0,tradeVersion#1,resultType#2,values#3] +- TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69], functions=[], output=[tradeId#67,tradeVersion#68,agreement#69]) +- TungstenExchange hashpartitioning(tradeId#67,tradeVersion#68,agreement#69,48), None +- TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69], functions=[], output=[tradeId#67,tradeVersion#68,agreement#69]) +- !MapPartitions <function1>, class[tradeId[0]: string, tradeVersion[0]: string, tradeType[0]: string, notional[0]: decimal(38,18), currency[0]: string, asset[0]: string, trader[0]: string, productCode[0]: string, counterParty[0]: string, counterPartyAccronym[0]: string, tradeStatus[0]: string, portfolio[0]: string, internalPortfolio[0]: string, ptsBook[0]: string, validFrom[0]: string, validTill[0]: string, tradeDate[0]: string, maturity[0]: string, buySellIndicator[0]: string, agreement[0]: string], class[tradeId[0]: string, tradeVersion[0]: string, agreement[0]: string], [tradeId#67,tradeVersion#68,agreement#69] +- ConvertToSafe +- Scan ParquetRelation[hdfs://na... //2. 
Reduce == Physical Plan == !MapPartitions <function1>, class[_1[0]: struct<tradeId:string,tradeVersion:string,agreement:string>, _2[0]: struct<_1:struct<tradeId:string,tradeVersion:string,values:array<double>>,_2:struct<tradeId:string,tradeVersion:string,agreement:string>>], class[_1[0]: string, _2[0]: string, _3[0]: string, _4[0]: array<double>], [_1#110,_2#111,_3#112,_4#113] +- !MapGroups <function2>, class[tradeId[0]: string, tradeVersion[0]: string, agreement[0]: string], class[_1[0]: struct<tradeId:string,tradeVersion:string,values:array<double>>, _2[0]: struct<tradeId:string,tradeVersion:string,agreement:string>], class[_1[0]: struct<tradeId:string,tradeVersion:string,agreement:string>, _2[0]: struct<_1:struct<tradeId:string,tradeVersion:string,values:array<double>>,_2:struct<tradeId:string,tradeVersion:string,agreement:string>>], [tradeId#98,tradeVersion#99,agreement#100], [_1#103,_2#104] +- ConvertToSafe +- Sort [tradeId#98 ASC,tradeVersion#99 ASC,agreement#100 ASC], false, 0 +- TungstenExchange hashpartitioning(tradeId#98,tradeVersion#99,agreement#100,48), None +- ConvertToUnsafe +- !AppendColumns <function1>, class[_1[0]: struct<tradeId:string,tradeVersion:string,values:array<double>>, _2[0]: struct<tradeId:string,tradeVersion:string,agreement:string>], class[tradeId[0]: string, tradeVersion[0]: string, agreement[0]: string], [tradeId#98,tradeVersion#99,agreement#100] +- Project [struct(tradeId#38,tradeVersion#39,values#40) AS _1#73,struct(tradeId#67,tradeVersion#68,agreement#69) AS _2#74] +- BroadcastHashJoin [tradeId#38,tradeVersion#39], [tradeId#67,tradeVersion#68], BuildRight :- ConvertToUnsafe : +- !MapPartitions <function1>, class[tradeId[0]: string, tradeVersion[0]: string, resultType[0]: int, values[0]: array<double>], class[tradeId[0]: string, tradeVersion[0]: string, values[0]: array<double>], [tradeId#38,tradeVersion#39,values#40] : +- ConvertToSafe : +- Scan 
ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00000-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00001-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00002-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00003-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet] PushedFilter: [] [tradeId#0,tradeVersion#1,resultType#2,values#3] +- TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69], functions=[], output=[tradeId#67,tradeVersion#68,agreement#69]) +- TungstenExchange hashpartitioning(tradeId#67,tradeVersion#68,agreement#69,48), None +- TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69], functions=[], output=[tradeId#67,tradeVersion#68,agreement#69]) +- !MapPartitions <function1>, class[tradeId[0]: string, tradeVersion[0]: string, tradeType[0]: string, notional[0]: decimal(38,18), currency[0]: string, asset[0]: string, trader[0]: string, productCode[0]: string, counterParty[0]: string, counterPartyAccronym[0]: string, tradeStatus[0]: string, portfolio[0]: string, internalPortfolio[0]: string, ptsBook[0]: string, validFrom[0]: string, validTill[0]: string, tradeDate[0]: string, maturity[0]: string, buySellIndicator[0]: string, agreement[0]: string], class[tradeId[0]: string, tradeVersion[0]: string, agreement[0]: string], [tradeId#67,tradeVersion#68,agreement#69] +- ConvertToSafe +- Scan ParquetRelation[hdfs://n.... //3. 
Cogroup == Physical Plan == !CoGroup <function3>, class[_1[0]: string, _2[0]: string], class[tradeId[0]: string, tradeVersion[0]: string, values[0]: array<double>], class[tradeId[0]: string, tradeVersion[0]: string, agreement[0]: string], class[_1[0]: string, _2[0]: string, _3[0]: string, _4[0]: array<double>], [_1#133,_2#134,_3#135,_4#136], [_1#119,_2#120], [_1#125,_2#126] :- ConvertToSafe : +- Sort [_1#119 ASC,_2#120 ASC], false, 0 : +- TungstenExchange hashpartitioning(_1#119,_2#120,48), None : +- ConvertToUnsafe : +- !AppendColumns <function1>, class[tradeId[0]: string, tradeVersion[0]: string, values[0]: array<double>], class[_1[0]: string, _2[0]: string], [_1#119,_2#120] : +- ConvertToUnsafe : +- !MapPartitions <function1>, class[tradeId[0]: string, tradeVersion[0]: string, resultType[0]: int, values[0]: array<double>], class[tradeId[0]: string, tradeVersion[0]: string, values[0]: array<double>], [tradeId#38,tradeVersion#39,values#40] : +- ConvertToSafe : +- Scan ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00000-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00001-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00002-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00003-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet] PushedFilter: [] [tradeId#0,tradeVersion#1,resultType#2,values#3] +- ConvertToSafe +- Sort [_1#125 ASC,_2#126 ASC], false, 0 +- TungstenExchange hashpartitioning(_1#125,_2#126,48), None +- ConvertToUnsafe +- !AppendColumns <function1>, class[tradeId[0]: string, tradeVersion[0]: string, agreement[0]: string], class[_1[0]: string, _2[0]: string], 
[_1#125,_2#126] +- TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69], functions=[], output=[tradeId#67,tradeVersion#68,agreement#69]) +- TungstenExchange hashpartitioning(tradeId#67,tradeVersion#68,agreement#69,48), None +- TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69], functions=[], output=[tradeId#67,tradeVersion#68,agreement#69]) +- !MapPartitions <function1>, class[tradeId[0]: string, tradeVersion[0]: string, tradeType[0]: string, notional[0]: decimal(38,18), currency[0]: string, asset[0]: string, trader[0]: string, productCode[0]: string, counterParty[0]: string, counterPartyAccronym[0]: string, tradeStatus[0]: string, portfolio[0]: string, internalPortfolio[0]: string, ptsBook[0]: string, validFrom[0]: string, validTill[0]: string, tradeDate[0]: string, maturity[0]: string, buySellIndicator[0]: string, agreement[0]: string], class[tradeId[0]: string, tradeVersion[0]: string, agreement[0]: string], [tradeId#67,tradeVersion#68,agreement#69] +- ConvertToSafe +- Scan ParquetRelation[hdfs://na ...] PushedFilter: [] [tradeId#4,tradeVersion#5,tradeType#6,notional#7L,currency#8,asset#9,trader#10,productCode#11,counterParty#12,counterPartyAccronym#13,tradeStatus#14,portfolio#15,internalPortfolio#16,ptsBook#17,validFrom#18,validTill#19,tradeDate#20,maturity#21,buySellIndicator#22,agreement#23,date#24] --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org