Hi, I have run some performance tests on Spark 1.6.0 under YARN, repeating the execution with different numbers of executors and different memory settings (the cluster has 6 large nodes).
I found that Dataset joinWith and cogroup are 3 to 5 times slower than a broadcast join on DataFrames. How can I make them at least comparably fast?

Examples of my code:

DataFrame:

import org.apache.spark.sql.functions.broadcast

// 500 million rows
val r = results.select("tradeId", "tradeVersion", "values").as("r")
// 100 thousand rows
val t = trades.select("tradeId", "tradeVersion", "agreement").distinct.as("t")

val j = r.join(broadcast(t),
  r("tradeId") === t("tradeId") && r("tradeVersion") === t("tradeVersion"))
val s = j.select(r("tradeId"), t("tradeVersion"), t("agreement"), r("values"))
val g = s.groupBy(t("agreement"))

val maxvec = new MaxVectorAggFunction
val agg = g.agg(maxvec(r("values")).as("maxvalues"))
agg.write.parquet("hdfs:.../tmp/somelocation")

Dataset:

import sqlContext.implicits._

case class ResultsA(tradeId: String, tradeVersion: String, resultType: Int,
  values: Array[Double])

case class TradesA(tradeId: String, tradeVersion: String, tradeType: String,
  notional: BigDecimal, currency: String, asset: String, trader: String,
  productCode: String, counterParty: String, counterPartyAccronym: String,
  tradeStatus: String, portfolio: String, internalPortfolio: String,
  ptsBook: String, validFrom: String, validTill: String, tradeDate: String,
  maturity: String, buySellIndicator: String, agreement: String)

case class ResultSmallA(tradeId: String, tradeVersion: String, values: Array[Double])

case class ResultAgreementA(tradeId: String, tradeVersion: String,
  agreement: String, values: Array[Double])

case class TradeSmallA(tradeId: String, tradeVersion: String, agreement: String)

lazy val dsresults = results.as[ResultsA]
  .map(r => ResultSmallA(r.tradeId, r.tradeVersion, r.values)).as("r")
lazy val dstrades = trades.as[TradesA]
  .map(t => TradeSmallA(t.tradeId, t.tradeVersion, t.agreement)).distinct.as("t")

lazy val j = dsresults.joinWith(dstrades,
  $"r.tradeId" === $"t.tradeId" && $"r.tradeVersion" === $"t.tradeVersion",
  "inner")

I tried three variants of the typed aggregation (each variant stands alone):

// 1. mapGroups
val group = j.groupBy { case (_, t) => t }
val reduced = group.mapGroups { case (t, iter) =>
  (t.tradeId, t.tradeVersion, t.agreement,
    iter.map { case (r, _) => r.values }.reduce { (l, r) =>
      val min = new MinVectorAggFunction()
      min.mergeArrays(l, r)
    })
}

// 2. reduce
val group2 = j.groupBy(_._2)
val reduced2 = group2.reduce { (i1, i2) =>
  val r1 = i1._1
  val r2 = i2._1
  import r1._
  val min = new MinVectorAggFunction()
  (ResultSmallA(tradeId, tradeVersion, min.mergeArrays(values, r2.values)), i1._2)
}
val reduced = reduced2.map { case (t, (r, _)) =>
  (r.tradeId, r.tradeVersion, t.agreement, r.values)
}

// 3. cogroup
val cogrouped1 = dsresults.groupBy(r => (r.tradeId, r.tradeVersion))
  .cogroup(dstrades.groupBy(t => (t.tradeId, t.tradeVersion))) {
    case (key, data1, data2) =>
      if (data2.isEmpty || data1.isEmpty) Iterator()
      else {
        val t = data2.next()
        val min = new MinVectorAggFunction()
        Iterator((t.tradeId, t.tradeVersion, t.agreement,
          data1.map(_.values).reduce(min.mergeArrays)))
      }
  }

// MinVectorAggFunction just merges two arrays of Double.
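My first suspicion is that the typed join does not get the broadcast strategy at all, so I plan to compare the physical plans of the two pipelines (a quick diagnostic, not part of the code above):

// DataFrame pipeline: I expect to see BroadcastHashJoin here.
agg.explain()
// Dataset pipeline: if this shows SortMergeJoin plus extra
// serialization/deserialization of the case classes, that would
// explain most of the gap.
j.toDF().explain()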
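One workaround I am considering is running the join itself untyped, so the broadcast hint applies, and only switching back to the typed API afterwards. This is an untested sketch (it assumes sqlContext is the usual SQLContext, and that in 1.6 the broadcast hint is only honoured on DataFrames):

import org.apache.spark.sql.functions.broadcast
import sqlContext.implicits._

// Join untyped so Catalyst can pick BroadcastHashJoin, then map the
// result back onto a case class for the typed aggregation.
val joined = dsresults.toDF().as("r")
  .join(broadcast(dstrades.toDF().as("t")),
    $"r.tradeId" === $"t.tradeId" && $"r.tradeVersion" === $"t.tradeVersion")
  .select($"r.tradeId", $"r.tradeVersion", $"t.agreement", $"r.values")
  .as[ResultAgreementA]

This should avoid shuffling the 500-million-row side for the join, although the groupBy for the aggregation would still shuffle.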
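Independently of the join, I wonder whether part of the slowdown is that mapGroups deserializes every row into case-class objects and cannot do partial (map-side) aggregation. A typed Aggregator (org.apache.spark.sql.expressions.Aggregator, experimental in 1.6) might avoid that. Another untested sketch, assuming mergeArrays is associative, an empty array works as its identity, and sqlContext.implicits._ is in scope to supply the Array[Double] encoder for toColumn:

import org.apache.spark.sql.expressions.Aggregator

// Merges the 'values' arrays of all results that share a trade key.
object MinValues
    extends Aggregator[(ResultSmallA, TradeSmallA), Array[Double], Array[Double]] {
  def zero: Array[Double] = Array.empty[Double]
  def reduce(b: Array[Double], a: (ResultSmallA, TradeSmallA)): Array[Double] =
    if (b.isEmpty) a._1.values
    else new MinVectorAggFunction().mergeArrays(b, a._1.values)
  def merge(b1: Array[Double], b2: Array[Double]): Array[Double] =
    if (b1.isEmpty) b2
    else if (b2.isEmpty) b1
    else new MinVectorAggFunction().mergeArrays(b1, b2)
  def finish(r: Array[Double]): Array[Double] = r
}

// Replaces the mapGroups / reduce variants above.
val aggregated = j.groupBy(_._2).agg(MinValues.toColumn)
// aggregated: Dataset[(TradeSmallA, Array[Double])]

Has anyone measured whether an Aggregator closes the gap to the DataFrame version, or is the broadcast join the only realistic option here?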