Github user viirya commented on the issue: https://github.com/apache/spark/pull/21498

Benchmarking on a Spark cluster with 5 nodes on EC2 as well.

```scala
def benchmark(func: () => Unit): Unit = {
  val t0 = System.nanoTime()
  func()
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) + "ns")
}

val N = 10000
val data = (0 until N).map { i =>
  (i, i % 2, i % 3, Array.fill(10)(i), Array.fill(10)(i.toString),
   Array.fill(10)(i.toDouble), (i, i.toString, i.toDouble))
}

val df1 = sc.parallelize(data).toDF("key", "t1", "t2", "t3", "t4", "t5", "t6").repartition($"key")
val df2 = sc.parallelize(data).toDF("key", "t1", "t2", "t3", "t4", "t5", "t6").repartition($"key")

// Disable auto broadcast joins and enable the config under test.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.unionInSamePartition", "true")

val df3 = df1.union(df2).groupBy("key").agg(count("*"))
val df4 = df1.union(df2)
val df5 = df3.sample(0.8).filter($"key" > 1000000).sample(0.4)
val df6 = df4.sample(0.8).filter($"key" > 1000000).sample(0.4)

benchmark(() => df3.collect)
benchmark(() => df4.collect)
benchmark(() => df5.collect)
benchmark(() => df6.collect)
```

Before:
```scala
scala> benchmark(() => df3.collect)
Elapsed time: 663668585ns

scala> benchmark(() => df4.collect)
Elapsed time: 547487953ns

scala> benchmark(() => df5.collect)
Elapsed time: 712634187ns

scala> benchmark(() => df6.collect)
Elapsed time: 491917400ns
```

After:
```scala
scala> benchmark(() => df3.collect)
Elapsed time: 516797788ns

scala> benchmark(() => df4.collect)
Elapsed time: 557499803ns

scala> benchmark(() => df5.collect)
Elapsed time: 611327782ns

scala> benchmark(() => df6.collect)
Elapsed time: 495387557ns
```
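To make the comparison easier to read, here is a minimal sketch (not part of the PR) that simply restates the nanosecond timings reported above as milliseconds and computes the relative change for each DataFrame; the map keys reuse the `df3`–`df6` names from the benchmark.

```scala
// Timings copied verbatim from the before/after runs above (nanoseconds).
val beforeNs = Map("df3" -> 663668585L, "df4" -> 547487953L, "df5" -> 712634187L, "df6" -> 491917400L)
val afterNs  = Map("df3" -> 516797788L, "df4" -> 557499803L, "df5" -> 611327782L, "df6" -> 495387557L)

beforeNs.keys.toSeq.sorted.foreach { k =>
  // Positive percentage = faster after the change, negative = slower.
  val changePct = 100.0 * (beforeNs(k) - afterNs(k)) / beforeNs(k)
  println(f"$k: ${beforeNs(k) / 1e6}%.1f ms -> ${afterNs(k) / 1e6}%.1f ms ($changePct%+.1f%%)")
}
```

By this reading, `df3` and `df5` (the plans that can benefit from the new behavior) improve by roughly 22% and 14%, while `df4` and `df6` stay within about 2% of the baseline.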