Hi,

I have done some performance tests by repeating execution with
different number of  executors and memory for YARN  clustered Spark
(version 1.6.0)  ( cluster contains 6 large size nodes)

I found Dataset joinWith or cogroup from 3 to 5 times slower then
broadcast join in DataFrame, how to make it at least similar fast ?

Examples of my code :

DataFrame :
// 500 milion rows
val r = results.select("tradeId", "tradeVersion", "values").as("r")
// 100 thousand rows
val t = trades.select("tradeId", "tradeVersion", "agreement").distinct.as("t")

val j = r.join(broadcast(t), r("tradeId") === t("tradeId") &&
r("tradeVersion") === t("tradeVersion"))
val s = j.select(r("tradeId"), t("tradeVersion"), t("agreement"), r("values"))
val g = s.groupBy(t("agreement"))

val maxvec = new MaxVectorAggFunction
val agg = g.agg(maxvec(r("values")).as("maxvalues"))
agg.write.parquet("hdfs:.../tmp/somelocation")

DataSet

case class ResultsA(tradeId: String, tradeVersion: String, resultType:
Int, values: Array[Double])

case class TradesA(tradeId: String, tradeVersion: String, tradeType:
String, notional: BigDecimal, currency: String,
                      asset: String, trader: String, productCode:
String, counterParty: String, counterPartyAccronym: String,
                      tradeStatus: String, portfolio: String,
internalPortfolio: String, ptsBook: String, validFrom: String,
                      validTill: String, tradeDate: String, maturity:
String, buySellIndicator: String, agreement: String)

case class ResultSmallA(tradeId: String, tradeVersion: String, values:
Array[Double])
case class ResultAgreementA(tradeId: String, tradeVersion: String,
agreement: String, values: Array[Double])
case class TradeSmallA(tradeId: String, tradeVersion: String, agreement: String)

lazy val dsresults = results.as[ResultsA].map(r =>
ResultSmallA(r.tradeId, r.tradeVersion, r.values)).as("r")
lazy val dstrades = trades.as[TradesA].map(t => TradeSmallA(t.tradeId,
t.tradeVersion, t.agreement)).distinct.as("t")
lazy val j = dsresults.joinWith(dstrades, $"r.tradeId" ===
$"t.tradeId" && $"r.tradeVersion" === $"t.tradeVersion", "inner")

//1. MapGrouped

val group = j.groupBy { v => v match {
    case (r: ResultSmallA, t: TradeSmallA) => t
  }
}

val reduced = group.mapGroups { case (t, iter) => (t.tradeId,
t.tradeVersion, t.agreement,
  iter.map { case (r, t) => r.values }.reduce((l, r) => {
    val min = new MinVectorAggFunction(); min.mergeArrays(l, r)
  }))
}

//2. Reduce

val group2 = j.groupBy(_._2)

val reduced2 = group2.reduce((i1, i2) => {
  val r1 = i1._1
  val r2 = i2._1
  import r1._
  val min = new MinVectorAggFunction();
  (ResultSmallA(tradeId, tradeVersion, min.mergeArrays(values,
r2.values)), i1._2)
})

val reduced = reduced2.map { case (t, (r, _)) => (r.tradeId,
r.tradeVersion, t.agreement, r.values) }


//3. Cogroup

val cogrouped1 = dsresults.groupBy(r => (r.tradeId,
r.tradeVersion)).cogroup(dstrades.groupBy(t => (t.tradeId,
t.tradeVersion))) {
  case (key, data1, data2) =>
    if (data2.isEmpty || data1.isEmpty) Iterator()
    else {
      val t = data2.next()
      val min = new MinVectorAggFunction()
      Iterator((t.tradeId, t.tradeVersion, t.agreement,
data1.map(_.values).reduce(min.mergeArrays)))
    }
}

// MinVectorAggFunction just merge two array of Double

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to