[jira] [Updated] (SPARK-33383) Improve performance of Column.isin Expression

2020-11-08 Thread Jira


Felix Wollschläger updated SPARK-33383:
After asking [a question on 
 and running some local tests, I came across a performance bottleneck when 
using _Column.isin_ as a _where_ condition.

I have a set of allowed values (a "whitelist") that fits comfortably in 
memory (about 10k values). I thought that simply using the _Column.isin_ 
expression in the SQL API would be the way to go. I assumed it would be 
runtime-equivalent to
df.filter(row => allowedValues.contains(row.getInt(0)))

However, when running a few tests locally, I realized that using _Column.isin_ 
is actually about 10 times slower than an _rdd.filter_ or a broadcast inner join.

Shouldn't {code}df.where(col("colname").isin(allowedValues)){code} perform 
(SQL-API overhead aside) as well as {code}df.filter(row => 
allowedValues.contains(row.getInt(0))){code}?
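
The question above rests on the formulations being semantically identical, so that only their cost can differ. A minimal sketch on plain Scala collections (no Spark involved; `values`, `allowedValues`, and all function names are hypothetical stand-ins) illustrates that equivalence. The linear-scan variant is only one conceivable way an IN list could be evaluated; it is an assumption for illustration, not a statement about how Spark implements `isin`:

```scala
object MembershipSketch {
  // Hypothetical stand-ins for the DataFrame column and the whitelist.
  val values: Seq[Int] = Seq(5, 3, 8, 3, 13, 21, 8)
  val allowedValues: Set[Int] = Set(3, 8, 34)

  // Analogue of df.filter(row => allowedValues.contains(row.getInt(0))):
  // one hash lookup per row.
  def viaSetLookup(data: Seq[Int], allowed: Set[Int]): Seq[Int] =
    data.filter(allowed.contains)

  // Assumed model of an IN list checked value by value:
  // up to allowed.size comparisons per row.
  def viaLinearScan(data: Seq[Int], allowed: Seq[Int]): Seq[Int] =
    data.filter(v => allowed.exists(_ == v))

  // Analogue of an inner join against the (distinct) allowed values.
  def viaInnerJoin(data: Seq[Int], allowed: Set[Int]): Seq[Int] =
    for {
      v <- data
      a <- allowed.toSeq
      if v == a
    } yield v

  def main(args: Array[String]): Unit = {
    println(viaSetLookup(values, allowedValues))
    println(viaLinearScan(values, allowedValues.toSeq))
    println(viaInnerJoin(values, allowedValues))
  }
}
```

All three functions return the same rows in the same order; they differ only in how much work is done per row. One caveat on the Spark side: `Column.isin` is declared with a varargs parameter (`Any*`), so a collection is normally passed as `isin(allowedValues.toSeq: _*)` or through `Column.isInCollection`, available since Spark 2.4.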

I used the following dummy code for my local tests:

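package example

import org.apache.spark.sql.functions.{broadcast, col, count}
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.util.Random

object Test {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .config("spark.driver.host", "localhost")
      .config("spark.ui.enabled", "false")
      // The builder chain is cut off in the archived text; a local master and
      // getOrCreate() are assumed here so that the session can start.
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // The archived text reads 1000 here; the name implies 10,000,000.
    val _10Million = 10000000
    val random = new Random(1048394789305L)

    val values = Seq.fill(_10Million)(random.nextInt())
    val df = values.toDF("value")
    // The archived text reads 1 here; the description mentions about 10k values.
    val allowedValues = getRandomElements(values, random, 10000)

    println("Starting ...")
    runWithInCollection(spark, df, allowedValues)
    println(" In Collection")
    runWithBroadcastDF(spark, df, allowedValues)
    println(" Broadcast DF")
    runWithBroadcastVariable(spark, df, allowedValues)
    println(" Broadcast Variable")
  }

  // Draws `size` random elements from `seq`; duplicates collapse in the Set.
  def getRandomElements[A](seq: Seq[A], random: Random, size: Int): Set[A] = {
    val builder = Set.newBuilder[A]

    for (i <- 0 until size) {
      builder += getRandomElement(seq, random)
    }

    builder.result()
  }

  def getRandomElement[A](seq: Seq[A], random: Random): A = {
    // Body missing in the archived text; a uniform random pick is assumed.
    seq(random.nextInt(seq.length))
  }

  // I expected this one to be almost equivalent to the one with a
  // broadcast variable, but it's actually about 10 times slower
  def runWithInCollection(spark: SparkSession, df: DataFrame, allowedValues: Set[Int]): Unit = {
    spark.time {
      // Body missing in the archived text; isInCollection is assumed from the method name.
      df.where(col("value").isInCollection(allowedValues)).runTestAggregation()
    }
  }

  // A bit slower than the one with a broadcast variable
  def runWithBroadcastDF(spark: SparkSession, df: DataFrame, allowedValues: Set[Int]): Unit = {
    import spark.implicits._

    val allowedValuesDF = allowedValues.toSeq.toDF("allowedValue")

    spark.time {
      // The right-hand side of the join condition is cut off in the archived text;
      // the "allowedValue" column defined above is assumed.
      df.join(broadcast(allowedValuesDF), col("value") === col("allowedValue")).runTestAggregation()
    }
  }

  // This is actually the fastest one
  def runWithBroadcastVariable(spark: SparkSession, df: DataFrame, allowedValues: Set[Int]): Unit = {
    val allowedValuesBroadcast = spark.sparkContext.broadcast(allowedValues)

    spark.time {
      // The predicate is cut off in the archived text; a lookup in the broadcast set is assumed.
      df.filter(row => allowedValuesBroadcast.value.contains(row.getInt(0))).runTestAggregation()
    }
  }

  implicit class TestRunner(val df: DataFrame) {

    def runTestAggregation(): Unit = {
      // Body missing in the archived text; a simple count (matching the imported
      // `count` function) is assumed as the action that forces evaluation.
      df.agg(count("value")).show()
    }
  }
}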
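
Each variant in the test code is timed once with `spark.time`, and the `isin` variant runs first, so a single measurement may also include one-off warm-up costs. That is an assumption about the setup, not something measured in this report. A minimal, Spark-free sketch of a helper that discards warm-up runs and reports a median could look like this (`TimingSketch`, `timeNanos`, and `medianNanos` are hypothetical names):

```scala
object TimingSketch {
  // Runs `block` once and returns the elapsed wall-clock time in nanoseconds.
  def timeNanos[A](block: => A): Long = {
    val start = System.nanoTime()
    block
    System.nanoTime() - start
  }

  // Runs `block` `warmup` times untimed, then `runs` times timed,
  // and returns the median of the timed runs in nanoseconds.
  def medianNanos[A](warmup: Int, runs: Int)(block: => A): Long = {
    require(runs > 0, "runs must be positive")
    (0 until warmup).foreach(_ => block)
    val sorted = Seq.fill(runs)(timeNanos(block)).sorted
    sorted(runs / 2)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in workload: a set-membership count on plain collections.
    val allowed = (0 until 10000).toSet
    val data = Vector.tabulate(1000000)(i => i % 50000)
    val nanos = medianNanos(warmup = 3, runs = 5)(data.count(allowed.contains))
    println(f"median: ${nanos / 1e6}%.2f ms")
  }
}
```

In the test above, each `spark.time { ... }` body could be passed to such a helper instead, so that the three variants are compared on repeated runs rather than on a single one.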
