The focus of this release was to get the API out there, and there are a lot of
low-hanging performance optimizations.  That said, there is likely always
going to be some cost of materializing objects.
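
For example, every typed operation has to decode each row into a JVM object
before your function can run on it, whereas DataFrame expressions can operate
on the internal binary row format directly.  A rough sketch, using the classes
from your mail below:

    // Typed path: each row is deserialized into a ResultsA, the lambda runs,
    // and the result is re-encoded as a ResultSmallA; that round trip is the
    // object-materialization cost.
    val dsresults = results.as[ResultsA]
      .map(r => ResultSmallA(r.tradeId, r.tradeVersion, r.values))

    // Untyped path: the projection is a Catalyst expression, so there is no
    // per-row object allocation.
    val r = results.select("tradeId", "tradeVersion", "values")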

Another note: any time you're comparing performance, it's useful to include the
output of explain so we can sanity-check the chosen query plan.
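
For example, something like the following on the DataFrame side (and the
equivalent explain call on the Dataset side) shows whether the broadcast is
actually being applied.  This is a rough sketch that reuses the agg DataFrame
from the first snippet below:

    // Prints the logical and physical plans; with the broadcast hint the
    // physical plan should contain a BroadcastHashJoin rather than a
    // SortMergeJoin.
    agg.explain(true)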

On Wed, Jan 13, 2016 at 6:39 AM, Arkadiusz Bicz <arkadiusz.b...@gmail.com>
wrote:

> Hi,
>
> I have done some performance tests by repeating the execution with
> different numbers of executors and different memory settings on a
> YARN-clustered Spark (version 1.6.0) (the cluster contains 6 large nodes).
>
> I found Dataset joinWith or cogroup to be 3 to 5 times slower than a
> broadcast join on DataFrames. How can I make it at least similarly fast?
>
> Examples of my code :
>
> DataFrame:
> // 500 million rows
> val r = results.select("tradeId", "tradeVersion", "values").as("r")
> // 100 thousand rows
> val t = trades.select("tradeId", "tradeVersion", "agreement")
>   .distinct.as("t")
>
> val j = r.join(broadcast(t),
>   r("tradeId") === t("tradeId") && r("tradeVersion") === t("tradeVersion"))
> val s = j.select(r("tradeId"), t("tradeVersion"), t("agreement"), r("values"))
> val g = s.groupBy(t("agreement"))
>
> val maxvec = new MaxVectorAggFunction
> val agg = g.agg(maxvec(r("values")).as("maxvalues"))
> agg.write.parquet("hdfs:.../tmp/somelocation")
>
> Dataset:
>
> case class ResultsA(tradeId: String, tradeVersion: String,
>   resultType: Int, values: Array[Double])
>
> case class TradesA(tradeId: String, tradeVersion: String, tradeType: String,
>   notional: BigDecimal, currency: String, asset: String, trader: String,
>   productCode: String, counterParty: String, counterPartyAccronym: String,
>   tradeStatus: String, portfolio: String, internalPortfolio: String,
>   ptsBook: String, validFrom: String, validTill: String, tradeDate: String,
>   maturity: String, buySellIndicator: String, agreement: String)
>
> case class ResultSmallA(tradeId: String, tradeVersion: String,
>   values: Array[Double])
> case class ResultAgreementA(tradeId: String, tradeVersion: String,
>   agreement: String, values: Array[Double])
> case class TradeSmallA(tradeId: String, tradeVersion: String,
>   agreement: String)
>
> lazy val dsresults = results.as[ResultsA]
>   .map(r => ResultSmallA(r.tradeId, r.tradeVersion, r.values)).as("r")
> lazy val dstrades = trades.as[TradesA]
>   .map(t => TradeSmallA(t.tradeId, t.tradeVersion, t.agreement))
>   .distinct.as("t")
> lazy val j = dsresults.joinWith(dstrades,
>   $"r.tradeId" === $"t.tradeId" && $"r.tradeVersion" === $"t.tradeVersion",
>   "inner")
>
> //1. MapGrouped
>
> val group = j.groupBy { v => v match {
>     case (r: ResultSmallA, t: TradeSmallA) => t
>   }
> }
>
> val reduced = group.mapGroups { case (t, iter) =>
>   (t.tradeId, t.tradeVersion, t.agreement,
>     iter.map { case (r, _) => r.values }.reduce { (l, r) =>
>       val min = new MinVectorAggFunction()
>       min.mergeArrays(l, r)
>     })
> }
>
> //2. Reduce
>
> val group2 = j.groupBy(_._2)
>
> val reduced2 = group2.reduce((i1, i2) => {
>   val r1 = i1._1
>   val r2 = i2._1
>   import r1._
>   val min = new MinVectorAggFunction();
>   (ResultSmallA(tradeId, tradeVersion,
>     min.mergeArrays(values, r2.values)), i1._2)
> })
>
> val reduced = reduced2.map { case (t, (r, _)) =>
>   (r.tradeId, r.tradeVersion, t.agreement, r.values)
> }
>
>
> //3. Cogroup
>
> val cogrouped1 = dsresults.groupBy(r => (r.tradeId, r.tradeVersion))
>   .cogroup(dstrades.groupBy(t => (t.tradeId, t.tradeVersion))) {
>     case (key, data1, data2) =>
>       if (data2.isEmpty || data1.isEmpty) Iterator()
>       else {
>         val t = data2.next()
>         val min = new MinVectorAggFunction()
>         Iterator((t.tradeId, t.tradeVersion, t.agreement,
>           data1.map(_.values).reduce(min.mergeArrays)))
>       }
>   }
>
> // MinVectorAggFunction just merges two arrays of Double
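> // (roughly something like the sketch below; element-wise min is assumed,
> // and the exact merge logic doesn't matter for the timings)
> class MinVectorAggFunction {
>   def mergeArrays(l: Array[Double], r: Array[Double]): Array[Double] =
>     l.zip(r).map { case (a, b) => math.min(a, b) }
> }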
>
