[jira] [Updated] (SPARK-33383) Improve performance of Column.isin Expression

2020-11-08 Thread Jira


 [ https://issues.apache.org/jira/browse/SPARK-33383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Felix Wollschläger updated SPARK-33383:
---
Description: 
When I asked [a question on 
Stackoverflow|https://stackoverflow.com/questions/64683189/usage-of-broadcast-variables-when-using-only-spark-sql-api]
 and ran some local tests, I came across a performance bottleneck when 
using the _where_ condition _Column.isin_.


I have a set of allowed values (a whitelist) that easily fits in memory 
(about 10k values). I thought simply using the _Column.isin_ expression in 
the SQL API would be the way to go. I assumed it would be runtime-equivalent to
{code}
df.filter(row => allowedValues.contains(row.getInt(0)))
{code}
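
As a side note: _Column.isin_ takes varargs, so passing a Scala collection directly would wrap the whole collection as a single element. A minimal sketch of the two formulations actually being compared (assuming Spark 2.4+ for _isInCollection_):

{code:scala}
// SQL-API predicate; the collection must be expanded to varargs,
// or passed as a whole via isInCollection (Spark 2.4+):
df.where(col("value").isin(allowedValues.toSeq: _*))
df.where(col("value").isInCollection(allowedValues))

// Lambda filter with an O(1) Set lookup per row:
df.filter(row => allowedValues.contains(row.getInt(0)))
{code}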


However, when running a few tests locally, I realized that using _Column.isin_ 
is actually about 10 times slower than an _rdd.filter_ or a broadcast inner join.

Shouldn't {{df.where(col("colname").isin(allowedValues))}} perform 
(SQL-API overhead aside) as well as 
{{df.filter(row => allowedValues.contains(row.getInt(0)))}}?
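
A plausible explanation (an assumption on my side, not verified against the generated code): the _In_ expression evaluates its value list linearly for every row, while the lambda filter does a hash-set lookup. The optimizer is supposed to rewrite _In_ into the set-based _InSet_ once the list exceeds {{spark.sql.optimizer.inSetConversionThreshold}} (default 10), so with ~10k values the remaining gap may come from _InSet_'s per-row overhead. A toy illustration of the per-row cost difference:

{code:scala}
object CostSketch {
  def main(args: Array[String]): Unit = {
    val allowedList = (1 to 10000).toList
    val allowedSet = allowedList.toSet
    val x = 9999

    // What a naive In(list) predicate amounts to: O(n) scan per row.
    println(allowedList.exists(_ == x))

    // What the Set-based lambda filter does: O(1) hash lookup per row.
    println(allowedSet.contains(x))
  }
}
{code}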

I used the following dummy code for my local tests:

{code:scala}
package example

import org.apache.spark.sql.functions.{broadcast, col, count}
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.util.Random

object Test {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Name")
      .master("local[*]")
      .config("spark.driver.host", "localhost")
      .config("spark.ui.enabled", "false")
      .getOrCreate()

    import spark.implicits._

    // 10,000,000 rows, as the name implies (the literal was truncated in the archived copy).
    val _10Million = 10000000
    val random = new Random(1048394789305L)

    val values = Seq.fill(_10Million)(random.nextInt())
    val df = values.toDF("value")
    // ~10k whitelist values, per the description above (size was truncated in the archived copy).
    val allowedValues = getRandomElements(values, random, 10000)

    println("Starting ...")
    runWithInCollection(spark, df, allowedValues)
    println(" In Collection")
    runWithBroadcastDF(spark, df, allowedValues)
    println(" Broadcast DF")
    runWithBroadcastVariable(spark, df, allowedValues)
    println(" Broadcast Variable")
  }

  // Draws `size` random elements from `seq`; duplicates collapse in the Set.
  def getRandomElements[A](seq: Seq[A], random: Random, size: Int): Set[A] = {
    val builder = Set.newBuilder[A]

    for (i <- 0 until size) {
      builder += getRandomElement(seq, random)
    }

    builder.result()
  }

  def getRandomElement[A](seq: Seq[A], random: Random): A = {
    seq(random.nextInt(seq.length))
  }

  // I expected this one to be almost equivalent to the one with a
  // broadcast variable, but it's actually about 10 times slower.
  def runWithInCollection(spark: SparkSession, df: DataFrame, allowedValues: Set[Int]): Unit = {
    spark.time {
      df.where(col("value").isInCollection(allowedValues)).runTestAggregation()
    }
  }

  // A bit slower than the one with a broadcast variable.
  def runWithBroadcastDF(spark: SparkSession, df: DataFrame, allowedValues: Set[Int]): Unit = {
    import spark.implicits._

    val allowedValuesDF = allowedValues.toSeq.toDF("allowedValue")

    spark.time {
      df.join(broadcast(allowedValuesDF), col("value") === col("allowedValue")).runTestAggregation()
    }
  }

  // This is actually the fastest one.
  def runWithBroadcastVariable(spark: SparkSession, df: DataFrame, allowedValues: Set[Int]): Unit = {
    val allowedValuesBroadcast = spark.sparkContext.broadcast(allowedValues)

    spark.time {
      df.filter(row => allowedValuesBroadcast.value.contains(row.getInt(0))).runTestAggregation()
    }
  }

  implicit class TestRunner(val df: DataFrame) {

    def runTestAggregation(): Unit = {
      df.agg(count("value")).show()
    }
  }
}
{code}
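
To check which predicate actually ends up in the plan, the physical plan can be inspected; the Filter node should show either an {{IN}} list or an {{INSET}} (hash-set) predicate. A quick check (sketch):

{code:scala}
// Look for "value#... IN (...)" vs "value#... INSET ..." in the Filter node.
df.where(col("value").isInCollection(allowedValues)).explain(true)
{code}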
