[jira] [Updated] (SPARK-39885) Behavior differs between arrays_overlap and array_contains for negative 0.0

2022-07-27 Thread David Vogelbacher (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-39885:
--
Summary: Behavior differs between arrays_overlap and array_contains for 
negative 0.0  (was: Behavior differs between array_overlap and array_contains 
for negative 0.0)

> Behavior differs between arrays_overlap and array_contains for negative 0.0
> ---
>
> Key: SPARK-39885
> URL: https://issues.apache.org/jira/browse/SPARK-39885
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.2
>Reporter: David Vogelbacher
>Priority: Major
>
> {{array_contains([0.0], -0.0)}} will return true, but {{arrays_overlap([0.0], [-0.0])}} will 
> return false. I think we generally want to treat -0.0 and 0.0 as the same (see 
> https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SQLOrderingUtil.scala#L28).
> However, the {{Double::equals}} method doesn't. Therefore, we should either make 
> [TypeUtils#typeWithProperEquals|https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala#L96]
>  return false for doubles, or we should wrap doubles with our own equals method that 
> handles this case.
> Java code snippets showing the issue:
> {code:java}
> Dataset<Row> dataset = sparkSession.createDataFrame(
>     List.of(RowFactory.create(List.of(-0.0))),
>     DataTypes.createStructType(ImmutableList.of(DataTypes.createStructField(
>         "doubleCol", DataTypes.createArrayType(DataTypes.DoubleType), false))));
> Dataset<Row> df = dataset.withColumn(
>     "overlaps",
>     functions.arrays_overlap(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
> List<Row> result = df.collectAsList(); // [[WrappedArray(-0.0),false]]
> {code}
> {code:java}
> Dataset<Row> dataset = sparkSession.createDataFrame(
>     List.of(RowFactory.create(-0.0)),
>     DataTypes.createStructType(ImmutableList.of(
>         DataTypes.createStructField("doubleCol", DataTypes.DoubleType, false))));
> Dataset<Row> df = dataset.withColumn(
>     "contains",
>     functions.array_contains(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
> List<Row> result = df.collectAsList(); // [[-0.0,true]]
> {code}
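
For context on why the two functions disagree: as noted above, {{arrays_overlap}} relies on {{Double}} equality (it builds a {{HashSet}} when the element type reports proper equals, see SPARK-39746 below), and {{java.lang.Double}} distinguishes -0.0 from 0.0 by bit pattern, while primitive comparison does not. A minimal, JDK-only Java sketch of that difference (not part of the Jira report):

{code:java}
import java.util.HashSet;
import java.util.Set;

public class NegativeZeroDemo {
    public static void main(String[] args) {
        double pos = 0.0;
        double neg = -0.0;

        // Primitive comparison collapses the two zeros...
        System.out.println(pos == neg);                       // true

        // ...but boxed equality and the total ordering do not,
        // because they are based on the raw bit patterns.
        System.out.println(Double.valueOf(pos).equals(neg));  // false
        System.out.println(Double.compare(pos, neg));         // 1 (0.0 sorts after -0.0)

        // A HashSet<Double> built from [0.0] therefore misses -0.0,
        // which matches arrays_overlap([0.0], [-0.0]) == false.
        Set<Double> set = new HashSet<>();
        set.add(pos);
        System.out.println(set.contains(neg));                // false
    }
}
{code}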



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-39885) Behavior differs between array_overlap and array_contains for negative 0.0

2022-07-27 Thread David Vogelbacher (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-39885:
--
Description: 
{{array_contains([0.0], -0.0)}} will return true, but {{arrays_overlap([0.0], [-0.0])}} will 
return false. I think we generally want to treat -0.0 and 0.0 as the same (see 
https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SQLOrderingUtil.scala#L28).
However, the {{Double::equals}} method doesn't. Therefore, we should either make 
[TypeUtils#typeWithProperEquals|https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala#L96]
 return false for doubles, or we should wrap doubles with our own equals method that 
handles this case.

Java code snippets showing the issue:

{code:java}
Dataset<Row> dataset = sparkSession.createDataFrame(
    List.of(RowFactory.create(List.of(-0.0))),
    DataTypes.createStructType(ImmutableList.of(DataTypes.createStructField(
        "doubleCol", DataTypes.createArrayType(DataTypes.DoubleType), false))));
Dataset<Row> df = dataset.withColumn(
    "overlaps",
    functions.arrays_overlap(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
List<Row> result = df.collectAsList(); // [[WrappedArray(-0.0),false]]
{code}

{code:java}
Dataset<Row> dataset = sparkSession.createDataFrame(
    List.of(RowFactory.create(-0.0)),
    DataTypes.createStructType(ImmutableList.of(
        DataTypes.createStructField("doubleCol", DataTypes.DoubleType, false))));
Dataset<Row> df = dataset.withColumn(
    "contains",
    functions.array_contains(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
List<Row> result = df.collectAsList(); // [[-0.0,true]]
{code}


  was:
{{array_contains([0.0], -0.0)}} will return true, but {{arrays_overlap([0.0], [-0.0])}} will 
return false. I think we generally want to treat -0.0 and 0.0 as the same (see 
https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SQLOrderingUtil.scala#L28).
However, the {{Double::equals}} method doesn't. Therefore, we should either make 
[TypeUtils#typeWithProperEquals|https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala#L96]
 return false for doubles, or we should wrap doubles with our own equals method that 
handles this case.

Java code snippets showing the issue:

{code:java}
Dataset<Row> dataset = sparkSession.createDataFrame(
    List.of(RowFactory.create(List.of(-0.0))),
    DataTypes.createStructType(ImmutableList.of(DataTypes.createStructField(
        "doubleCol", DataTypes.createArrayType(DataTypes.DoubleType), false))));
Dataset<Row> df = dataset.withColumn(
    "overlaps",
    functions.arrays_overlap(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
List<Row> result = df.collectAsList(); // [[WrappedArray(-0.0),false]]
{code}

{code:java}
Dataset<Row> dataset = sparkSession.createDataFrame(
    List.of(RowFactory.create(-0.0)),
    DataTypes.createStructType(ImmutableList.of(
        DataTypes.createStructField("doubleCol", DataTypes.DoubleType, false))));
Dataset<Row> df = dataset.withColumn(
    "overlaps",
    functions.array_contains(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
List<Row> result = df.collectAsList(); // [[-0.0,true]]
{code}



> Behavior differs between array_overlap and array_contains for negative 0.0
> --
>
> Key: SPARK-39885
> URL: https://issues.apache.org/jira/browse/SPARK-39885
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.2
>Reporter: David Vogelbacher
>Priority: Major
>
> {{array_contains([0.0], -0.0)}} will return true, but {{arrays_overlap([0.0], [-0.0])}} will 
> return false. I think we generally want to treat -0.0 and 0.0 as the same (see 
> https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SQLOrderingUtil.scala#L28).
> However, the {{Double::equals}} method doesn't. Therefore, we should either make 
> [TypeUtils#typeWithProperEquals|https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala#L96]
>  return false for doubles, or we should wrap doubles with our own equals method that 
> handles this case.
> Java code snippets showing the issue:
> 

[jira] [Updated] (SPARK-39885) Behavior differs between array_overlap and array_contains for negative 0.0

2022-07-26 Thread David Vogelbacher (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-39885:
--
Description: 
{{array_contains([0.0], -0.0)}} will return true, but {{arrays_overlap([0.0], [-0.0])}} will 
return false. I think we generally want to treat -0.0 and 0.0 as the same (see 
https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SQLOrderingUtil.scala#L28).
However, the {{Double::equals}} method doesn't. Therefore, we should either make 
[TypeUtils#typeWithProperEquals|https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala#L96]
 return false for doubles, or we should wrap doubles with our own equals method that 
handles this case.

Java code snippets showing the issue:

{code:java}
Dataset<Row> dataset = sparkSession.createDataFrame(
    List.of(RowFactory.create(List.of(-0.0))),
    DataTypes.createStructType(ImmutableList.of(DataTypes.createStructField(
        "doubleCol", DataTypes.createArrayType(DataTypes.DoubleType), false))));
Dataset<Row> df = dataset.withColumn(
    "overlaps",
    functions.arrays_overlap(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
List<Row> result = df.collectAsList(); // [[WrappedArray(-0.0),false]]
{code}

{code:java}
Dataset<Row> dataset = sparkSession.createDataFrame(
    List.of(RowFactory.create(-0.0)),
    DataTypes.createStructType(ImmutableList.of(
        DataTypes.createStructField("doubleCol", DataTypes.DoubleType, false))));
Dataset<Row> df = dataset.withColumn(
    "overlaps",
    functions.array_contains(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
List<Row> result = df.collectAsList(); // [[-0.0,true]]
{code}


  was:
{{array_contains([0.0], -0.0)}} will return true, but {{arrays_overlap([0.0], [-0.0])}} will 
return false. I think we generally want to treat -0.0 and 0.0 as the same (see 
https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SQLOrderingUtil.scala#L28).
However, the {{Double::equals}} method doesn't. Therefore, we should either make 
[TypeUtils#typeWithProperEquals|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala]
 return false for doubles, or we should wrap doubles with our own equals method that 
handles this case.

Java code snippets showing the issue:

{code:java}
Dataset<Row> dataset = sparkSession.createDataFrame(
    List.of(RowFactory.create(List.of(-0.0))),
    DataTypes.createStructType(ImmutableList.of(DataTypes.createStructField(
        "doubleCol", DataTypes.createArrayType(DataTypes.DoubleType), false))));
Dataset<Row> df = dataset.withColumn(
    "overlaps",
    functions.arrays_overlap(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
List<Row> result = df.collectAsList(); // [[WrappedArray(-0.0),false]]
{code}

{code:java}
Dataset<Row> dataset = sparkSession.createDataFrame(
    List.of(RowFactory.create(-0.0)),
    DataTypes.createStructType(ImmutableList.of(
        DataTypes.createStructField("doubleCol", DataTypes.DoubleType, false))));
Dataset<Row> df = dataset.withColumn(
    "overlaps",
    functions.array_contains(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
List<Row> result = df.collectAsList(); // [[-0.0,true]]
{code}



> Behavior differs between array_overlap and array_contains for negative 0.0
> --
>
> Key: SPARK-39885
> URL: https://issues.apache.org/jira/browse/SPARK-39885
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.2
>Reporter: David Vogelbacher
>Priority: Major
>
> {{array_contains([0.0], -0.0)}} will return true, but {{arrays_overlap([0.0], [-0.0])}} will 
> return false. I think we generally want to treat -0.0 and 0.0 as the same (see 
> https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SQLOrderingUtil.scala#L28).
> However, the {{Double::equals}} method doesn't. Therefore, we should either make 
> [TypeUtils#typeWithProperEquals|https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala#L96]
>  return false for doubles, or we should wrap doubles with our own equals method that 
> handles this case.
> Java code snippets showing the issue:
> {code:java}
> dataset = 

[jira] [Updated] (SPARK-39885) Behavior differs between array_overlap and array_contains for negative 0.0

2022-07-26 Thread David Vogelbacher (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-39885:
--
Description: 
{{array_contains([0.0], -0.0)}} will return true, but {{arrays_overlap([0.0], [-0.0])}} will 
return false. I think we generally want to treat -0.0 and 0.0 as the same (see 
https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SQLOrderingUtil.scala#L28).
However, the {{Double::equals}} method doesn't. Therefore, we should either make 
[TypeUtils#typeWithProperEquals|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala]
 return false for doubles, or we should wrap doubles with our own equals method that 
handles this case.

Java code snippets showing the issue:

{code:java}
Dataset<Row> dataset = sparkSession.createDataFrame(
    List.of(RowFactory.create(List.of(-0.0))),
    DataTypes.createStructType(ImmutableList.of(DataTypes.createStructField(
        "doubleCol", DataTypes.createArrayType(DataTypes.DoubleType), false))));
Dataset<Row> df = dataset.withColumn(
    "overlaps",
    functions.arrays_overlap(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
List<Row> result = df.collectAsList(); // [[WrappedArray(-0.0),false]]
{code}

{code:java}
Dataset<Row> dataset = sparkSession.createDataFrame(
    List.of(RowFactory.create(-0.0)),
    DataTypes.createStructType(ImmutableList.of(
        DataTypes.createStructField("doubleCol", DataTypes.DoubleType, false))));
Dataset<Row> df = dataset.withColumn(
    "overlaps",
    functions.array_contains(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
List<Row> result = df.collectAsList(); // [[-0.0,true]]
{code}


  was:
{{array_contains([0.0], -0.0)}} will return true, but {{arrays_overlap([0.0], [-0.0])}} will 
return false. I think we generally want to treat -0.0 and 0.0 as the same (see 
https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SQLOrderingUtil.scala#L28).
However, the {{Double::equals}} method doesn't. Therefore, we should either make 
{{TypeUtils#typeWithProperEquals}} return false for doubles, or we should wrap doubles 
with our own equals method that handles this case.

Java code snippets showing the issue:

{code:java}
Dataset<Row> dataset = sparkSession.createDataFrame(
    List.of(RowFactory.create(List.of(-0.0))),
    DataTypes.createStructType(ImmutableList.of(DataTypes.createStructField(
        "doubleCol", DataTypes.createArrayType(DataTypes.DoubleType), false))));
Dataset<Row> df = dataset.withColumn(
    "overlaps",
    functions.arrays_overlap(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
List<Row> result = df.collectAsList(); // [[WrappedArray(-0.0),false]]
{code}

{code:java}
Dataset<Row> dataset = sparkSession.createDataFrame(
    List.of(RowFactory.create(-0.0)),
    DataTypes.createStructType(ImmutableList.of(
        DataTypes.createStructField("doubleCol", DataTypes.DoubleType, false))));
Dataset<Row> df = dataset.withColumn(
    "overlaps",
    functions.array_contains(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
List<Row> result = df.collectAsList(); // [[-0.0,true]]
{code}



> Behavior differs between array_overlap and array_contains for negative 0.0
> --
>
> Key: SPARK-39885
> URL: https://issues.apache.org/jira/browse/SPARK-39885
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.2
>Reporter: David Vogelbacher
>Priority: Major
>
> {{array_contains([0.0], -0.0)}} will return true, but {{arrays_overlap([0.0], [-0.0])}} will 
> return false. I think we generally want to treat -0.0 and 0.0 as the same (see 
> https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SQLOrderingUtil.scala#L28).
> However, the {{Double::equals}} method doesn't. Therefore, we should either make 
> [TypeUtils#typeWithProperEquals|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala]
>  return false for doubles, or we should wrap doubles with our own equals method that 
> handles this case.
> Java code snippets showing the issue:
> {code:java}
> dataset = sparkSession.createDataFrame(
> List.of(RowFactory.create(List.of(-0.0))),
> 
> DataTypes.createStructType(ImmutableList.of(DataTypes.createStructField(
> "doubleCol", 
> 

[jira] [Created] (SPARK-39885) Behavior differs between array_overlap and array_contains for negative 0.0

2022-07-26 Thread David Vogelbacher (Jira)
David Vogelbacher created SPARK-39885:
-

 Summary: Behavior differs between array_overlap and array_contains 
for negative 0.0
 Key: SPARK-39885
 URL: https://issues.apache.org/jira/browse/SPARK-39885
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.2
Reporter: David Vogelbacher


{{array_contains([0.0], -0.0)}} will return true, but {{arrays_overlap([0.0], [-0.0])}} will 
return false. I think we generally want to treat -0.0 and 0.0 as the same (see 
https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SQLOrderingUtil.scala#L28).
However, the {{Double::equals}} method doesn't. Therefore, we should either make 
{{TypeUtils#typeWithProperEquals}} return false for doubles, or we should wrap doubles 
with our own equals method that handles this case.

Java code snippets showing the issue:

{code:java}
Dataset<Row> dataset = sparkSession.createDataFrame(
    List.of(RowFactory.create(List.of(-0.0))),
    DataTypes.createStructType(ImmutableList.of(DataTypes.createStructField(
        "doubleCol", DataTypes.createArrayType(DataTypes.DoubleType), false))));
Dataset<Row> df = dataset.withColumn(
    "overlaps",
    functions.arrays_overlap(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
List<Row> result = df.collectAsList(); // [[WrappedArray(-0.0),false]]
{code}

{code:java}
Dataset<Row> dataset = sparkSession.createDataFrame(
    List.of(RowFactory.create(-0.0)),
    DataTypes.createStructType(ImmutableList.of(
        DataTypes.createStructField("doubleCol", DataTypes.DoubleType, false))));
Dataset<Row> df = dataset.withColumn(
    "overlaps",
    functions.array_contains(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
List<Row> result = df.collectAsList(); // [[-0.0,true]]
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-39746) Binary array operations can be faster if one side is a constant

2022-07-11 Thread David Vogelbacher (Jira)
David Vogelbacher created SPARK-39746:
-

 Summary: Binary array operations can be faster if one side is a 
constant
 Key: SPARK-39746
 URL: https://issues.apache.org/jira/browse/SPARK-39746
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: David Vogelbacher


Array operations such as 
[ArraysOverlap|https://github.com/apache/spark/blob/79f133b7bbc1d9aa6a20dd8a34ec120902f96155/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L1367]
 are optimized to put all the elements of the smaller array into a HashSet, provided the 
element type properly supports equals.
However, if one of the arrays is a constant, we could do much better: instead of rebuilding 
the HashSet for every row, we could build it just once and send it to all the executors. 
This would improve runtime by a constant factor.
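
A minimal Java sketch of the idea (hypothetical and simplified; this is not Spark's actual expression or codegen): when one side of {{arrays_overlap}} is a literal, the lookup set is built once and reused for every row instead of being rebuilt per row.

{code:java}
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; names and structure are not from Spark's implementation.
final class ConstantSideOverlap {
    private final Set<Double> constantSet = new HashSet<>();

    ConstantSideOverlap(double[] constantArray) {
        // Built once (e.g. on the driver) because this side never changes.
        for (double d : constantArray) {
            constantSet.add(d);
        }
    }

    // Evaluated per row; only the non-constant side is scanned.
    boolean overlaps(double[] rowArray) {
        for (double d : rowArray) {
            if (constantSet.contains(d)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        ConstantSideOverlap overlap = new ConstantSideOverlap(new double[] {1.0, 2.0});
        System.out.println(overlap.overlaps(new double[] {3.0, 2.0})); // true
    }
}
{code}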



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28761) spark.driver.maxResultSize only applies to compressed data

2019-08-16 Thread David Vogelbacher (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-28761:
--
Description: 
Spark has a setting {{spark.driver.maxResultSize}}, see 
https://spark.apache.org/docs/latest/configuration.html#application-properties :
{noformat}
Limit of total size of serialized results of all partitions for each Spark 
action (e.g. collect) in bytes. Should be at least 1M, or 0 for unlimited. 
Jobs will be aborted if the total size is above this limit. Having a high limit 
may cause out-of-memory errors in driver (depends on spark.driver.memory and 
memory overhead of objects in JVM). 
Setting a proper limit can protect the driver from out-of-memory errors.
{noformat}
This setting can be very useful for constraining the memory that the Spark 
driver needs for a specific Spark action. However, this limit is checked before 
decompressing the data, in 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L662

Even if the compressed data is below the limit, the uncompressed data can still 
be far above it. In order to protect the driver, we should also impose a limit on 
the uncompressed data. We could do this in 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L344
I propose adding a new config option {{spark.driver.maxUncompressedResultSize}}.

A simple repro of this with spark shell:
{noformat}
> printf 'a%.0s' {1..10} > test.csv # create a 100 MB file
> ./bin/spark-shell --conf "spark.driver.maxResultSize=1"
scala> val df = spark.read.format("csv").load("/Users/dvogelbacher/test.csv")
df: org.apache.spark.sql.DataFrame = [_c0: string]

scala> val results = df.collect()
results: Array[org.apache.spark.sql.Row] = 
Array([a...

scala> results(0).getString(0).size
res0: Int = 10
{noformat}

Even though we set maxResultSize to 10 MB, we collect a result that is 100MB 
uncompressed.

  was:
Spark has a setting {{spark.driver.maxResultSize}}, see 
https://spark.apache.org/docs/latest/configuration.html#application-properties :
{noformat}
Limit of total size of serialized results of all partitions for each Spark 
action (e.g. collect) in bytes. Should be at least 1M, or 0 for unlimited. Jobs 
will be aborted if the total size is above this limit. Having a high limit may 
cause out-of-memory errors in driver (depends on spark.driver.memory and memory 
overhead of objects in JVM). Setting a proper limit can protect the driver from 
out-of-memory errors.
{noformat}
This setting can be very useful for constraining the memory that the Spark 
driver needs for a specific Spark action. However, this limit is checked before 
decompressing the data, in 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L662

Even if the compressed data is below the limit, the uncompressed data can still 
be far above it. In order to protect the driver, we should also impose a limit on 
the uncompressed data. We could do this in 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L344
I propose adding a new config option {{spark.driver.maxUncompressedResultSize}}.

A simple repro of this with spark shell:
{noformat}
> printf 'a%.0s' {1..10} > test.csv # create a 100 MB file
> ./bin/spark-shell --conf "spark.driver.maxResultSize=1"
scala> val df = spark.read.format("csv").load("/Users/dvogelbacher/test.csv")
df: org.apache.spark.sql.DataFrame = [_c0: string]

scala> val results = df.collect()
results: Array[org.apache.spark.sql.Row] = 

[jira] [Updated] (SPARK-28761) spark.driver.maxResultSize only applies to compressed data

2019-08-16 Thread David Vogelbacher (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-28761:
--
Description: 
Spark has a setting {{spark.driver.maxResultSize}}, see 
https://spark.apache.org/docs/latest/configuration.html#application-properties :
{noformat}
Limit of total size of serialized results of all partitions for each Spark 
action (e.g. collect) in bytes. Should be at least 1M, or 0 for unlimited. Jobs 
will be aborted if the total size is above this limit. Having a high limit may 
cause out-of-memory errors in driver (depends on spark.driver.memory and memory 
overhead of objects in JVM). Setting a proper limit can protect the driver from 
out-of-memory errors.
{noformat}
This setting can be very useful for constraining the memory that the Spark 
driver needs for a specific Spark action. However, this limit is checked before 
decompressing the data, in 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L662

Even if the compressed data is below the limit, the uncompressed data can still 
be far above it. In order to protect the driver, we should also impose a limit on 
the uncompressed data. We could do this in 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L344
I propose adding a new config option {{spark.driver.maxUncompressedResultSize}}.

A simple repro of this with spark shell:
{noformat}
> printf 'a%.0s' {1..10} > test.csv # create a 100 MB file
> ./bin/spark-shell --conf "spark.driver.maxResultSize=1"
scala> val df = spark.read.format("csv").load("/Users/dvogelbacher/test.csv")
df: org.apache.spark.sql.DataFrame = [_c0: string]

scala> val results = df.collect()
results: Array[org.apache.spark.sql.Row] = 
Array([a...

scala> results(0).getString(0).size
res0: Int = 10
{noformat}

Even though we set maxResultSize to 10 MB, we collect a result that is 100MB 
uncompressed.

  was:
Spark has a setting `spark.driver.maxResultSize`, see 
https://spark.apache.org/docs/latest/configuration.html#application-properties :
{noformat}
Limit of total size of serialized results of all partitions for each Spark 
action (e.g. collect) in bytes. Should be at least 1M, or 0 for unlimited. Jobs 
will be aborted if the total size is above this limit. Having a high limit may 
cause out-of-memory errors in driver (depends on spark.driver.memory and memory 
overhead of objects in JVM). Setting a proper limit can protect the driver from 
out-of-memory errors.
{noformat}
This setting can be very useful for constraining the memory that the Spark 
driver needs for a specific Spark action. However, this limit is checked before 
decompressing the data, in 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L662

Even if the compressed data is below the limit, the uncompressed data can still 
be far above it. In order to protect the driver, we should also impose a limit on 
the uncompressed data. We could do this in 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L344
I propose adding a new config option {{spark.driver.maxUncompressedResultSize}}.

A simple repro of this with spark shell:
{noformat}
> printf 'a%.0s' {1..10} > test.csv # create a 100 MB file
> ./bin/spark-shell --conf "spark.driver.maxResultSize=1"
scala> val df = spark.read.format("csv").load("/Users/dvogelbacher/test.csv")
df: org.apache.spark.sql.DataFrame = [_c0: string]

scala> val results = df.collect()
results: Array[org.apache.spark.sql.Row] = 

[jira] [Created] (SPARK-28761) spark.driver.maxResultSize only applies to compressed data

2019-08-16 Thread David Vogelbacher (JIRA)
David Vogelbacher created SPARK-28761:
-

 Summary: spark.driver.maxResultSize only applies to compressed data
 Key: SPARK-28761
 URL: https://issues.apache.org/jira/browse/SPARK-28761
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: David Vogelbacher


Spark has a setting `spark.driver.maxResultSize`, see 
https://spark.apache.org/docs/latest/configuration.html#application-properties :
{noformat}
Limit of total size of serialized results of all partitions for each Spark 
action (e.g. collect) in bytes. Should be at least 1M, or 0 for unlimited. Jobs 
will be aborted if the total size is above this limit. Having a high limit may 
cause out-of-memory errors in driver (depends on spark.driver.memory and memory 
overhead of objects in JVM). Setting a proper limit can protect the driver from 
out-of-memory errors.
{noformat}
This setting can be very useful for constraining the memory that the Spark 
driver needs for a specific Spark action. However, this limit is checked before 
decompressing the data, in 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L662

Even if the compressed data is below the limit, the uncompressed data can still 
be far above it. In order to protect the driver, we should also impose a limit on 
the uncompressed data. We could do this in 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L344
I propose adding a new config option {{spark.driver.maxUncompressedResultSize}}.

A simple repro of this with spark shell:
{noformat}
> printf 'a%.0s' {1..10} > test.csv # create a 100 MB file
> ./bin/spark-shell --conf "spark.driver.maxResultSize=1"
scala> val df = spark.read.format("csv").load("/Users/dvogelbacher/test.csv")
df: org.apache.spark.sql.DataFrame = [_c0: string]

scala> val results = df.collect()
results: Array[org.apache.spark.sql.Row] = 
Array([a...

scala> results(0).getString(0).size
res0: Int = 10
{noformat}

Even though we set maxResultSize to 10 MB, we collect a result that is 100MB 
uncompressed.
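
To make the proposal concrete, here is a simplified, hypothetical Java sketch of the kind of guard described above: enforce the cap while decompressing so the driver fails fast instead of materializing an oversized result. The parameter name mirrors the proposed {{spark.driver.maxUncompressedResultSize}}; Spark's actual result handling uses its own codecs and block manager, not raw GZIP.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

// Hypothetical illustration; not Spark code.
final class BoundedDecompress {
    static byte[] decompressWithLimit(byte[] compressed, long maxUncompressedResultSize)
            throws IOException {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buf = new byte[8192];
            long total = 0;
            int n;
            while ((n = in.read(buf)) != -1) {
                total += n;
                if (total > maxUncompressedResultSize) {
                    // Abort before the full uncompressed result is held in driver memory.
                    throw new IOException("Uncompressed result size " + total
                        + " exceeds limit " + maxUncompressedResultSize);
                }
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        }
    }
}
{code}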



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27805) toPandas does not propagate SparkExceptions with arrow enabled

2019-05-22 Thread David Vogelbacher (JIRA)
David Vogelbacher created SPARK-27805:
-

 Summary: toPandas does not propagate SparkExceptions with arrow 
enabled
 Key: SPARK-27805
 URL: https://issues.apache.org/jira/browse/SPARK-27805
 Project: Spark
  Issue Type: Improvement
  Components: PySpark, SQL
Affects Versions: 3.1.0
Reporter: David Vogelbacher


When calling {{toPandas}} with Arrow enabled, errors encountered during the 
collect are not propagated to the Python process.
Only a very general {{EOFError}} is raised.
Example of the behavior with Arrow enabled vs. Arrow disabled:
{noformat}
import traceback
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def raise_exception():
  raise Exception("My error")
error_udf = udf(raise_exception, IntegerType())
df = spark.range(3).toDF("i").withColumn("x", error_udf())
try:
    df.toPandas()
except:
    no_arrow_exception = traceback.format_exc()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
try:
    df.toPandas()
except:
    arrow_exception = traceback.format_exc()
print no_arrow_exception
print arrow_exception
{noformat}
{{arrow_exception}} gives as output:
{noformat}
>>> print arrow_exception
Traceback (most recent call last):
  File "", line 2, in 
  File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 
2143, in toPandas
batches = self._collectAsArrow()
  File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 
2205, in _collectAsArrow
results = list(_load_from_socket(sock_info, ArrowCollectSerializer()))
  File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 210, 
in load_stream
num = read_int(stream)
  File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 810, 
in read_int
raise EOFError
EOFError
{noformat}

{{no_arrow_exception}} gives as output:
{noformat}
Traceback (most recent call last):
  File "", line 2, in 
  File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 
2166, in toPandas
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 
516, in collect
sock_info = self._jdf.collectToPython()
  File 
"/Users/dvogelbacher/git/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
 line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File "/Users/dvogelbacher/git/spark/python/pyspark/sql/utils.py", line 89, in 
deco
return f(*a, **kw)
  File 
"/Users/dvogelbacher/git/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py",
 line 328, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o38.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in 
stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 (TID 
7, localhost, executor driver): org.apache.spark.api.python.PythonException: 
Traceback (most recent call last):
  File 
"/Users/dvogelbacher/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
428, in main
process()
  File 
"/Users/dvogelbacher/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
423, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 438, 
in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
  File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 141, 
in dump_stream
for obj in iterator:
  File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 427, 
in _batched
for item in iterator:
  File "", line 1, in 
  File 
"/Users/dvogelbacher/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
86, in 
return lambda *a: f(*a)
  File "/Users/dvogelbacher/git/spark/python/pyspark/util.py", line 99, in 
wrapper
return f(*args, **kwargs)
  File "", line 2, in raise_exception
Exception: My error
...
{noformat}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27778) toPandas with arrow enabled fails for DF with no partitions

2019-05-20 Thread David Vogelbacher (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-27778:
--
Summary: toPandas with arrow enabled fails for DF with no partitions  (was: 
toPandas with arrow enabled fails for DF with no partition)

> toPandas with arrow enabled fails for DF with no partitions
> ---
>
> Key: SPARK-27778
> URL: https://issues.apache.org/jira/browse/SPARK-27778
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: David Vogelbacher
>Priority: Major
>
> Calling {{toPandas}} with {{spark.sql.execution.arrow.enabled: true}} fails for 
> dataframes with no partitions. The error is an {{EOFError}}. With 
> {{spark.sql.execution.arrow.enabled: false}} the conversion succeeds.
> Repro (on current master branch):
> {noformat}
> >>> from pyspark.sql.types import *
> >>> schema = StructType([StructField("field1", StringType(), True)])
> >>> df = spark.createDataFrame(sc.emptyRDD(), schema)
> >>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
> >>> df.toPandas()
> /Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py:2162: 
> UserWarning: toPandas attempted Arrow optimization because 
> 'spark.sql.execution.arrow.enabled' is set to true, but has reached the error 
> below and can not continue. Note that 
> 'spark.sql.execution.arrow.fallback.enabled' does not have an effect on 
> failures in the middle of computation.
>   warnings.warn(msg)
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 
> 2143, in toPandas
> batches = self._collectAsArrow()
>   File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 
> 2205, in _collectAsArrow
> results = list(_load_from_socket(sock_info, ArrowCollectSerializer()))
>   File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 
> 210, in load_stream
> num = read_int(stream)
>   File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 
> 810, in read_int
> raise EOFError
> EOFError
> >>> spark.conf.set("spark.sql.execution.arrow.enabled", "false")
> >>> df.toPandas()
> Empty DataFrame
> Columns: [field1]
> Index: []
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27778) toPandas with arrow enabled fails for DF with no partitions

2019-05-20 Thread David Vogelbacher (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843987#comment-16843987
 ] 

David Vogelbacher commented on SPARK-27778:
---

I will make a pr for this shortly.

> toPandas with arrow enabled fails for DF with no partitions
> ---
>
> Key: SPARK-27778
> URL: https://issues.apache.org/jira/browse/SPARK-27778
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: David Vogelbacher
>Priority: Major
>
> Calling {{toPandas}} with {{spark.sql.execution.arrow.enabled: true}} fails for 
> dataframes with no partitions. The error is an {{EOFError}}. With 
> {{spark.sql.execution.arrow.enabled: false}} the conversion succeeds.
> Repro (on current master branch):
> {noformat}
> >>> from pyspark.sql.types import *
> >>> schema = StructType([StructField("field1", StringType(), True)])
> >>> df = spark.createDataFrame(sc.emptyRDD(), schema)
> >>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
> >>> df.toPandas()
> /Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py:2162: 
> UserWarning: toPandas attempted Arrow optimization because 
> 'spark.sql.execution.arrow.enabled' is set to true, but has reached the error 
> below and can not continue. Note that 
> 'spark.sql.execution.arrow.fallback.enabled' does not have an effect on 
> failures in the middle of computation.
>   warnings.warn(msg)
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 
> 2143, in toPandas
> batches = self._collectAsArrow()
>   File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 
> 2205, in _collectAsArrow
> results = list(_load_from_socket(sock_info, ArrowCollectSerializer()))
>   File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 
> 210, in load_stream
> num = read_int(stream)
>   File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 
> 810, in read_int
> raise EOFError
> EOFError
> >>> spark.conf.set("spark.sql.execution.arrow.enabled", "false")
> >>> df.toPandas()
> Empty DataFrame
> Columns: [field1]
> Index: []
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27778) toPandas with arrow enabled fails for DF with no partition

2019-05-20 Thread David Vogelbacher (JIRA)
David Vogelbacher created SPARK-27778:
-

 Summary: toPandas with arrow enabled fails for DF with no partition
 Key: SPARK-27778
 URL: https://issues.apache.org/jira/browse/SPARK-27778
 Project: Spark
  Issue Type: Bug
  Components: PySpark, SQL
Affects Versions: 3.0.0
Reporter: David Vogelbacher


Calling {{toPandas}} with {{spark.sql.execution.arrow.enabled: true}} fails for 
dataframes with no partitions. The error is an {{EOFError}}. With 
{{spark.sql.execution.arrow.enabled: false}} the conversion succeeds.

Repro (on current master branch):
{noformat}
>>> from pyspark.sql.types import *
>>> schema = StructType([StructField("field1", StringType(), True)])
>>> df = spark.createDataFrame(sc.emptyRDD(), schema)
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> df.toPandas()
/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py:2162: 
UserWarning: toPandas attempted Arrow optimization because 
'spark.sql.execution.arrow.enabled' is set to true, but has reached the error 
below and can not continue. Note that 
'spark.sql.execution.arrow.fallback.enabled' does not have an effect on 
failures in the middle of computation.

  warnings.warn(msg)
Traceback (most recent call last):
  File "", line 1, in 
  File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 
2143, in toPandas
batches = self._collectAsArrow()
  File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 
2205, in _collectAsArrow
results = list(_load_from_socket(sock_info, ArrowCollectSerializer()))
  File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 210, 
in load_stream
num = read_int(stream)
  File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 810, 
in read_int
raise EOFError
EOFError
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "false")
>>> df.toPandas()
Empty DataFrame
Columns: [field1]
Index: []
{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation

2019-01-23 Thread David Vogelbacher (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750674#comment-16750674
 ] 

David Vogelbacher commented on SPARK-24437:
---

[~DaveDeCaprio] I have not tested it yet but 
https://issues.apache.org/jira/browse/SPARK-25998 and its associated 
[PR|https://github.com/apache/spark/pull/22995] might help here.

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation.
> We have a long-running instance of STS (Spark Thrift Server).
> With each query execution that requires a broadcast join, an UnsafeHashedRelation is 
> added for cleanup in ContextCleaner. This reference to the 
> UnsafeHashedRelation is being held by some other collection and does not become 
> eligible for GC, so ContextCleaner is not able to clean it up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26423) [K8s] Make sure that disconnected executors eventually get deleted

2018-12-20 Thread David Vogelbacher (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-26423:
--
Description: 
If an executor disconnects, we currently only disable it in the 
{{KubernetesClusterSchedulerBackend}} but don't take any further action, in the 
expectation that all the other necessary actions (deleting it from Spark, 
requesting a replacement executor, ...) will be driven by k8s lifecycle events.
However, this only works if the reason the executor disconnected is that the 
executor pod is dying/shutting down/...
It doesn't work if there is just some network issue between driver and executor 
while the executor pod keeps running in k8s.
Thus (as indicated in the TODO comment in 
[KubernetesClusterSchedulerBackend|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala#L158]),
 we should make sure that a disconnected executor eventually does get killed in 
k8s.



  was:
If an executor disconnects, we currently only disable it in the 
{{KubernetesClusterSchedulerBackend}} but don't take any further action, in the 
expectation that all the other necessary actions (deleting it from Spark, 
requesting a replacement executor, ...) will be driven by k8s lifecycle events.
However, this only works if the reason the executor disconnected is that the 
executor pod is dying/shutting down/...
It doesn't work if there is just some network issue between driver and executor 
while the executor pod keeps running in k8s.
Thus (as indicated in the TODO comment in 
https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala#L158),
 we should make sure that a disconnected executor eventually does get killed in 
k8s.




> [K8s] Make sure that disconnected executors eventually get deleted
> --
>
> Key: SPARK-26423
> URL: https://issues.apache.org/jira/browse/SPARK-26423
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: David Vogelbacher
>Priority: Major
>
> If an executor disconnects, we currently only disable it in the 
> {{KubernetesClusterSchedulerBackend}} but don't take any further action, in the 
> expectation that all the other necessary actions (deleting it from Spark, 
> requesting a replacement executor, ...) will be driven by k8s lifecycle events.
> However, this only works if the reason the executor disconnected is that the 
> executor pod is dying/shutting down/...
> It doesn't work if there is just some network issue between driver and 
> executor while the executor pod keeps running in k8s.
> Thus (as indicated in the TODO comment in 
> [KubernetesClusterSchedulerBackend|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala#L158]),
>  we should make sure that a disconnected executor eventually does get killed 
> in k8s.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26423) [K8s] Make sure that disconnected executors eventually get deleted

2018-12-20 Thread David Vogelbacher (JIRA)
David Vogelbacher created SPARK-26423:
-

 Summary: [K8s] Make sure that disconnected executors eventually 
get deleted
 Key: SPARK-26423
 URL: https://issues.apache.org/jira/browse/SPARK-26423
 Project: Spark
  Issue Type: Bug
  Components: Kubernetes
Affects Versions: 2.4.0
Reporter: David Vogelbacher


If an executor disconnects, we currently only disable it in the 
{{KubernetesClusterSchedulerBackend}} but don't take any further action, in the 
expectation that all the other necessary actions (deleting it from Spark, 
requesting a replacement executor, ...) will be driven by k8s lifecycle events.
However, this only works if the reason the executor disconnected is that the 
executor pod is dying/shutting down/...
It doesn't work if there is just some network issue between driver and executor 
while the executor pod keeps running in k8s.
Thus (as indicated in the TODO comment in 
https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala#L158),
 we should make sure that a disconnected executor eventually does get killed in 
k8s.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-11-08 Thread David Vogelbacher (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16679768#comment-16679768
 ] 

David Vogelbacher commented on SPARK-24437:
---

Thanks for the explanations! I will look into the best workaround for this 
use-case then.

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation.
> We have a long-running instance of STS (Spark Thrift Server).
> With each query execution that requires a broadcast join, an UnsafeHashedRelation is 
> added for cleanup in ContextCleaner. This reference to the 
> UnsafeHashedRelation is being held by some other collection and does not become 
> eligible for GC, so ContextCleaner is not able to clean it up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-11-07 Thread David Vogelbacher (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16679253#comment-16679253
 ] 

David Vogelbacher commented on SPARK-24437:
---

[~eyalfa]  There might be hundreds of cached dataframes at the same time (they 
do get unpersisted after a while, but only when they are very unlikely to be 
used again). 
The thing here is that all the dataframes that are cached are generally quite 
small (~100,000 rows). However, they might be created by a series of joins. So 
at times the broadcasted data for a specific, cached dataframe is likely bigger 
than the cached dataframe itself.

This might be a bit of an unusual use case. I do know of the workarounds you 
proposed, but they would significantly harm perf (disabling broadcast joins is 
not something I want to do for example). 

In this specific example (where the cached dataframes are smaller than the 
broadcasted data), it would really be desirable to clean up the broadcasted 
data and not have it stick around on the driver until the dataframe gets 
uncached.
I still don't quite understand why garbage collecting the broadcasted item 
would lead to failures when executing the plan later (in case parts of the 
cached data got evicted), as executing the plan could always just recompute the 
broadcasted variable? [~mgaido]


> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation.
> We have a long-running instance of STS (Spark Thrift Server).
> With each query execution that requires a broadcast join, an UnsafeHashedRelation is 
> added for cleanup in ContextCleaner. This reference to the 
> UnsafeHashedRelation is being held by some other collection and does not become 
> eligible for GC, so ContextCleaner is not able to clean it up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-11-05 Thread David Vogelbacher (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675690#comment-16675690
 ] 

David Vogelbacher commented on SPARK-24437:
---

[~eyalfa] the cached relations only really take up space on the executors, as 
they hold the cached data, whereas the broadcast variable takes up space on the 
driver (which eventually OOMs/GCs a lot).

[~mgaido] Yes, I realize that the broadcast variable is used for re-computation 
if parts of the cached dataframe get evicted. But couldn't the broadcast 
variable also get recomputed in this case? Since we do keep track of the whole 
logical plan when caching a dataset, we should always be able to recompute the 
broadcast variable when needed.

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation.
> We have a long-running instance of STS (Spark Thrift Server).
> With each query execution that requires a broadcast join, an UnsafeHashedRelation is 
> added for cleanup in ContextCleaner. This reference to the 
> UnsafeHashedRelation is being held by some other collection and does not become 
> eligible for GC, so ContextCleaner is not able to clean it up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-11-01 Thread David Vogelbacher (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671686#comment-16671686
 ] 

David Vogelbacher edited comment on SPARK-24437 at 11/1/18 2:43 PM:


Hey [~mgaido], I am seeing something similar in one of our long-running 
applications.
The longer it runs, the higher the heap usage on the driver grows. Taking a heap 
dump, most of its space is taken up by {{UnsafeHashedRelation}} instances. 
Looking at their paths to GC roots, they are strongly referenced by 
{{InMemoryRelation}} instances:
 !Screen Shot 2018-11-01 at 10.38.30 AM.png! 

So it looks like if dataframes are cached, then the {{UnsafeHashedRelation}} 
instances are never cleaned up, because they are strongly referenced by the 
generated codegen plan? 
The context cleaner can only clean them up when they are no longer strongly 
referenced.


was (Author: dvogelbacher):
Hey [~mgaido], I am seeing something similar in one of our long-running 
applications.
The longer it runs, the higher the heap usage on the driver grows. Taking a heap 
dump, most of its space is taken up by {{UnsafeHashedRelation}} instances. 
Looking at their paths to GC roots, they are strongly referenced by 
{{InMemoryRelation}} instances:
 !Screen Shot 2018-11-01 at 10.38.30 AM.png! 

So it looks like if dataframes are cached, the {{UnsafeHashedRelation}}s are 
never cleaned up, because they are strongly referenced by the generated codegen 
plan? 
The context cleaner can only clean them up when they are no longer strongly 
referenced.

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation.
> We have a long-running instance of STS (Spark Thrift Server).
> With each query execution that requires a broadcast join, an UnsafeHashedRelation is 
> added for cleanup in ContextCleaner. This reference to the 
> UnsafeHashedRelation is being held by some other collection and does not become 
> eligible for GC, so ContextCleaner is not able to clean it up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-11-01 Thread David Vogelbacher (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671686#comment-16671686
 ] 

David Vogelbacher commented on SPARK-24437:
---

Hey [~mgaido], I am seeing something similar in one of our long running 
applications.
The longer it runs, the higher the heap usage in the driver grows. Taking a heap 
dump, most of its space is taken up by {{UnsafeHashedRelation}} instances. 
Looking at their paths to GC, they are strongly referenced by 
{{InMemoryRelation}} instances:
 !Screen Shot 2018-11-01 at 10.38.30 AM.png! 

So it looks like, if dataframes are cached, the {{UnsafeHashedRelation}}s are 
never cleaned up, because they are strongly referenced by the generated codegen 
plan? 
The context cleaner can only clean them up when they are no longer strongly 
referenced.
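
For reference, a minimal sketch (illustrative only, not the actual application code) of the kind of pipeline where this pattern shows up: a broadcast-joined DataFrame is cached, so the {{InMemoryRelation}} keeps the generated plan, and through it the broadcast {{UnsafeHashedRelation}}, strongly reachable until the cache is dropped:
{noformat}
scala> import org.apache.spark.sql.functions.broadcast

scala> val small = spark.range(100).withColumnRenamed("id", "key")
scala> val large = spark.range(1000000).withColumnRenamed("id", "key")

scala> // Broadcast hash join followed by cache(): the cached relation keeps the plan
scala> // (and the UnsafeHashedRelation it references) reachable until the cache is unpersisted.
scala> val joined = large.join(broadcast(small), "key").cache()
scala> joined.count()
{noformat}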

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation.
> We have a long running instance of STS.
> With each query execution that requires a Broadcast Join, an UnsafeHashedRelation 
> gets added for cleanup in the ContextCleaner. This reference to the 
> UnsafeHashedRelation is being held by some other Collection and does not become 
> eligible for GC, so the ContextCleaner is not able to clean it.






[jira] [Updated] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-11-01 Thread David Vogelbacher (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-24437:
--
Attachment: Screen Shot 2018-11-01 at 10.38.30 AM.png

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation.
> We have a long running instance of STS.
> With each query execution that requires a Broadcast Join, an UnsafeHashedRelation 
> gets added for cleanup in the ContextCleaner. This reference to the 
> UnsafeHashedRelation is being held by some other Collection and does not become 
> eligible for GC, so the ContextCleaner is not able to clean it.






[jira] [Updated] (SPARK-24983) Collapsing multiple project statements with dependent When-Otherwise statements on the same column can OOM the driver

2018-07-31 Thread David Vogelbacher (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-24983:
--
Description: 
I noticed that writing a spark job that includes many sequential 
{{when-otherwise}} statements on the same column can easily OOM the driver 
while generating the optimized plan because the project node will grow 
exponentially in size.

Example:
{noformat}
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = Seq("a", "b", "c", "1").toDF("text")
df: org.apache.spark.sql.DataFrame = [text: string]

scala> var dfCaseWhen = df.filter($"text" =!= lit("0"))
dfCaseWhen: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [text: 
string]

scala> for( a <- 1 to 5) {
 | dfCaseWhen = dfCaseWhen.withColumn("text", when($"text" === 
lit(a.toString), lit("r" + a.toString)).otherwise($"text"))
 | }

scala> dfCaseWhen.queryExecution.analyzed
res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [CASE WHEN (text#12 = 5) THEN r5 ELSE text#12 END AS text#14]
+- Project [CASE WHEN (text#10 = 4) THEN r4 ELSE text#10 END AS text#12]
   +- Project [CASE WHEN (text#8 = 3) THEN r3 ELSE text#8 END AS text#10]
  +- Project [CASE WHEN (text#6 = 2) THEN r2 ELSE text#6 END AS text#8]
 +- Project [CASE WHEN (text#3 = 1) THEN r1 ELSE text#3 END AS text#6]
+- Filter NOT (text#3 = 0)
   +- Project [value#1 AS text#3]
  +- LocalRelation [value#1]

scala> dfCaseWhen.queryExecution.optimizedPlan
res5: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (value#1 = 1) 
THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE 
value#1 END END = 3) THEN r3 ELSE CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 
ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 
END END END = 4) THEN r4 ELSE CASE WHEN (CASE WHEN (CASE WHEN (value#1 = 1) 
THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE 
value#1 END END = 3) THEN r3 ELSE CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 
ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 
END END END END = 5) THEN r5 ELSE CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN 
(value#1 = 1) THEN r1 ELSE va...
{noformat}

As one can see, the optimized plan grows exponentially in the number of 
{{when-otherwise}} statements here.

I can see that this comes from the {{CollapseProject}} optimizer rule.
Maybe we should put a limit on the resulting size of the project node after 
collapsing and only collapse if we stay under the limit.

  was:
Hi,
I noticed that writing a spark job that includes many sequential when-otherwise 
statements on the same column can easily OOM the driver while generating the 
optimized plan because the project node will grow exponentially in size.

Example:
{noformat}
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = Seq("a", "b", "c", "1").toDF("text")
df: org.apache.spark.sql.DataFrame = [text: string]

scala> var dfCaseWhen = df.filter($"text" =!= lit("0"))
dfCaseWhen: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [text: 
string]

scala> for( a <- 1 to 5) {
 | dfCaseWhen = dfCaseWhen.withColumn("text", when($"text" === 
lit(a.toString), lit("r" + a.toString)).otherwise($"text"))
 | }

scala> dfCaseWhen.queryExecution.analyzed
res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [CASE WHEN (text#12 = 5) THEN r5 ELSE text#12 END AS text#14]
+- Project [CASE WHEN (text#10 = 4) THEN r4 ELSE text#10 END AS text#12]
   +- Project [CASE WHEN (text#8 = 3) THEN r3 ELSE text#8 END AS text#10]
  +- Project [CASE WHEN (text#6 = 2) THEN r2 ELSE text#6 END AS text#8]
 +- Project [CASE WHEN (text#3 = 1) THEN r1 ELSE text#3 END AS text#6]
+- Filter NOT (text#3 = 0)
   +- Project [value#1 AS text#3]
  +- LocalRelation [value#1]

scala> dfCaseWhen.queryExecution.optimizedPlan
res5: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (value#1 = 1) 
THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE 
value#1 END END = 3) THEN r3 ELSE CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 
ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 
END END END = 4) THEN r4 ELSE CASE WHEN (CASE WHEN (CASE WHEN (value#1 = 1) 
THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE 
value#1 END END = 3) THEN r3 ELSE CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 
ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 
END END END END = 5) THEN r5 ELSE CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN 
(value#1 = 1) 

[jira] [Created] (SPARK-24983) Collapsing multiple project statements with dependent When-Otherwise statements on the same column can OOM the driver

2018-07-31 Thread David Vogelbacher (JIRA)
David Vogelbacher created SPARK-24983:
-

 Summary: Collapsing multiple project statements with dependent 
When-Otherwise statements on the same column can OOM the driver
 Key: SPARK-24983
 URL: https://issues.apache.org/jira/browse/SPARK-24983
 Project: Spark
  Issue Type: Bug
  Components: Optimizer
Affects Versions: 2.3.1
Reporter: David Vogelbacher


Hi,
I noticed that writing a spark job that includes many sequential when-otherwise 
statements on the same column can easily OOM the driver while generating the 
optimized plan because the project node will grow exponentially in size.

Example:
{noformat}
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = Seq("a", "b", "c", "1").toDF("text")
df: org.apache.spark.sql.DataFrame = [text: string]

scala> var dfCaseWhen = df.filter($"text" =!= lit("0"))
dfCaseWhen: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [text: 
string]

scala> for( a <- 1 to 5) {
 | dfCaseWhen = dfCaseWhen.withColumn("text", when($"text" === 
lit(a.toString), lit("r" + a.toString)).otherwise($"text"))
 | }

scala> dfCaseWhen.queryExecution.analyzed
res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [CASE WHEN (text#12 = 5) THEN r5 ELSE text#12 END AS text#14]
+- Project [CASE WHEN (text#10 = 4) THEN r4 ELSE text#10 END AS text#12]
   +- Project [CASE WHEN (text#8 = 3) THEN r3 ELSE text#8 END AS text#10]
  +- Project [CASE WHEN (text#6 = 2) THEN r2 ELSE text#6 END AS text#8]
 +- Project [CASE WHEN (text#3 = 1) THEN r1 ELSE text#3 END AS text#6]
+- Filter NOT (text#3 = 0)
   +- Project [value#1 AS text#3]
  +- LocalRelation [value#1]

scala> dfCaseWhen.queryExecution.optimizedPlan
res5: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (value#1 = 1) 
THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE 
value#1 END END = 3) THEN r3 ELSE CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 
ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 
END END END = 4) THEN r4 ELSE CASE WHEN (CASE WHEN (CASE WHEN (value#1 = 1) 
THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE 
value#1 END END = 3) THEN r3 ELSE CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 
ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 
END END END END = 5) THEN r5 ELSE CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN 
(value#1 = 1) THEN r1 ELSE va...
{noformat}

As one can see, the optimized plan grows exponentially in the number of 
{{when-otherwise}} statements here.

I can see that this comes from the {{CollapseProject}} optimizer rule.
Maybe we should put a limit on the resulting size of the project node after 
collapsing and only collapse if we stay under the limit.
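
A possible user-side workaround sketch (illustrative only, not part of the proposed fix): express all the replacements as a single {{when}} chain in one {{withColumn}}, so there is only one {{Project}} and the optimized plan stays linear in the number of branches:
{noformat}
scala> import org.apache.spark.sql.functions._

scala> val df = Seq("a", "b", "c", "1").toDF("text").filter($"text" =!= lit("0"))

scala> // Fold all replacements into one nested CASE WHEN over the original column.
scala> val replaced = (1 to 5).foldLeft(lit(null).cast("string")) { (acc, a) =>
 |   when($"text" === lit(a.toString), lit("r" + a.toString)).otherwise(acc)
 | }

scala> // coalesce keeps the original value when no branch matched.
scala> val dfCaseWhen = df.withColumn("text", coalesce(replaced, $"text"))

scala> dfCaseWhen.queryExecution.optimizedPlan  // a single Project, linear in the branch count
{noformat}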






[jira] [Comment Edited] (SPARK-24957) Decimal arithmetic can lead to wrong values using codegen

2018-07-29 Thread David Vogelbacher (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561277#comment-16561277
 ] 

David Vogelbacher edited comment on SPARK-24957 at 7/29/18 9:55 PM:


[~mgaido] thanks for putting up the PR!

I wasn't able to reproduce the incorrectness for the specific example I gave 
with wholestage codegen disabled:
{noformat}
scala> spark.conf.set("spark.sql.codegen.wholeStage", false)

scala> import org.apache.spark.sql.functions
import org.apache.spark.sql.functions

scala> val df = Seq(
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("11.88")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("11.88")),
 | ("a", BigDecimal("11.88"))
 | ).toDF("text", "number")
df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)]

scala> val df_grouped_1 = 
df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number"))
df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: 
decimal(38,22)]

scala> df_grouped_1.collect()
res1: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143])

scala> val df_grouped_2 = 
df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number"))
df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: 
decimal(38,22)]

scala> df_grouped_2.collect()
res2: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143])

scala> val df_total_sum = 
df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number"))
df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)]

scala> df_total_sum.collect()
res3: Array[org.apache.spark.sql.Row] = Array([11.94857142857143])
{noformat}


was (Author: dvogelbacher):
[~mgaido] I wasn't able to reproduce the incorrectness for the specific example 
I gave with wholestage codegen disabled; that's what I meant:
{noformat}
scala> spark.conf.set("spark.sql.codegen.wholeStage", false)

scala> import org.apache.spark.sql.functions
import org.apache.spark.sql.functions

scala> val df = Seq(
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("11.88")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("11.88")),
 | ("a", BigDecimal("11.88"))
 | ).toDF("text", "number")
df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)]

scala> val df_grouped_1 = 
df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number"))
df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: 
decimal(38,22)]

scala> df_grouped_1.collect()
res1: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143])

scala> val df_grouped_2 = 
df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number"))
df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: 
decimal(38,22)]

scala> df_grouped_2.collect()
res2: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143])

scala> val df_total_sum = 
df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number"))
df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)]

scala> df_total_sum.collect()
res3: Array[org.apache.spark.sql.Row] = Array([11.94857142857143])
{noformat}

> Decimal arithmetic can lead to wrong values using codegen
> -
>
> Key: SPARK-24957
> URL: https://issues.apache.org/jira/browse/SPARK-24957
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: David Vogelbacher
>Priority: Major
>
> I noticed a bug when doing arithmetic on a dataframe containing decimal 
> values with codegen enabled.
> I tried to narrow it down to a small repro and got this (executed in 
> spark-shell):
> {noformat}
> scala> val df = Seq(
>  | ("a", BigDecimal("12.0")),
>  | ("a", BigDecimal("12.0")),
>  | ("a", BigDecimal("11.88")),
>  | ("a", BigDecimal("12.0")),
>  | ("a", BigDecimal("12.0")),
>  | ("a", BigDecimal("11.88")),
>  | ("a", BigDecimal("11.88"))
>  | ).toDF("text", "number")
> df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)]
> scala> val df_grouped_1 = 
> df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number"))
> df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: 
> decimal(38,22)]
> scala> df_grouped_1.collect()
> res0: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143])
> scala> val df_grouped_2 = 
> 

[jira] [Commented] (SPARK-24957) Decimal arithmetic can lead to wrong values using codegen

2018-07-29 Thread David Vogelbacher (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561277#comment-16561277
 ] 

David Vogelbacher commented on SPARK-24957:
---

[~mgaido] I wasn't able to reproduce the incorrectness for the specific example 
I gave with wholestage codegen disabled; that's what I meant:
{noformat}
scala> spark.conf.set("spark.sql.codegen.wholeStage", false)

scala> import org.apache.spark.sql.functions
import org.apache.spark.sql.functions

scala> val df = Seq(
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("11.88")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("11.88")),
 | ("a", BigDecimal("11.88"))
 | ).toDF("text", "number")
df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)]

scala> val df_grouped_1 = 
df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number"))
df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: 
decimal(38,22)]

scala> df_grouped_1.collect()
res1: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143])

scala> val df_grouped_2 = 
df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number"))
df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: 
decimal(38,22)]

scala> df_grouped_2.collect()
res2: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143])

scala> val df_total_sum = 
df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number"))
df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)]

scala> df_total_sum.collect()
res3: Array[org.apache.spark.sql.Row] = Array([11.94857142857143])
{noformat}

> Decimal arithmetic can lead to wrong values using codegen
> -
>
> Key: SPARK-24957
> URL: https://issues.apache.org/jira/browse/SPARK-24957
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: David Vogelbacher
>Priority: Major
>
> I noticed a bug when doing arithmetic on a dataframe containing decimal 
> values with codegen enabled.
> I tried to narrow it down to a small repro and got this (executed in 
> spark-shell):
> {noformat}
> scala> val df = Seq(
>  | ("a", BigDecimal("12.0")),
>  | ("a", BigDecimal("12.0")),
>  | ("a", BigDecimal("11.88")),
>  | ("a", BigDecimal("12.0")),
>  | ("a", BigDecimal("12.0")),
>  | ("a", BigDecimal("11.88")),
>  | ("a", BigDecimal("11.88"))
>  | ).toDF("text", "number")
> df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)]
> scala> val df_grouped_1 = 
> df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number"))
> df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: 
> decimal(38,22)]
> scala> df_grouped_1.collect()
> res0: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143])
> scala> val df_grouped_2 = 
> df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number"))
> df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: 
> decimal(38,22)]
> scala> df_grouped_2.collect()
> res1: Array[org.apache.spark.sql.Row] = 
> Array([a,11948571.4285714285714285714286])
> scala> val df_total_sum = 
> df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number"))
> df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)]
> scala> df_total_sum.collect()
> res2: Array[org.apache.spark.sql.Row] = Array([11.94857142857143])
> {noformat}
> The results of {{df_grouped_1}} and {{df_total_sum}} are correct, whereas the 
> result of {{df_grouped_2}} is clearly incorrect (it is the value of the 
> correct result times {{10^6}}).
> When codegen is disabled, all results are correct. 






[jira] [Updated] (SPARK-24957) Decimal arithmetic can lead to wrong values using codegen

2018-07-27 Thread David Vogelbacher (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-24957:
--
Description: 
I noticed a bug when doing arithmetic on a dataframe containing decimal values 
with codegen enabled.
I tried to narrow it down to a small repro and got this (executed in 
spark-shell):
{noformat}
scala> val df = Seq(
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("11.88")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("11.88")),
 | ("a", BigDecimal("11.88"))
 | ).toDF("text", "number")
df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)]

scala> val df_grouped_1 = 
df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number"))
df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: 
decimal(38,22)]

scala> df_grouped_1.collect()
res0: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143])

scala> val df_grouped_2 = 
df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number"))
df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: 
decimal(38,22)]

scala> df_grouped_2.collect()
res1: Array[org.apache.spark.sql.Row] = 
Array([a,11948571.4285714285714285714286])

scala> val df_total_sum = 
df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number"))
df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)]

scala> df_total_sum.collect()
res2: Array[org.apache.spark.sql.Row] = Array([11.94857142857143])
{noformat}

The results of {{df_grouped_1}} and {{df_total_sum}} are correct, whereas the 
result of {{df_grouped_2}} is clearly incorrect (it is the value of the correct 
result times {{10^6}}).

When codegen is disabled, all results are correct. 

  was:
I noticed a bug when doing arithmetic on a dataframe containing decimal values 
with codegen enabled.
I tried to narrow it down to a small repro and got this (executed in 
spark-shell):
{noformat}
scala> val df = Seq(
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("11.88")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("11.88")),
 | ("a", BigDecimal("11.88"))
 | ).toDF("text", "number")
df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)]

scala> val df_grouped_1 = 
df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number"))
df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: 
decimal(38,22)]

scala> df_grouped_1.collect()
res0: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143])

scala> val df_grouped_2 = 
df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number"))
df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: 
decimal(38,22)]

scala> df_grouped_2.collect()
res1: Array[org.apache.spark.sql.Row] = 
Array([a,11948571.4285714285714285714286])

scala> val df_total_sum = 
df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number"))
df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)]

scala> df_total_sum.collect()
res2: Array[org.apache.spark.sql.Row] = Array([11.94857142857143])
{noformat}

The results of {{df_grouped_1}} and {{df_total_sum}} are correct, whereas the 
result of {{df_grouped_2}} is clearly incorrect (it is the value of the correct 
result times {{10^6}}).


> Decimal arithmetic can lead to wrong values using codegen
> -
>
> Key: SPARK-24957
> URL: https://issues.apache.org/jira/browse/SPARK-24957
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: David Vogelbacher
>Priority: Major
>
> I noticed a bug when doing arithmetic on a dataframe containing decimal 
> values with codegen enabled.
> I tried to narrow it down to a small repro and got this (executed in 
> spark-shell):
> {noformat}
> scala> val df = Seq(
>  | ("a", BigDecimal("12.0")),
>  | ("a", BigDecimal("12.0")),
>  | ("a", BigDecimal("11.88")),
>  | ("a", BigDecimal("12.0")),
>  | ("a", BigDecimal("12.0")),
>  | ("a", BigDecimal("11.88")),
>  | ("a", BigDecimal("11.88"))
>  | ).toDF("text", "number")
> df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)]
> scala> val df_grouped_1 = 
> df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number"))
> df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: 
> decimal(38,22)]
> scala> df_grouped_1.collect()
> res0: 

[jira] [Created] (SPARK-24957) Decimal arithmetic can lead to wrong values using codegen

2018-07-27 Thread David Vogelbacher (JIRA)
David Vogelbacher created SPARK-24957:
-

 Summary: Decimal arithmetic can lead to wrong values using codegen
 Key: SPARK-24957
 URL: https://issues.apache.org/jira/browse/SPARK-24957
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.1
Reporter: David Vogelbacher


I noticed a bug when doing arithmetic on a dataframe containing decimal values 
with codegen enabled.
I tried to narrow it down to a small repro and got this (executed in 
spark-shell):
{noformat}
scala> val df = Seq(
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("11.88")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("12.0")),
 | ("a", BigDecimal("11.88")),
 | ("a", BigDecimal("11.88"))
 | ).toDF("text", "number")
df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)]

scala> val df_grouped_1 = 
df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number"))
df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: 
decimal(38,22)]

scala> df_grouped_1.collect()
res0: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143])

scala> val df_grouped_2 = 
df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number"))
df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: 
decimal(38,22)]

scala> df_grouped_2.collect()
res1: Array[org.apache.spark.sql.Row] = 
Array([a,11948571.4285714285714285714286])

scala> val df_total_sum = 
df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number"))
df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)]

scala> df_total_sum.collect()
res2: Array[org.apache.spark.sql.Row] = Array([11.94857142857143])
{noformat}

The results of {{df_grouped_1}} and {{df_total_sum}} are correct, whereas the 
result of {{df_grouped_2}} is clearly incorrect (it is the value of the correct 
result times {{10^6}}).
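
As a sanity check, the expected values can be computed with plain {{BigDecimal}} arithmetic outside Spark (illustrative sketch; the variable names are arbitrary):
{noformat}
scala> val xs = Seq("12.0", "12.0", "11.88", "12.0", "12.0", "11.88", "11.88").map(BigDecimal(_))

scala> // Exact average of the single group "a": 83.64 / 7 = 11.948571428571...
scala> val avg = xs.sum / BigDecimal(xs.size)

scala> // df_grouped_2 sums that per-group average over the one group "a", so it should
scala> // also be about 11.9486 -- not 11948571.43.
{noformat}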






[jira] [Commented] (SPARK-24934) Should handle missing upper/lower bounds cases in in-memory partition pruning

2018-07-26 Thread David Vogelbacher (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558360#comment-16558360
 ] 

David Vogelbacher commented on SPARK-24934:
---

Thanks for opening and making the pr [~hyukjin.kwon]!

> Should handle missing upper/lower bounds cases in in-memory partition pruning
> -
>
> Key: SPARK-24934
> URL: https://issues.apache.org/jira/browse/SPARK-24934
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Hyukjin Kwon
>Priority: Major
>
> For example, if an array column is used (where the lower and upper bounds for its 
> column batch are {{null}}), it looks like all the data is wrongly filtered out:
> {code}
> scala> import org.apache.spark.sql.functions
> import org.apache.spark.sql.functions
> scala> val df = Seq(Array("a", "b"), Array("c", "d")).toDF("arrayCol")
> df: org.apache.spark.sql.DataFrame = [arrayCol: array<string>]
> scala> 
> df.filter(df.col("arrayCol").eqNullSafe(functions.array(functions.lit("a"), 
> functions.lit("b".show()
> ++
> |arrayCol|
> ++
> |  [a, b]|
> ++
> scala> 
> df.cache().filter(df.col("arrayCol").eqNullSafe(functions.array(functions.lit("a"),
>  functions.lit("b".show()
> ++
> |arrayCol|
> ++
> ++
> {code}






[jira] [Commented] (SPARK-12911) Cacheing a dataframe causes array comparisons to fail (in filter / where) after 1.6

2018-07-25 Thread David Vogelbacher (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-12911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556470#comment-16556470
 ] 

David Vogelbacher commented on SPARK-12911:
---

Hey [~hyukjin.kwon] [~sdicocco][~a1ray], I just reproduced this on master. I 
executed the following in the spark-shell:
{noformat}
scala> import org.apache.spark.sql.functions
import org.apache.spark.sql.functions

scala> val df = Seq(Array("a", "b"), Array("c", "d")).toDF("arrayCol")
df: org.apache.spark.sql.DataFrame = [arrayCol: array<string>]

scala> 
df.filter(df.col("arrayCol").eqNullSafe(functions.array(functions.lit("a"), 
functions.lit("b".show()
++
|arrayCol|
++
|  [a, b]|
++


scala> 
df.cache().filter(df.col("arrayCol").eqNullSafe(functions.array(functions.lit("a"),
 functions.lit("b".show()
++
|arrayCol|
++
++
{noformat}

This seems to be the same issue?
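
If it is indeed the same batch-pruning problem as SPARK-24934, a possible mitigation sketch (assuming the {{spark.sql.inMemoryColumnarStorage.partitionPruning}} flag still controls this code path) is to disable in-memory partition pruning before filtering the cached DataFrame:
{noformat}
scala> // Disable batch pruning on cached data so batches with null min/max stats are not skipped.
scala> spark.conf.set("spark.sql.inMemoryColumnarStorage.partitionPruning", false)

scala> df.cache().filter(df.col("arrayCol").eqNullSafe(functions.array(functions.lit("a"),
 |   functions.lit("b")))).show()
{noformat}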

> Cacheing a dataframe causes array comparisons to fail (in filter / where) 
> after 1.6
> ---
>
> Key: SPARK-12911
> URL: https://issues.apache.org/jira/browse/SPARK-12911
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
> Environment: OSX 10.11.1, Scala 2.11.7, Spark 1.6.0
>Reporter: Jesse English
>Priority: Major
>
> When doing a *where* operation on a dataframe and testing for equality on an 
> array type, after 1.6 no valid comparisons are made if the dataframe has been 
> cached.  If it has not been cached, the results are as expected.
> This appears to be related to the underlying unsafe array data types.
> {code:title=test.scala|borderStyle=solid}
> test("test array comparison") {
> val vectors: Vector[Row] =  Vector(
>   Row.fromTuple("id_1" -> Array(0L, 2L)),
>   Row.fromTuple("id_2" -> Array(0L, 5L)),
>   Row.fromTuple("id_3" -> Array(0L, 9L)),
>   Row.fromTuple("id_4" -> Array(1L, 0L)),
>   Row.fromTuple("id_5" -> Array(1L, 8L)),
>   Row.fromTuple("id_6" -> Array(2L, 4L)),
>   Row.fromTuple("id_7" -> Array(5L, 6L)),
>   Row.fromTuple("id_8" -> Array(6L, 2L)),
>   Row.fromTuple("id_9" -> Array(7L, 0L))
> )
> val data: RDD[Row] = sc.parallelize(vectors, 3)
> val schema = StructType(
>   StructField("id", StringType, false) ::
> StructField("point", DataTypes.createArrayType(LongType, false), 
> false) ::
> Nil
> )
> val sqlContext = new SQLContext(sc)
> val dataframe = sqlContext.createDataFrame(data, schema)
> val targetPoint:Array[Long] = Array(0L,9L)
> //Cacheing is the trigger to cause the error (no cacheing causes no error)
> dataframe.cache()
> //This is the line where it fails
> //java.util.NoSuchElementException: next on empty iterator
> //However we know that there is a valid match
> val targetRow = dataframe.where(dataframe("point") === 
> array(targetPoint.map(value => lit(value)): _*)).first()
> assert(targetRow != null)
>   }
> {code}






[jira] [Commented] (SPARK-23825) [K8s] Spark pods should request memory + memoryOverhead as resources

2018-03-29 Thread David Vogelbacher (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16419950#comment-16419950
 ] 

David Vogelbacher commented on SPARK-23825:
---

addressed by https://github.com/apache/spark/pull/20943

> [K8s] Spark pods should request memory + memoryOverhead as resources
> 
>
> Key: SPARK-23825
> URL: https://issues.apache.org/jira/browse/SPARK-23825
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: David Vogelbacher
>Priority: Major
>
> We currently request  {{spark.[driver,executor].memory}} as memory from 
> Kubernetes (e.g., 
> [here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]).
> The limit is set to {{spark.[driver,executor].memory + 
> spark.kubernetes.[driver,executor].memoryOverhead}}.
> This seems to be using Kubernetes wrong. 
> [How Pods with resource limits are 
> run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run],
>  states:
> {noformat}
> If a Container exceeds its memory request, it is likely that its Pod will be 
> evicted whenever the node runs out of memory.
> {noformat}
> Thus, if a the  spark driver/executor uses {{memory + memoryOverhead}} 
> memory, it can be evicted. While an executor might get restarted (but it 
> would still be very bad performance-wise), the driver would be hard to 
> recover.
> I think spark should be able to run with the requested (and, thus, 
> guaranteed) resources from Kubernetes without being in danger of termination 
> without needing to rely on optional available resources.
> Thus, we shoud request {{memory + memoryOverhead}} memory from Kubernetes 
> (and this should also be the limit).






[jira] [Commented] (SPARK-23825) [K8s] Spark pods should request memory + memoryOverhead as resources

2018-03-29 Thread David Vogelbacher (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16419857#comment-16419857
 ] 

David Vogelbacher commented on SPARK-23825:
---

Will make a PR shortly, cc [~mcheah]

> [K8s] Spark pods should request memory + memoryOverhead as resources
> 
>
> Key: SPARK-23825
> URL: https://issues.apache.org/jira/browse/SPARK-23825
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: David Vogelbacher
>Priority: Major
>
> We currently request  {{spark.[driver,executor].memory}} as memory from 
> Kubernetes (e.g., 
> [here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]).
> The limit is set to {{spark.[driver,executor].memory + 
> spark.kubernetes.[driver,executor].memoryOverhead}}.
> This seems to be using Kubernetes wrong. 
> [How Pods with resource limits are 
> run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run],
>  states:
> {noformat}
> If a Container exceeds its memory request, it is likely that its Pod will be 
> evicted whenever the node runs out of memory.
> {noformat}
> Thus, if a the  spark driver/executor uses {{memory + memoryOverhead}} 
> memory, it can be evicted. While an executor might get restarted (but it 
> would still be very bad performance-wise), the driver would be hard to 
> recover.
> I think spark should be able to run with the requested (and, thus, 
> guaranteed) resources from Kubernetes without being in danger of termination 
> without needing to rely on optional available resources.
> Thus, we shoud request {{memory + memoryOverhead}} memory from Kubernetes 
> (and this should also be the limit).






[jira] [Updated] (SPARK-23825) [K8s] Spark pods should request memory + memoryOverhead as resources

2018-03-29 Thread David Vogelbacher (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-23825:
--
Description: 
We currently request  {{spark.[driver,executor].memory}} as memory from 
Kubernetes (e.g., 
[here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]).
The limit is set to {{spark.[driver,executor].memory + 
spark.kubernetes.[driver,executor].memoryOverhead}}.
This seems to be using Kubernetes wrong. 
[How Pods with resource limits are 
run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run],
 states:

{noformat}
If a Container exceeds its memory request, it is likely that its Pod will be 
evicted whenever the node runs out of memory.
{noformat}
Thus, if the Spark driver/executor uses {{memory + memoryOverhead}} memory, 
it can be evicted. While an executor might get restarted (but it would still be 
very bad performance-wise), the driver would be hard to recover.

I think spark should be able to run with the requested (and, thus, guaranteed) 
resources from Kubernetes without being in danger of termination without 
needing to rely on optional available resources.

Thus, we should request {{memory + memoryOverhead}} memory from Kubernetes (and 
this should also be the limit).

  was:
We currently request  {{spark.{driver,executor}.memory}} as memory from 
Kubernetes (e.g., 
[here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]).
The limit is set to {{spark.{driver,executor}.memory + 
spark.kubernetes.{driver,executor}.memoryOverhead}}.
This seems to be using Kubernetes wrong. 
[How Pods with resource limits are 
run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run],
 states"

{noformat}
If a Container exceeds its memory request, it is likely that its Pod will be 
evicted whenever the node runs out of memory.
{noformat}

Thus, if the Spark driver/executor uses {{memory + memoryOverhead}} memory, 
it can be evicted. While an executor might get restarted (but it would still be 
very bad performance-wise), the driver would be hard to recover.

I think spark should be able to run with the requested (and, thus, guaranteed) 
resources from Kubernetes. It shouldn't rely on optional resources above the 
request and, therefore, be in danger of termination on high cluster utilization.

Thus, we should request {{memory + memoryOverhead}} memory from Kubernetes (and 
this should also be the limit).


> [K8s] Spark pods should request memory + memoryOverhead as resources
> 
>
> Key: SPARK-23825
> URL: https://issues.apache.org/jira/browse/SPARK-23825
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: David Vogelbacher
>Priority: Major
>
> We currently request  {{spark.[driver,executor].memory}} as memory from 
> Kubernetes (e.g., 
> [here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]).
> The limit is set to {{spark.[driver,executor].memory + 
> spark.kubernetes.[driver,executor].memoryOverhead}}.
> This seems to be using Kubernetes wrong. 
> [How Pods with resource limits are 
> run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run],
>  states:
> {noformat}
> If a Container exceeds its memory request, it is likely that its Pod will be 
> evicted whenever the node runs out of memory.
> {noformat}
> Thus, if the Spark driver/executor uses {{memory + memoryOverhead}} 
> memory, it can be evicted. While an executor might get restarted (but it 
> would still be very bad performance-wise), the driver would be hard to 
> recover.
> I think spark should be able to run with the requested (and, thus, 
> guaranteed) resources from Kubernetes without being in danger of termination 
> without needing to rely on optional available resources.
> Thus, we should request {{memory + memoryOverhead}} memory from Kubernetes 
> (and this should also be the limit).






[jira] [Updated] (SPARK-23825) [K8s] Spark pods should request memory + memoryOverhead as resources

2018-03-29 Thread David Vogelbacher (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-23825:
--
Description: 
We currently request  {{spark.{driver,executor}.memory}} as memory from 
Kubernetes (e.g., 
[here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]).
The limit is set to {{spark.{driver,executor}.memory + 
spark.kubernetes.{driver,executor}.memoryOverhead}}.
This seems to be using Kubernetes wrong. 
[How Pods with resource limits are 
run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run],
 states"

{noformat}
If a Container exceeds its memory request, it is likely that its Pod will be 
evicted whenever the node runs out of memory.
{noformat}

Thus, if the Spark driver/executor uses {{memory + memoryOverhead}} memory, 
it can be evicted. While an executor might get restarted (but it would still be 
very bad performance-wise), the driver would be hard to recover.

I think spark should be able to run with the requested (and, thus, guaranteed) 
resources from Kubernetes. It shouldn't rely on optional resources above the 
request and, therefore, be in danger of termination on high cluster utilization.

Thus, we should request {{memory + memoryOverhead}} memory from Kubernetes (and 
this should also be the limit).

  was:
We currently request `spark.{driver,executor}.memory` as memory from Kubernetes 
(e.g., 
[here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]).
The limit is set to `spark.{driver,executor}.memory + 
spark.kubernetes.{driver,executor}.memoryOverhead`.
This seems to be using Kubernetes wrong. 
[How Pods with resource limits are 
run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run],
 states"

{noformat}
If a Container exceeds its memory request, it is likely that its Pod will be 
evicted whenever the node runs out of memory.
{noformat}

Thus, if the Spark driver/executor uses `memory + memoryOverhead` memory, it 
can be evicted. While an executor might get restarted (but it would still be 
very bad performance-wise), the driver would be hard to recover.

I think spark should be able to run with the requested (and, thus, guaranteed) 
resources from Kubernetes. It shouldn't rely on optional resources above the 
request and, therefore, be in danger of termination on high cluster utilization.

Thus, we should request `memory + memoryOverhead` memory from Kubernetes (and 
this should also be the limit).


> [K8s] Spark pods should request memory + memoryOverhead as resources
> 
>
> Key: SPARK-23825
> URL: https://issues.apache.org/jira/browse/SPARK-23825
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: David Vogelbacher
>Priority: Major
>
> We currently request  {{spark.{driver,executor}.memory}} as memory from 
> Kubernetes (e.g., 
> [here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]).
> The limit is set to {{spark.{driver,executor}.memory + 
> spark.kubernetes.{driver,executor}.memoryOverhead}}.
> This seems to be using Kubernetes wrong. 
> [How Pods with resource limits are 
> run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run],
>  states"
> {noformat}
> If a Container exceeds its memory request, it is likely that its Pod will be 
> evicted whenever the node runs out of memory.
> {noformat}
> Thus, if the Spark driver/executor uses {{memory + memoryOverhead}} 
> memory, it can be evicted. While an executor might get restarted (but it 
> would still be very bad performance-wise), the driver would be hard to 
> recover.
> I think spark should be able to run with the requested (and, thus, 
> guaranteed) resources from Kubernetes. It shouldn't rely on optional 
> resources above the request and, therefore, be in danger of termination on 
> high cluster utilization.
> Thus, we should request {{memory + memoryOverhead}} memory from Kubernetes 
> (and this should also be the limit).






[jira] [Created] (SPARK-23825) [K8s] Spark pods should request memory + memoryOverhead as resources

2018-03-29 Thread David Vogelbacher (JIRA)
David Vogelbacher created SPARK-23825:
-

 Summary: [K8s] Spark pods should request memory + memoryOverhead 
as resources
 Key: SPARK-23825
 URL: https://issues.apache.org/jira/browse/SPARK-23825
 Project: Spark
  Issue Type: Bug
  Components: Kubernetes
Affects Versions: 2.3.0
Reporter: David Vogelbacher


We currently request `spark.{driver,executor}.memory` as memory from Kubernetes 
(e.g., 
[here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]).
The limit is set to `spark.{driver,executor}.memory + 
spark.kubernetes.{driver,executor}.memoryOverhead`.
This seems to be using Kubernetes wrong. 
[How Pods with resource limits are 
run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run],
 states"

{noformat}
If a Container exceeds its memory request, it is likely that its Pod will be 
evicted whenever the node runs out of memory.
{noformat}

Thus, if the Spark driver/executor uses `memory + memoryOverhead` memory, it 
can be evicted. While an executor might get restarted (but it would still be 
very bad performance-wise), the driver would be hard to recover.

I think spark should be able to run with the requested (and, thus, guaranteed) 
resources from Kubernetes. It shouldn't rely on optional resources above the 
request and, therefore, be in danger of termination on high cluster utilization.

Thus, we should request `memory + memoryOverhead` memory from Kubernetes (and 
this should also be the limit).
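
A back-of-the-envelope sketch of the proposed sizing in plain Scala (the 4g heap is only an example, and the 10% factor / 384 MiB floor are assumed to be the defaults used for memoryOverhead; check the actual constants in your build):
{noformat}
// Example: spark.driver.memory = 4g
val driverMemoryMiB = 4096L
// Assumed default overhead: max(10% of heap, 384 MiB).
val overheadMiB = math.max((driverMemoryMiB * 0.10).toLong, 384L)
// Proposal: request the same amount that is already used as the limit.
val requestMiB = driverMemoryMiB + overheadMiB   // 4505 MiB, request == limit
{noformat}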






[jira] [Comment Edited] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec

2018-03-08 Thread David Vogelbacher (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391709#comment-16391709
 ] 

David Vogelbacher edited comment on SPARK-23598 at 3/8/18 6:41 PM:
---

[~mgaido] {{HashAggregateExec}} calls {{addNewFunction}}, which calls 
{{addNewFunctionInternal}} which uses that flag and checks if the current size 
is bigger than {{GENERATED_CLASS_SIZE_THRESHOLD}} 
([see|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L478])

I just compiled develop with {{GENERATED_CLASS_SIZE_THRESHOLD}} set to -1 and 
was able to reproduce (cc [~hvanhovell]) . I applied the following diff before 
compiling:

{noformat}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 793824b0b0..7fad817d89 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -1167,7 +1167,7 @@ object CodeGenerator extends Logging {
   // limit, 65,536. We cannot know how many constants will be inserted for a 
class, so we use a
   // threshold of 1000k bytes to determine when a function should be inlined 
to a private, inner
   // class.
-  final val GENERATED_CLASS_SIZE_THRESHOLD = 100
+  final val GENERATED_CLASS_SIZE_THRESHOLD = -1

   // This is the threshold for the number of global variables, whose types are 
primitive type or
   // complex type (e.g. more than one-dimensional array), that will be placed 
at the outer class
(END)
{noformat}

Then, I executed a simple groupBy-Aggregate in the spark-shell and got the same 
error:

{noformat}
➜  spark git:(master) ✗ ./bin/spark-shell
18/03/08 18:30:24 WARN Utils: Your hostname, dvogelbac resolves to a loopback 
address: 127.0.0.1; using 10.111.11.111 instead (on interface en0)
18/03/08 18:30:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another 
address
18/03/08 18:30:24 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
Spark context Web UI available at http://10.111.11.111:4040
Spark context available as 'sc' (master = local[*], app id = 
local-1520533829643).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.conf.set("spark.sql.codegen.wholeStage", true)

scala> val df_pet_age = Seq(
 |   (8, "bat"),
 |   (5, "bat"),
 |   (15, "bat"),
 |   (30, "mouse"),
 |   (15, "mouse"),
 |   (23, "mouse"),
 |   (8, "horse"),
 |   (-5, "horse")
 | ).toDF("age", "name")
df_pet_age: org.apache.spark.sql.DataFrame = [age: int, name: string]

scala> df_pet_age.groupBy("name").avg("age").show()
18/03/08 18:31:20 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.IllegalAccessError: tried to access method 
org.apache.spark.sql.execution.BufferedRowIterator.stopEarly()Z from class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:616)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 

[jira] [Commented] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec

2018-03-08 Thread David Vogelbacher (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391709#comment-16391709
 ] 

David Vogelbacher commented on SPARK-23598:
---

[~mgaido] {{HashAggregateExec}} calls {{addNewFunction}}, which calls 
{{addNewFunctionInternal}} which uses that flag and checks if the current size 
is bigger than {{GENERATED_CLASS_SIZE_THRESHOLD}} 
([see|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L478])

I just compiled develop with {{GENERATED_CLASS_SIZE_THRESHOLD}} set to -1 and 
was able to reproduce (cc [~hvanhovell]) . I applied the following diff before 
compiling:

{noformat}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 793824b0b0..7fad817d89 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -1167,7 +1167,7 @@ object CodeGenerator extends Logging {
   // limit, 65,536. We cannot know how many constants will be inserted for a 
class, so we use a
   // threshold of 1000k bytes to determine when a function should be inlined 
to a private, inner
   // class.
-  final val GENERATED_CLASS_SIZE_THRESHOLD = 100
+  final val GENERATED_CLASS_SIZE_THRESHOLD = -1

   // This is the threshold for the number of global variables, whose types are 
primitive type or
   // complex type (e.g. more than one-dimensional array), that will be placed 
at the outer class
(END)
{noformat}

Then, I executed a simple groupBy-Aggregate in the spark-shell and got the same 
error:

{noformat}
➜  spark git:(master) ✗ ./bin/spark-shell
18/03/08 18:30:24 WARN Utils: Your hostname, dvogelbac56-mac resolves to a 
loopback address: 127.0.0.1; using 10.224.86.161 instead (on interface en0)
18/03/08 18:30:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another 
address
18/03/08 18:30:24 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
Spark context Web UI available at http://10.224.86.161:4040
Spark context available as 'sc' (master = local[*], app id = 
local-1520533829643).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.conf.set("spark.sql.codegen.wholeStage", true)

scala> val df_pet_age = Seq(
 |   (8, "bat"),
 |   (5, "bat"),
 |   (15, "bat"),
 |   (30, "mouse"),
 |   (15, "mouse"),
 |   (23, "mouse"),
 |   (8, "horse"),
 |   (-5, "horse")
 | ).toDF("age", "name")
df_pet_age: org.apache.spark.sql.DataFrame = [age: int, name: string]

scala> df_pet_age.groupBy("name").avg("age").show()
18/03/08 18:31:20 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.IllegalAccessError: tried to access method 
org.apache.spark.sql.execution.BufferedRowIterator.stopEarly()Z from class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:616)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 

[jira] [Commented] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec

2018-03-07 Thread David Vogelbacher (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389745#comment-16389745
 ] 

David Vogelbacher commented on SPARK-23598:
---

[~hvanhovell] unfortunately, I can't extract any code for reproducing this.
It should be possible to come up with a reproduction by making a large enough
query (one that adds many methods in the codegen stage) that contains a
HashAggregateExec node.

Or, to make it even easier, one could compile Spark with a lowered `final val
GENERATED_CLASS_SIZE_THRESHOLD` in
[CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L1170]
to force the generation of the private inner class.
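
To make the "large enough query" idea concrete, here is a rough, untested sketch
(assuming an existing `sparkSession` handle); the column count of 500 is an
arbitrary guess for how wide the aggregate has to be before codegen spills
generated functions into the private inner class:

{code:java}
// Hypothetical reproduction sketch (untested): build a very wide groupBy-aggregate so
// the generated outer class grows past GENERATED_CLASS_SIZE_THRESHOLD and generated
// functions get inlined into the private, inner class.
import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

List<Column> projections = new ArrayList<>();
projections.add(col("id"));
for (int i = 0; i < 500; i++) {
    projections.add(col("id").multiply(i).as("c" + i));
}
Dataset<Row> wide = sparkSession.range(1000).toDF("id")
        .select(projections.toArray(new Column[0]));

List<Column> aggs = new ArrayList<>();
for (int i = 0; i < 500; i++) {
    aggs.add(avg(col("c" + i)).as("avg_c" + i));
}
// groupBy + agg produces a HashAggregateExec node in the physical plan.
wide.groupBy(col("id").mod(10))
        .agg(aggs.get(0), aggs.subList(1, aggs.size()).toArray(new Column[0]))
        .show();
{code}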

> WholeStageCodegen can lead to IllegalAccessError  calling append for 
> HashAggregateExec
> --
>
> Key: SPARK-23598
> URL: https://issues.apache.org/jira/browse/SPARK-23598
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: David Vogelbacher
>Priority: Major
>
> Got the following stacktrace for a large QueryPlan using WholeStageCodeGen:
> {noformat}
> java.lang.IllegalAccessError: tried to access method 
> org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V
>  from class 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown
>  Source)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat}
> After disabling codegen, everything works.
> The root cause seems to be that we are trying to call the protected _append_ 
> method of 
> [BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68]
>  from an inner-class of a sub-class that is loaded by a different 
> class-loader (after codegen compilation).
> [https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] 
> states that a protected method _R_ can be accessed only if one of the 
> following two conditions is fulfilled:
>  # R is protected and is declared in a class C, and D is either a subclass of 
> C or C itself. Furthermore, if R is not static, then the symbolic reference 
> to R must contain a symbolic reference to a class T, such that T is either a 
> subclass of D, a superclass of D, or D itself.
>  # R is either protected or has default access (that is, neither public nor 
> protected nor private), and is declared by a class in the same run-time 
> package as D.
> 2.) doesn't apply as we have loaded the class with a different class loader 
> (and are in a different package) and 1.) doesn't apply because we are 
> apparently trying to call the method from an inner class of a subclass of 
> _BufferedRowIterator_.
> Looking at the Code path of _WholeStageCodeGen_, the following happens:
>  # In 
> [WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527],
>  we create the subclass of _BufferedRowIterator_, along with a _processNext_ 
> method for processing the output of the child plan.
>  # In the child, which is a 
> [HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517],
>  we create the method which shows up at the top of the stack trace (called 
> _doAggregateWithKeysOutput_ )
>  # We add this method to the compiled code by invoking _addNewFunction_ of 
> [CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460].
> In the generated function body we call the _append_ method.
> Now, the _addNewFunction_ 

[jira] [Updated] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec

2018-03-05 Thread David Vogelbacher (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-23598:
--
Description: 
Got the following stacktrace for a large QueryPlan using WholeStageCodeGen:
{noformat}
java.lang.IllegalAccessError: tried to access method 
org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V
 from class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat}
After disabling codegen, everything works.
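
For reference, a minimal sketch of that workaround, using the same SQL config key
as in the reproduction steps but set programmatically on the session:

{code:java}
// Workaround sketch: disable whole-stage codegen so the plan falls back to the
// interpreted iterator path and no generated inner class ever calls append().
sparkSession.conf().set("spark.sql.codegen.wholeStage", false);
{code}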

The root cause seems to be that we are trying to call the protected _append_ 
method of 
[BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68]
 from an inner-class of a sub-class that is loaded by a different class-loader 
(after codegen compilation).

[https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] 
states that a protected method _R_ can be accessed only if one of the following 
two conditions is fulfilled:
 # R is protected and is declared in a class C, and D is either a subclass of C 
or C itself. Furthermore, if R is not static, then the symbolic reference to R 
must contain a symbolic reference to a class T, such that T is either a 
subclass of D, a superclass of D, or D itself.
 # R is either protected or has default access (that is, neither public nor 
protected nor private), and is declared by a class in the same run-time package 
as D.

2.) doesn't apply as we have loaded the class with a different class loader 
(and are in a different package) and 1.) doesn't apply because we are 
apparently trying to call the method from an inner class of a subclass of 
_BufferedRowIterator_.

Looking at the Code path of _WholeStageCodeGen_, the following happens:
 # In 
[WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527],
 we create the subclass of _BufferedRowIterator_, along with a _processNext_ 
method for processing the output of the child plan.
 # In the child, which is a 
[HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517],
 we create the method which shows up at the top of the stack trace (called 
_doAggregateWithKeysOutput_ )
 # We add this method to the compiled code by invoking _addNewFunction_ of 
[CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460].
In the generated function body we call the _append_ method.

Now, the _addNewFunction_ method states that:
{noformat}
If the code for the `OuterClass` grows too large, the function will be inlined 
into a new private, inner class
{noformat}
This indeed seems to happen: the _doAggregateWithKeysOutput_ method is put into 
a new private inner class. Thus, it doesn't have access to the protected 
_append_ method anymore but still tries to call it, which results in the 
_IllegalAccessError._ 

Possible fixes:
 * Pass in the _inlineToOuterClass_ flag when invoking the _addNewFunction_
 * Make the _append_ method public
 * Re-declare the _append_ method in the generated subclass (just invoking 
_super_). This way, inner classes should have access to it (see the sketch below).
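
To make the access rule and the last fix option above concrete, here is a rough,
hand-written sketch of the shape of the generated classes (class and method names
are invented to mirror the stack trace; this is not actual codegen output):

{code:java}
// Simplified illustration only -- NOT the real generated code. In the real generated
// code, the corresponding classes are compiled by Janino and loaded by the codegen
// classloader, i.e. they end up in a different run-time package than
// org.apache.spark.sql.execution.
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.BufferedRowIterator;

public abstract class GeneratedIteratorSketch extends BufferedRowIterator {

  // Sketch of the "re-declare append" fix: delegate to super. A call that resolves
  // against this class comes from the same run-time package as the nested class
  // below, so the protected-access check passes; a call resolving against
  // BufferedRowIterator does not.
  @Override
  protected void append(InternalRow row) {
    super.append(row);
  }

  private class agg_NestedClass {
    private void agg_doAggregateWithKeysOutput(InternalRow row) {
      // In the actual generated code this call is emitted with a symbolic reference
      // to BufferedRowIterator.append, which fails the protected-access check at
      // runtime and surfaces as the IllegalAccessError above.
      append(row);
    }
  }
}
{code}

Whichever fix is chosen, the key point is that the call from the inner class must
end up resolving either against a public method or against a class in the generated
code's own run-time package.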

 

  was:
Got the following stacktrace for a large QueryPlan using WholeStageCodeGen:
{noformat}
java.lang.IllegalAccessError: tried to access method 
org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V
 from class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass
at 

[jira] [Updated] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec

2018-03-05 Thread David Vogelbacher (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-23598:
--
Description: 
Got the following stacktrace for a large QueryPlan using WholeStageCodeGen:
{noformat}
java.lang.IllegalAccessError: tried to access method 
org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V
 from class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat}
After disabling codegen, everything works.

The root cause seems to be that we are trying to call the protected _append_ 
method of 
[BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68]
 from an inner-class of a sub-class that is loaded by a different class-loader 
(after codegen compilation).

[https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] 
states that a protected method _R_ can be accessed only if one of the following 
two conditions is fulfilled:
 # R is protected and is declared in a class C, and D is either a subclass of C 
or C itself. Furthermore, if R is not static, then the symbolic reference to R 
must contain a symbolic reference to a class T, such that T is either a 
subclass of D, a superclass of D, or D itself.
 # R is either protected or has default access (that is, neither public nor 
protected nor private), and is declared by a class in the same run-time package 
as D.

2.) doesn't apply as we have loaded the class with a different class loader 
(and are in a different package) and 1.) doesn't apply because we are 
apparently trying to call the method from an inner class of a subclass of 
_BufferedRowIterator_.

Looking at the Code path of _WholeStageCodeGen_, the following happens:
 # In 
[WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527],
 we create the subclass of _BufferedRowIterator_, along with a _processNext_ 
method for processing the output of the child plan.
 # In the child, which is a 
[HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517],
 we create the method which shows up at the top of the stack trace (called 
_doAggregateWithKeysOutput_ )
 # We add this method to the compiled code by invoking _addNewFunction_ of 
[CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460].
In the generated function body we call the _append_ method.

Now, the _addNewFunction_ method states that:
{noformat}
If the code for the `OuterClass` grows too large, the function will be inlined 
into a new private, inner class
{noformat}
This indeed seems to happen: the _doAggregateWithKeysOutput_ method is put into 
a new private inner class. Thus, it doesn't have access to the protected 
_append_ method anymore but still tries to call it, which results in the 
_IllegalAccessError._ 

Possible fixes:
 * Pass in the _inlineToOuterClass_ flag when invoking the _addNewFunction_
 * Make the _append_ method public
 * Re-declare the _append_ method in the generated subclass (just invoking 
_super_). This way, inner classes should have access to it.

 

  was:
Got the following stacktrace for a large QueryPlan using WholeStageCodeGen:
{noformat}
java.lang.IllegalAccessError: tried to access method 
org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V
 from class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass
at 

[jira] [Updated] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec

2018-03-05 Thread David Vogelbacher (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-23598:
--
Description: 
Got the following stacktrace for a large QueryPlan using WholeStageCodeGen:
{noformat}
java.lang.IllegalAccessError: tried to access method 
org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V
 from class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat}
After disabling codegen, everything works.

The root cause seems to be that we are trying to call the protected _append_ 
method of 
[BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68]
 from an inner-class of a sub-class that is loaded by a different class-loader 
(after codegen compilation).

[https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] 
states that a protected method _R_ can be accessed only if one of the following 
two conditions is fulfilled:
 # R is protected and is declared in a class C, and D is either a subclass of C 
or C itself. Furthermore, if R is not static, then the symbolic reference to R 
must contain a symbolic reference to a class T, such that T is either a 
subclass of D, a superclass of D, or D itself.
 # R is either protected or has default access (that is, neither public nor 
protected nor private), and is declared by a class in the same run-time package 
as D.

2.) doesn't apply as we have loaded the class with a different class loader 
(and are in a different package) and 1.) doesn't apply because we are 
apparently trying to call the method from an inner class of a subclass of 
_BufferedRowIterator_.

Looking at the Code path of _WholeStageCodeGen_, the following happens:
 # In 
[WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527],
 we create the subclass of _BufferedRowIterator_, along with a _processNext_ 
method for processing the output of the child plan.
 # In the child, which is a 
[HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517],
 we create the method which shows up at the top of the stack trace (called 
_doAggregateWithKeysOutput_ )
 # We add this method to the compiled code by invoking _addNewFunction_ of 
[CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460].
In the generated function body we call the _append_ method.

Now, the _addNewFunction_ method states that:
{noformat}
If the code for the `OuterClass` grows too large, the function will be inlined 
into a new private, inner class
{noformat}
This indeed seems to happen: the _doAggregateWithKeysOutput_ method is put into 
a new private inner class. Thus, it doesn't have access to the protected 
_append_ method anymore but still tries to call it, which results in the 
_IllegalAccessError._ 

Possible fixes:
 * Pass in the _inlineToOuterClass_ flag when invoking the _addNewFunction_
 * Make the _append_ method public
 * Re-declare the _append_ method in the generated subclass (just invoking 
_super_). This way, inner classes should have access to it.

 

  was:
Got the following stacktrace for a large QueryPlan using WholeStageCodeGen:
{noformat}
java.lang.IllegalAccessError: tried to access method 
org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V
 from class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass
at 

[jira] [Updated] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec

2018-03-05 Thread David Vogelbacher (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-23598:
--
Description: 
Got the following stacktrace for a large QueryPlan using WholeStageCodeGen:
{noformat}
java.lang.IllegalAccessError: tried to access method 
org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V
 from class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat}
After disabling codegen, everything works.

The root cause seems to be that we are trying to call the protected _append_ 
method of 
[BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68]
 from an inner-class of a sub-class that is loaded by a different class-loader 
(after codegen compilation).

[https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] 
states that a protected method _R_ can be accessed only if one of the following 
two conditions is fulfilled:
 # R is protected and is declared in a class C, and D is either a subclass of C 
or C itself. Furthermore, if R is not static, then the symbolic reference to R 
must contain a symbolic reference to a class T, such that T is either a 
subclass of D, a superclass of D, or D itself.
 # R is either protected or has default access (that is, neither public nor 
protected nor private), and is declared by a class in the same run-time package 
as D.

2.) doesn't apply as we have loaded the class with a different class loader 
(and are in a different package) and 1.) doesn't apply because we are 
apparently trying to call the method from an inner class of a subclass of 
_BufferedRowIterator_.

Looking at the Code path of _WholeStageCodeGen_, the following happens:
 # In 
[WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527],
 we create the subclass of _BufferedRowIterator_, along with a _processNext_ 
method for processing the output of the child plan.
 # In the child, which is a 
[HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517],
 we create the method which shows up at the top of the stack trace (called 
_doAggregateWithKeysOutput_ )
 # We add this method to the compiled code by invoking _addNewFunction_ of 
[CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460].

Now, this method states that:
{noformat}
If the code for the `OuterClass` grows too large, the function will be inlined 
into a new private, inner class
{noformat}
This indeed seems to happen: the _doAggregateWithKeysOutput_ method is put into 
a new private inner class. Thus, it doesn't have access to the protected 
_append_ method anymore but still tries to call it, which results in the 
_IllegalAccessError._ 

Possible fixes:
 * Pass in the _inlineToOuterClass_ flag when invoking the _addNewFunction_
 * Make the _append_ method public
 * Re-declare the _append_ method in the generated subclass (just invoking 
_super_). This way, inner classes should have access to it.

 

  was:
Got the following stacktrace for a large QueryPlan using WholeStageCodeGen:
{noformat}
java.lang.IllegalAccessError: tried to access method 
org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V
 from class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown
 Source)
at 

[jira] [Updated] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec

2018-03-05 Thread David Vogelbacher (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-23598:
--
Description: 
Got the following stacktrace for a large QueryPlan using WholeStageCodeGen:
{noformat}
java.lang.IllegalAccessError: tried to access method 
org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V
 from class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat}
After disabling codegen, everything works.

The root cause seems to be that we are trying to call the protected _append_ 
method of 
[BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68]
 from an inner-class of a sub-class that is loaded by a different class-loader 
(after codegen compilation).

[https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] 
states that a protected method _R_ can be accessed only if one of the following 
two conditions is fulfilled:
 # R is protected and is declared in a class C, and D is either a subclass of C 
or C itself. Furthermore, if R is not static, then the symbolic reference to R 
must contain a symbolic reference to a class T, such that T is either a 
subclass of D, a superclass of D, or D itself.
 # R is either protected or has default access (that is, neither public nor 
protected nor private), and is declared by a class in the same run-time package 
as D.

2.) doesn't apply as we have loaded the class with a different class loader 
(and are in a different package) and 1.) doesn't apply because we are 
apparently trying to call the method from an inner class of a subclass of 
_BufferedRowIterator_.

Looking at the Code path of _WholeStageCodeGen_, the following happens:
 # In 
[WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527],
 we create the subclass of _BufferedRowIterator_, along with a _processNext_ 
method for processing the output of the child plan.
 # In the child, which is a 
[HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517],
 we create the method which shows up at the top of the stack trace (called 
_doAggregateWithKeysOutput_ )
 # We add this method to the compiled code by invoking _addNewFunction_ of 
[CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460].

Now, this method states that:
{noformat}
If the code for the `OuterClass` grows too large, the function will be inlined 
into a new private, inner class
{noformat}
This indeed seems to happen: the _doAggregateWithKeysOutput_ method is put into 
a new private inner class. Thus, it doesn't have access to the protected 
_append_ method anymore but still tries to call it, which results in the 
_IllegalAccessError._ 

Possible fixes:
 * Pass in the _inlineToOuterClass_ flag when invoking _addNewFunction_
 * Make the _append_ method public
 * Re-declare the _append_ method in the generated subclass (just invoking 
_super_). This way, inner classes should have access to it.

 

  was:
Got the following stacktrace for a large QueryPlan using WholeStageCodeGen:
{noformat}
java.lang.IllegalAccessError: tried to access method 
org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V
 from class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown
 Source)
at 

[jira] [Updated] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec

2018-03-05 Thread David Vogelbacher (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Vogelbacher updated SPARK-23598:
--
Description: 
Got the following stacktrace for a large QueryPlan using WholeStageCodeGen:
{noformat}
java.lang.IllegalAccessError: tried to access method 
org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V
 from class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat}
After disabling codegen, everything works.

The root cause seems to be that we are trying to call the protected _append_ 
method of 
[BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68]
 from an inner-class of a sub-class that is loaded by a different 
class-loader (after codegen compilation).

[https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] 
states that a protected method _R_ can be accessed only if one of the following 
two conditions is fulfilled:
 # R is protected and is declared in a class C, and D is either a subclass of C 
or C itself. Furthermore, if R is not static, then the symbolic reference to R 
must contain a symbolic reference to a class T, such that T is either a 
subclass of D, a superclass of D, or D itself.
 # R is either protected or has default access (that is, neither public nor 
protected nor private), and is declared by a class in the same run-time package 
as D.

2.) doesn't apply as we have loaded the class with a different class loader 
(and are in a different package) and 1.) doesn't apply because we are 
apparently trying to call the method from an inner class of a subclass of 
_BufferedRowIterator_.

Looking at the Code path of _WholeStageCodeGen_, the following happens:
 # In 
[WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527],
 we create the subclass of _BufferedRowIterator_, along with a _processNext_ 
method for processing the output of the child plan.
 # In the child, which is a 
[HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517],
 we create the method which shows up at the top of the stack trace (called 
_doAggregateWithKeysOutput_ )
 # We add this method to the compiled code by invoking _addNewFunction_ of 
[CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460].

Now, this method states that:
{noformat}
If the code for the `OuterClass` grows too large, the function will be inlined 
into a new private, inner class
{noformat}
This indeed seems to happen: the _doAggregateWithKeysOutput_ method is put into 
a new private inner class. Thus, it doesn't have access to the protected 
_append_ method anymore but still tries to call it, which results in the 
_IllegalAccessError._ 

Possible fixes:
 * Pass in the _inlineToOuterClass_ flag when invoking _addNewFunction_
 * Make the _append_ method public
 * Re-declare the _append_ method in the generated subclass (just invoking 
_super_). This way, inner classes should have access to it.

 

  was:
Got the following stacktrace for a large QueryPlan using WholeStageCodeGen:
{code:java}
java.lang.IllegalAccessError: tried to access method 
org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V
 from class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown
 Source)
at 

[jira] [Created] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec

2018-03-05 Thread David Vogelbacher (JIRA)
David Vogelbacher created SPARK-23598:
-

 Summary: WholeStageCodegen can lead to IllegalAccessError  calling 
append for HashAggregateExec
 Key: SPARK-23598
 URL: https://issues.apache.org/jira/browse/SPARK-23598
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: David Vogelbacher


Got the following stacktrace for a large QueryPlan using WholeStageCodeGen:
{code:java}
java.lang.IllegalAccessError: tried to access method 
org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V
 from class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
{code}

After disabling codegen, everything works.

The root cause seems to be that we are trying to call the protected _append_ 
method of 
[BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68]
 from an inner-class of a sub-class that is loaded by a different 
class-loader (after codegen compilation).

[https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] 
states that a protected method _R_ can be accessed only if one of the following 
two conditions is fulfilled:
 # R is protected and is declared in a class C, and D is either a subclass of C 
or C itself. Furthermore, if R is not static, then the symbolic reference to R 
must contain a symbolic reference to a class T, such that T is either a 
subclass of D, a superclass of D, or D itself.
 # R is either protected or has default access (that is, neither public nor 
protected nor private), and is declared by a class in the same run-time package 
as D.

2.) doesn't apply as we have loaded the class with a different class loader 
(and are in a different package) and 1.) doesn't apply because we are 
apparently trying to call the method from an inner class of a subclass of 
_BufferedRowIterator_.



Looking at the Code path of _WholeStageCodeGen_, the following happens:
 # In 
[WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527],
 we create the subclass of _BufferedRowIterator_, along with a _processNext_ 
method for processing the output of the child plan.
 # In the child, which is a 
[HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517],
 we create the method which shows up at the top of the stack trace (called 
_doAggregateWithKeysOutput_ )
 # We add this method to the compiled code by invoking _addNewFunction_ of 
[CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460].

Now, this method states that:
{noformat}
If the code for the `OuterClass` grows too large, the function will be inlined 
into a new private, inner class
{noformat}
This indeed seems to happen: the _doAggregateWithKeysOutput_ method is put into 
a new private inner class. Thus, it doesn't have access to the protected 
_append_ method anymore but still tries to call it, which results in the 
_IllegalAccessError._ 

Possible fixes:
 * Pass in the _inlineToOuterClass_ flag when invoking _addNewFunction_
 * Make the _append_ method public
 * Re-declare the _append_ method in the generated subclass (just invoking 
_super_). This way, inner classes should have access to it.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org