[jira] [Updated] (SPARK-39885) Behavior differs between arrays_overlap and array_contains for negative 0.0
[ https://issues.apache.org/jira/browse/SPARK-39885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Vogelbacher updated SPARK-39885:
--------------------------------------
    Summary: Behavior differs between arrays_overlap and array_contains for negative 0.0  (was: Behavior differs between array_overlap and array_contains for negative 0.0)

> Behavior differs between arrays_overlap and array_contains for negative 0.0
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-39885
>                 URL: https://issues.apache.org/jira/browse/SPARK-39885
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.2
>            Reporter: David Vogelbacher
>            Priority: Major
>
> {{array_contains([0.0], -0.0)}} will return true, while {{arrays_overlap([0.0], [-0.0])}} will return false. We generally want to treat -0.0 and 0.0 as the same (see https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SQLOrderingUtil.scala#L28), but the {{Double::equals}} method doesn't. Therefore, we should either mark double as false in [TypeUtils#typeWithProperEquals|https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala#L96], or wrap it with our own equals method that handles this case.
> Java code snippets showing the issue:
> {code:java}
> dataset = sparkSession.createDataFrame(
>     List.of(RowFactory.create(List.of(-0.0))),
>     DataTypes.createStructType(ImmutableList.of(DataTypes.createStructField(
>         "doubleCol", DataTypes.createArrayType(DataTypes.DoubleType), false))));
> Dataset<Row> df = dataset.withColumn(
>     "overlaps",
>     functions.arrays_overlap(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
> List<Row> result = df.collectAsList(); // [[WrappedArray(-0.0),false]]
> {code}
> {code:java}
> dataset = sparkSession.createDataFrame(
>     List.of(RowFactory.create(-0.0)),
>     DataTypes.createStructType(ImmutableList.of(DataTypes.createStructField(
>         "doubleCol", DataTypes.DoubleType, false))));
> Dataset<Row> df = dataset.withColumn(
>     "contains",
>     functions.array_contains(functions.array(functions.lit(+0.0)), dataset.col("doubleCol")));
> List<Row> result = df.collectAsList(); // [[-0.0,true]]
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
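The root cause described above can be reproduced outside Spark. Java's boxed {{Double}} equality and hash code compare raw IEEE 754 bits, so they distinguish 0.0 from -0.0, while the primitive {{==}} comparison does not; any HashSet-based membership fast path therefore disagrees with a pairwise comparison. A minimal standalone check:

```java
import java.util.HashSet;
import java.util.Set;

public class NegativeZeroEquality {
    public static void main(String[] args) {
        // Primitive comparison follows IEEE 754: 0.0 and -0.0 compare equal.
        System.out.println(0.0 == -0.0);                                      // true

        // Boxed equality compares Double.doubleToLongBits, which differ.
        System.out.println(Double.valueOf(0.0).equals(Double.valueOf(-0.0))); // false

        // A HashSet lookup goes through equals/hashCode, so it misses --
        // the same behavior the arrays_overlap HashSet fast path exhibits.
        Set<Double> set = new HashSet<>();
        set.add(0.0);
        System.out.println(set.contains(-0.0));                               // false
    }
}
```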
[jira] [Created] (SPARK-39746) Binary array operations can be faster if one side is a constant
David Vogelbacher created SPARK-39746:
--------------------------------------

             Summary: Binary array operations can be faster if one side is a constant
                 Key: SPARK-39746
                 URL: https://issues.apache.org/jira/browse/SPARK-39746
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.3.0
            Reporter: David Vogelbacher

Array operations such as [ArraysOverlap|https://github.com/apache/spark/blob/79f133b7bbc1d9aa6a20dd8a34ec120902f96155/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L1367] are optimized to put all the elements of the smaller array into a HashSet, provided the element type properly supports equals. However, if one of the arrays is a constant we could do much better: instead of reconstructing the HashSet for each row, we could construct it just once and send it to all the executors. This would improve runtime by a constant factor.
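The proposed optimization can be sketched outside Spark. The names below are illustrative, not actual Spark internals: the point is that for a literal array the lookup set is built once and the returned predicate is reused for every row, instead of rebuilding the set per row.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch of the constant-side optimization: precompute the
// HashSet for the literal array once, then apply the resulting predicate
// to each row's array.
final class ConstantOverlap {
    static Predicate<double[]> overlapsConstant(double[] constant) {
        // Built once per query, not once per row.
        // (Note: HashSet<Double> inherits the -0.0 vs 0.0 equals caveat
        // discussed in SPARK-39885.)
        Set<Double> lookup = new HashSet<>();
        for (double d : constant) {
            lookup.add(d);
        }
        return row -> {
            for (double d : row) {
                if (lookup.contains(d)) {
                    return true; // O(1) probe per element
                }
            }
            return false;
        };
    }
}
```

A per-row rebuild costs O(|constant|) extra work on every row; hoisting it out removes that factor entirely for the constant side.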
[jira] [Updated] (SPARK-28761) spark.driver.maxResultSize only applies to compressed data
[ https://issues.apache.org/jira/browse/SPARK-28761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Vogelbacher updated SPARK-28761:
--------------------------------------
    Description: 
Spark has a setting {{spark.driver.maxResultSize}}, see https://spark.apache.org/docs/latest/configuration.html#application-properties:
{noformat}
Limit of total size of serialized results of all partitions for each Spark action (e.g. collect) in bytes. Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size is above this limit. Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory and memory overhead of objects in JVM). Setting a proper limit can protect the driver from out-of-memory errors.
{noformat}
This setting can be very useful in constraining the memory that the Spark driver needs for a specific Spark action. However, this limit is checked before decompressing data in https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L662. Even if the compressed data is below the limit, the uncompressed data can still be far above it. In order to protect the driver, we should also impose a limit on the uncompressed data. We could do this in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L344. I propose adding a new config option {{spark.driver.maxUncompressedResultSize}}.

A simple repro of this with spark-shell:
{noformat}
> printf 'a%.0s' {1..100000000} > test.csv  # create a 100 MB file
> ./bin/spark-shell --conf "spark.driver.maxResultSize=10m"

scala> val df = spark.read.format("csv").load("/Users/dvogelbacher/test.csv")
df: org.apache.spark.sql.DataFrame = [_c0: string]

scala> val results = df.collect()
results: Array[org.apache.spark.sql.Row] = Array([a...

scala> results(0).getString(0).size
res0: Int = 100000000
{noformat}
Even though we set maxResultSize to 10 MB, we collect a result that is 100 MB uncompressed.
[jira] [Created] (SPARK-28761) spark.driver.maxResultSize only applies to compressed data
David Vogelbacher created SPARK-28761:
--------------------------------------

             Summary: spark.driver.maxResultSize only applies to compressed data
                 Key: SPARK-28761
                 URL: https://issues.apache.org/jira/browse/SPARK-28761
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.0.0
            Reporter: David Vogelbacher

Spark has a setting {{spark.driver.maxResultSize}}, see https://spark.apache.org/docs/latest/configuration.html#application-properties:
{noformat}
Limit of total size of serialized results of all partitions for each Spark action (e.g. collect) in bytes. Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size is above this limit. Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory and memory overhead of objects in JVM). Setting a proper limit can protect the driver from out-of-memory errors.
{noformat}
This setting can be very useful in constraining the memory that the Spark driver needs for a specific Spark action. However, this limit is checked before decompressing data in https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L662. Even if the compressed data is below the limit, the uncompressed data can still be far above it. In order to protect the driver, we should also impose a limit on the uncompressed data. We could do this in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L344. I propose adding a new config option {{spark.driver.maxUncompressedResultSize}}.

A simple repro of this with spark-shell:
{noformat}
> printf 'a%.0s' {1..100000000} > test.csv  # create a 100 MB file
> ./bin/spark-shell --conf "spark.driver.maxResultSize=10m"

scala> val df = spark.read.format("csv").load("/Users/dvogelbacher/test.csv")
df: org.apache.spark.sql.DataFrame = [_c0: string]

scala> val results = df.collect()
results: Array[org.apache.spark.sql.Row] = Array([a...

scala> results(0).getString(0).size
res0: Int = 100000000
{noformat}
Even though we set maxResultSize to 10 MB, we collect a result that is 100 MB uncompressed.
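The mismatch the issue describes is easy to quantify with plain JDK compression (java.util.zip here, not Spark's actual block codec): highly repetitive data, like the repro's file of repeated 'a' characters, compresses by orders of magnitude, so a limit checked against serialized (compressed) bytes vastly under-counts the decompressed footprint on the driver.

```java
import java.util.Arrays;
import java.util.zip.Deflater;

// Illustrates why a size cap applied to compressed task results can admit
// results whose decompressed size is far larger than the cap.
public final class CompressedSize {
    public static int compressedLength(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        // Output buffer as large as the input; repetitive data finishes
        // in a single deflate() call well below that bound.
        byte[] buf = new byte[input.length];
        int len = deflater.deflate(buf);
        deflater.end();
        return len;
    }

    public static void main(String[] args) {
        byte[] data = new byte[1_000_000];   // 1 MB of 'a', mirroring the repro
        Arrays.fill(data, (byte) 'a');
        int compressed = compressedLength(data);
        // The compressed form is a tiny fraction of the 1 MB payload, so a
        // cap checked against it says nothing useful about driver memory.
        System.out.println("uncompressed=" + data.length + " compressed=" + compressed);
    }
}
```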
[jira] [Created] (SPARK-27805) toPandas does not propagate SparkExceptions with arrow enabled
David Vogelbacher created SPARK-27805: - Summary: toPandas does not propagate SparkExceptions with arrow enabled Key: SPARK-27805 URL: https://issues.apache.org/jira/browse/SPARK-27805 Project: Spark Issue Type: Improvement Components: PySpark, SQL Affects Versions: 3.1.0 Reporter: David Vogelbacher When calling {{toPandas}} with arrow enabled errors encountered during the collect are not propagated to the python process. There is only a very general {{EofError}} raised. Example of behavior with arrow enabled vs. arrow disabled: {noformat} import traceback from pyspark.sql.functions import udf from pyspark.sql.types import IntegerType def raise_exception(): raise Exception("My error") error_udf = udf(raise_exception, IntegerType()) df = spark.range(3).toDF("i").withColumn("x", error_udf()) try: df.toPandas() except: no_arrow_exception = traceback.format_exc() spark.conf.set("spark.sql.execution.arrow.enabled", "true") try: df.toPandas() except: arrow_exception = traceback.format_exc() print no_arrow_exception print arrow_exception {noformat} {{arrow_exception}} gives as output: {noformat} >>> print arrow_exception Traceback (most recent call last): File "", line 2, in File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 2143, in toPandas batches = self._collectAsArrow() File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 2205, in _collectAsArrow results = list(_load_from_socket(sock_info, ArrowCollectSerializer())) File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 210, in load_stream num = read_int(stream) File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 810, in read_int raise EOFError EOFError {noformat} {{no_arrow_exception}} gives as output: {noformat} Traceback (most recent call last): File "", line 2, in File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 2166, in toPandas pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) File 
"/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 516, in collect sock_info = self._jdf.collectToPython() File "/Users/dvogelbacher/git/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/Users/dvogelbacher/git/spark/python/pyspark/sql/utils.py", line 89, in deco return f(*a, **kw) File "/Users/dvogelbacher/git/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling o38.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 (TID 7, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/Users/dvogelbacher/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 428, in main process() File "/Users/dvogelbacher/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 423, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 438, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 141, in dump_stream for obj in iterator: File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 427, in _batched for item in iterator: File "", line 1, in File "/Users/dvogelbacher/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 86, in return lambda *a: f(*a) File "/Users/dvogelbacher/git/spark/python/pyspark/util.py", line 99, in wrapper return f(*args, **kwargs) File "", line 2, in raise_exception Exception: My error ... 
{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
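A minimal sketch of the kind of framing fix this issue suggests: rather than the JVM side closing the socket on failure (which surfaces to Python as the bare {{EOFError}} from {{read_int}} above), the writer could emit an error marker plus the serialized message, and the reader could raise a descriptive exception. The marker value and function names below are illustrative, not Spark's actual serializer protocol.

```python
import io
import struct

ERROR_MARKER = -2  # assumed sentinel value, not Spark's actual SpecialLengths

def write_batches(stream, batches, error=None):
    # Writer side: frame each (non-empty) batch as <length><payload>; on
    # failure, send the marker plus the error message instead of just closing.
    for batch in batches:
        stream.write(struct.pack("!i", len(batch)))
        stream.write(batch)
    if error is not None:
        stream.write(struct.pack("!i", ERROR_MARKER))
        msg = error.encode("utf-8")
        stream.write(struct.pack("!i", len(msg)))
        stream.write(msg)
    else:
        stream.write(struct.pack("!i", 0))  # clean end-of-stream marker

def read_batches(stream):
    # Reader side: yield payloads until the end marker; on the error marker,
    # raise a descriptive exception instead of a bare EOFError.
    while True:
        (num,) = struct.unpack("!i", stream.read(4))
        if num == 0:
            return
        if num == ERROR_MARKER:
            (size,) = struct.unpack("!i", stream.read(4))
            raise RuntimeError(stream.read(size).decode("utf-8"))
        yield stream.read(num)

buf = io.BytesIO()
write_batches(buf, [b"batch1"], error="Job aborted due to stage failure: My error")
buf.seek(0)
try:
    list(read_batches(buf))
except RuntimeError as exc:
    print("propagated:", exc)
```

With this framing, the partial batches collected before the failure are still read, and the caller sees the original job failure message rather than a generic end-of-file.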
[jira] [Updated] (SPARK-27778) toPandas with arrow enabled fails for DF with no partitions
[ https://issues.apache.org/jira/browse/SPARK-27778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Vogelbacher updated SPARK-27778: -- Summary: toPandas with arrow enabled fails for DF with no partitions (was: toPandas with arrow enabled fails for DF with no partition) > toPandas with arrow enabled fails for DF with no partitions > --- > > Key: SPARK-27778 > URL: https://issues.apache.org/jira/browse/SPARK-27778 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL >Affects Versions: 3.0.0 >Reporter: David Vogelbacher >Priority: Major > > Calling {{toPandas}} with {{spark.sql.execution.arrow.enabled: true}} fails for > dataframes with no partitions. The error is an {{EOFError}}. With > {{spark.sql.execution.arrow.enabled: false}} the conversion succeeds. > Repro (on current master branch): > {noformat} > >>> from pyspark.sql.types import * > >>> schema = StructType([StructField("field1", StringType(), True)]) > >>> df = spark.createDataFrame(sc.emptyRDD(), schema) > >>> spark.conf.set("spark.sql.execution.arrow.enabled", "true") > >>> df.toPandas() > /Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py:2162: > UserWarning: toPandas attempted Arrow optimization because > 'spark.sql.execution.arrow.enabled' is set to true, but has reached the error > below and can not continue. Note that > 'spark.sql.execution.arrow.fallback.enabled' does not have an effect on > failures in the middle of computation. 
> warnings.warn(msg) > Traceback (most recent call last): > File "", line 1, in > File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line > 2143, in toPandas > batches = self._collectAsArrow() > File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line > 2205, in _collectAsArrow > results = list(_load_from_socket(sock_info, ArrowCollectSerializer())) > File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line > 210, in load_stream > num = read_int(stream) > File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line > 810, in read_int > raise EOFError > EOFError > >>> spark.conf.set("spark.sql.execution.arrow.enabled", "false") > >>> df.toPandas() > Empty DataFrame > Columns: [field1] > Index: [] > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27778) toPandas with arrow enabled fails for DF with no partitions
[ https://issues.apache.org/jira/browse/SPARK-27778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843987#comment-16843987 ] David Vogelbacher commented on SPARK-27778: --- I will make a PR for this shortly. > toPandas with arrow enabled fails for DF with no partitions > --- > > Key: SPARK-27778 > URL: https://issues.apache.org/jira/browse/SPARK-27778 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL >Affects Versions: 3.0.0 >Reporter: David Vogelbacher >Priority: Major > > Calling {{toPandas}} with {{spark.sql.execution.arrow.enabled: true}} fails for > dataframes with no partitions. The error is an {{EOFError}}. With > {{spark.sql.execution.arrow.enabled: false}} the conversion succeeds. > Repro (on current master branch): > {noformat} > >>> from pyspark.sql.types import * > >>> schema = StructType([StructField("field1", StringType(), True)]) > >>> df = spark.createDataFrame(sc.emptyRDD(), schema) > >>> spark.conf.set("spark.sql.execution.arrow.enabled", "true") > >>> df.toPandas() > /Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py:2162: > UserWarning: toPandas attempted Arrow optimization because > 'spark.sql.execution.arrow.enabled' is set to true, but has reached the error > below and can not continue. Note that > 'spark.sql.execution.arrow.fallback.enabled' does not have an effect on > failures in the middle of computation. 
> warnings.warn(msg) > Traceback (most recent call last): > File "", line 1, in > File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line > 2143, in toPandas > batches = self._collectAsArrow() > File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line > 2205, in _collectAsArrow > results = list(_load_from_socket(sock_info, ArrowCollectSerializer())) > File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line > 210, in load_stream > num = read_int(stream) > File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line > 810, in read_int > raise EOFError > EOFError > >>> spark.conf.set("spark.sql.execution.arrow.enabled", "false") > >>> df.toPandas() > Empty DataFrame > Columns: [field1] > Index: [] > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27778) toPandas with arrow enabled fails for DF with no partition
David Vogelbacher created SPARK-27778: - Summary: toPandas with arrow enabled fails for DF with no partition Key: SPARK-27778 URL: https://issues.apache.org/jira/browse/SPARK-27778 Project: Spark Issue Type: Bug Components: PySpark, SQL Affects Versions: 3.0.0 Reporter: David Vogelbacher Calling {{toPandas}} with {{spark.sql.execution.arrow.enabled: true}} fails for dataframes with no partitions. The error is an {{EOFError}}. With {{spark.sql.execution.arrow.enabled: false}} the conversion succeeds. Repro (on current master branch): {noformat} >>> from pyspark.sql.types import * >>> schema = StructType([StructField("field1", StringType(), True)]) >>> df = spark.createDataFrame(sc.emptyRDD(), schema) >>> spark.conf.set("spark.sql.execution.arrow.enabled", "true") >>> df.toPandas() /Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py:2162: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.fallback.enabled' does not have an effect on failures in the middle of computation. 
warnings.warn(msg) Traceback (most recent call last): File "", line 1, in File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 2143, in toPandas batches = self._collectAsArrow() File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 2205, in _collectAsArrow results = list(_load_from_socket(sock_info, ArrowCollectSerializer())) File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 210, in load_stream num = read_int(stream) File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 810, in read_int raise EOFError EOFError >>> spark.conf.set("spark.sql.execution.arrow.enabled", "false") >>> df.toPandas() Empty DataFrame Columns: [field1] Index: [] {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
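The traceback above shows where the stream breaks: with zero partitions nothing is ever written to the socket, so the reader's first {{read_int}} hits end-of-file. A hedged sketch of the fix, always writing the batch count even when it is 0, is below; the framing here is illustrative, not Spark's actual collect protocol.

```python
import io
import struct

def serve_partitions(stream, partitions):
    # Writer side: emit the partition count up front -- even when it is 0 --
    # so the reader never sees a completely empty stream.
    stream.write(struct.pack("!i", len(partitions)))
    for payload in partitions:
        stream.write(struct.pack("!i", len(payload)))
        stream.write(payload)

def collect_as_arrow(stream):
    # Reader side: a completely empty stream (the buggy zero-partition path)
    # surfaces as EOFError here; a written count of 0 yields [] instead.
    header = stream.read(4)
    if len(header) < 4:
        raise EOFError
    (count,) = struct.unpack("!i", header)
    batches = []
    for _ in range(count):
        (size,) = struct.unpack("!i", stream.read(4))
        batches.append(stream.read(size))
    return batches

empty = io.BytesIO()
serve_partitions(empty, [])      # DataFrame with no partitions
empty.seek(0)
print(collect_as_arrow(empty))   # -> []
```

With an explicit zero count the Python side can fall through to building an empty pandas DataFrame from the schema, matching the non-arrow behavior shown at the end of the repro.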
[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750674#comment-16750674 ] David Vogelbacher commented on SPARK-24437: --- [~DaveDeCaprio] I have not tested it yet but https://issues.apache.org/jira/browse/SPARK-25998 and its associated [PR|https://github.com/apache/spark/pull/22995] might help here. > Memory leak in UnsafeHashedRelation > --- > > Key: SPARK-24437 > URL: https://issues.apache.org/jira/browse/SPARK-24437 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: gagan taneja >Priority: Critical > Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot > 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png > > > There seems to memory leak with > org.apache.spark.sql.execution.joins.UnsafeHashedRelation > We have a long running instance of STS. > With each query execution requiring Broadcast Join, UnsafeHashedRelation is > getting added for cleanup in ContextCleaner. This reference of > UnsafeHashedRelation is being held at some other Collection and not becoming > eligible for GC and because of this ContextCleaner is not able to clean it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26423) [K8s] Make sure that disconnected executors eventually get deleted
[ https://issues.apache.org/jira/browse/SPARK-26423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Vogelbacher updated SPARK-26423: -- Description: If an executor disconnects we currently only disable it in the {{KubernetesClusterSchedulerBackend}} but don't take any further action - in the expectation all the other necessary actions (deleting it from spark, requesting a new replacement executor, ...) will be driven by k8s lifecycle events. However, this only works if the reason that the executor disconnected is that the executor pod is dying/shutting down/... It doesn't work if there is just some network issue between driver and executor (but the executor pod is still running in k8s and keeps running). Thus (as indicated in the TODO comment in [KubernetesClusterSchedulerBackend|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala#L158]), we should make sure that a disconnected executor eventually does get killed in k8s. was: If an executor disconnects we currently only disable it in the {{KubernetesClusterSchedulerBackend}} but don't take any further action - in the expectation all the other necessary actions (deleting it from spark, requesting a new replacement executor, ...) will be driven by k8s lifecycle events. However, this only works if the reason that the executor disconnected is that the executor pod is dying/shutting down/... It doesn't work if there is just some network issue between driver and executor (but the executor pod is still running in k8s and keeps running). Thus (as indicated in the TODO comment in https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala#L158), we should make sure that a disconnected executor eventually does get killed in k8s. 
> [K8s] Make sure that disconnected executors eventually get deleted > -- > > Key: SPARK-26423 > URL: https://issues.apache.org/jira/browse/SPARK-26423 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 2.4.0 >Reporter: David Vogelbacher >Priority: Major > > If an executor disconnects we currently only disable it in the > {{KubernetesClusterSchedulerBackend}} but don't take any further action - in > the expectation all the other necessary actions (deleting it from spark, > requesting a new replacement executor, ...) will be driven by k8s lifecycle > events. > However, this only works if the reason that the executor disconnected is that > the executor pod is dying/shutting down/... > It doesn't work if there is just some network issue between driver and > executor (but the executor pod is still running in k8s and keeps running). > Thus (as indicated in the TODO comment in > [KubernetesClusterSchedulerBackend|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala#L158]), > we should make sure that a disconnected executor eventually does get killed > in k8s. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26423) [K8s] Make sure that disconnected executors eventually get deleted
David Vogelbacher created SPARK-26423: - Summary: [K8s] Make sure that disconnected executors eventually get deleted Key: SPARK-26423 URL: https://issues.apache.org/jira/browse/SPARK-26423 Project: Spark Issue Type: Bug Components: Kubernetes Affects Versions: 2.4.0 Reporter: David Vogelbacher If an executor disconnects we currently only disable it in the {{KubernetesClusterSchedulerBackend}} but don't take any further action - in the expectation all the other necessary actions (deleting it from spark, requesting a new replacement executor, ...) will be driven by k8s lifecycle events. However, this only works if the reason that the executor disconnected is that the executor pod is dying/shutting down/... It doesn't work if there is just some network issue between driver and executor (but the executor pod is still running in k8s and keeps running). Thus (as indicated in the TODO comment in https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala#L158), we should make sure that a disconnected executor eventually does get killed in k8s. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
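A rough sketch of the reconciliation pass the TODO comment asks for: record when an executor disconnects and, on a periodic pass, delete any pod that has stayed disconnected past a grace period. This covers the network-partition case where no k8s lifecycle event will ever arrive. The class, callback, and grace period below are hypothetical, not Spark's actual scheduler backend API.

```python
import time

class DisconnectReaper:
    # Sketch of an "eventually delete" pass for the scheduler backend:
    # executors that disconnect and never come back are killed in k8s
    # after a grace period, instead of lingering forever.
    def __init__(self, delete_pod, grace_seconds=120, clock=time.monotonic):
        self.delete_pod = delete_pod      # callback into a k8s client (assumed)
        self.grace_seconds = grace_seconds
        self.clock = clock
        self.disconnected_since = {}      # executor id -> time of disconnect

    def on_disconnect(self, executor_id):
        self.disconnected_since.setdefault(executor_id, self.clock())

    def on_reconnect(self, executor_id):
        self.disconnected_since.pop(executor_id, None)

    def reconcile(self):
        # Run periodically; harmless if a k8s lifecycle event already removed
        # the pod, as long as delete_pod is idempotent.
        now = self.clock()
        for executor_id, since in list(self.disconnected_since.items()):
            if now - since >= self.grace_seconds:
                self.delete_pod(executor_id)
                del self.disconnected_since[executor_id]

# Usage with a fake clock: executor "exec-1" is partitioned away and never
# reconnects, so the reconcile pass past the grace period deletes it.
deleted = []
fake_now = [0.0]
reaper = DisconnectReaper(deleted.append, grace_seconds=120,
                          clock=lambda: fake_now[0])
reaper.on_disconnect("exec-1")
reaper.reconcile()               # too early: nothing happens
fake_now[0] = 300.0
reaper.reconcile()
print(deleted)                   # -> ['exec-1']
```

The grace period is the key knob: it must be long enough to ride out transient driver-executor hiccups, but short enough that a truly partitioned executor is replaced promptly.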
[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16679768#comment-16679768 ] David Vogelbacher commented on SPARK-24437: --- Thanks for the explanations! I will look into the best workaround for this use-case then. > Memory leak in UnsafeHashedRelation > --- > > Key: SPARK-24437 > URL: https://issues.apache.org/jira/browse/SPARK-24437 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: gagan taneja >Priority: Critical > Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot > 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png > > > There seems to memory leak with > org.apache.spark.sql.execution.joins.UnsafeHashedRelation > We have a long running instance of STS. > With each query execution requiring Broadcast Join, UnsafeHashedRelation is > getting added for cleanup in ContextCleaner. This reference of > UnsafeHashedRelation is being held at some other Collection and not becoming > eligible for GC and because of this ContextCleaner is not able to clean it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16679253#comment-16679253 ] David Vogelbacher commented on SPARK-24437: --- [~eyalfa] There might be hundreds of cached dataframes at the same time (they do get unpersisted after a while, but only when they are very unlikely to be used again). The thing here is that all the dataframes that are cached are generally quite small (~100.000 rows). However, they might be created by a series of joins. So at times the broadcasted data for a specific, cached dataframe is likely bigger than the cached dataframe itself. This might be a bit of an unusual use case. I do know of the workarounds you proposed, but they would significantly harm perf (disabling broadcast joins is not something I want to do for example). In this specific example (where the cached dataframes are smaller than the broadcasted data), it would really be desirable to clean up the broadcasted data and not have it stick around on the driver until the dataframe gets uncached. I still don't quite understand why garbage collecting the broadcasted item would lead to failures when executing the plan later (in case parts of the cached data got evicted), as executing the plan could always just recompute the broadcasted variable? [~mgaido] > Memory leak in UnsafeHashedRelation > --- > > Key: SPARK-24437 > URL: https://issues.apache.org/jira/browse/SPARK-24437 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: gagan taneja >Priority: Critical > Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot > 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png > > > There seems to memory leak with > org.apache.spark.sql.execution.joins.UnsafeHashedRelation > We have a long running instance of STS. > With each query execution requiring Broadcast Join, UnsafeHashedRelation is > getting added for cleanup in ContextCleaner. 
This reference of > UnsafeHashedRelation is being held at some other Collection and not becoming > eligible for GC and because of this ContextCleaner is not able to clean it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675690#comment-16675690 ] David Vogelbacher commented on SPARK-24437: --- [~eyalfa] the cached relations only really take up space on the executors, as they hold the cached data, whereas the broadcast variable takes up space on the driver (which eventually OOMs/GCs a lot). [~mgaido] Yes, I realize that the broadcast variable is used for re-computation if parts of the cached dataframe get evicted. But couldn't the broadcast variable also get recomputed in this case? Since we do keep track of the whole logical plan when caching a dataset, we should always be able to recompute the broadcast variable when needed. > Memory leak in UnsafeHashedRelation > --- > > Key: SPARK-24437 > URL: https://issues.apache.org/jira/browse/SPARK-24437 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: gagan taneja >Priority: Critical > Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot > 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png > > > There seems to memory leak with > org.apache.spark.sql.execution.joins.UnsafeHashedRelation > We have a long running instance of STS. > With each query execution requiring Broadcast Join, UnsafeHashedRelation is > getting added for cleanup in ContextCleaner. This reference of > UnsafeHashedRelation is being held at some other Collection and not becoming > eligible for GC and because of this ContextCleaner is not able to clean it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671686#comment-16671686 ] David Vogelbacher edited comment on SPARK-24437 at 11/1/18 2:43 PM: Hey [~mgaido], I am seeing something similar in one of our long running applications. The longer it runs, the higher the heap usage in the driver grows. Taking a heap dump, most of its space is taken up by {{UnsafeHashedRelation}} instances. Looking at their paths to GC, they are strongly referenced by {{InMemoryRelation}} instances: !Screen Shot 2018-11-01 at 10.38.30 AM.png! So it looks like if dataframes are cached then the {{UnsafeHashedRelation}} instances are never cleaned up, because they are strongly referenced by the generated codegen plan? The context cleaner can only clean them up when they are no longer strongly referenced. was (Author: dvogelbacher): Hey [~mgaido], I am seeing something similar in one of our long running applications. The longer it runs the higher heap usage in the driver grows. Taking a heap dump, most of its space is taken up by {{UnsafeHashedRelation}} instances. Looking at their paths to GC, they are strongly referenced by {{InMemoryRelation}} instances: !Screen Shot 2018-11-01 at 10.38.30 AM.png! So it looks like if dataframes are cached the {{UnsafeHashedRelation}}s are never cleaned up, because they are strongly references by generated codegen plan? The context cleaner can only clean them up when they are no longer strongly referenced. 
> Memory leak in UnsafeHashedRelation > --- > > Key: SPARK-24437 > URL: https://issues.apache.org/jira/browse/SPARK-24437 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: gagan taneja >Priority: Critical > Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot > 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png > > > There seems to memory leak with > org.apache.spark.sql.execution.joins.UnsafeHashedRelation > We have a long running instance of STS. > With each query execution requiring Broadcast Join, UnsafeHashedRelation is > getting added for cleanup in ContextCleaner. This reference of > UnsafeHashedRelation is being held at some other Collection and not becoming > eligible for GC and because of this ContextCleaner is not able to clean it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671686#comment-16671686 ] David Vogelbacher commented on SPARK-24437: --- Hey [~mgaido], I am seeing something similar in one of our long running applications. The longer it runs, the higher the heap usage in the driver grows. Taking a heap dump, most of its space is taken up by {{UnsafeHashedRelation}} instances. Looking at their paths to GC, they are strongly referenced by {{InMemoryRelation}} instances: !Screen Shot 2018-11-01 at 10.38.30 AM.png! So it looks like if dataframes are cached the {{UnsafeHashedRelation}}s are never cleaned up, because they are strongly referenced by the generated codegen plan? The context cleaner can only clean them up when they are no longer strongly referenced. > Memory leak in UnsafeHashedRelation > --- > > Key: SPARK-24437 > URL: https://issues.apache.org/jira/browse/SPARK-24437 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: gagan taneja >Priority: Critical > Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot > 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png > > > There seems to memory leak with > org.apache.spark.sql.execution.joins.UnsafeHashedRelation > We have a long running instance of STS. > With each query execution requiring Broadcast Join, UnsafeHashedRelation is > getting added for cleanup in ContextCleaner. This reference of > UnsafeHashedRelation is being held at some other Collection and not becoming > eligible for GC and because of this ContextCleaner is not able to clean it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Vogelbacher updated SPARK-24437: -- Attachment: Screen Shot 2018-11-01 at 10.38.30 AM.png > Memory leak in UnsafeHashedRelation > --- > > Key: SPARK-24437 > URL: https://issues.apache.org/jira/browse/SPARK-24437 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: gagan taneja >Priority: Critical > Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot > 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png > > > There seems to memory leak with > org.apache.spark.sql.execution.joins.UnsafeHashedRelation > We have a long running instance of STS. > With each query execution requiring Broadcast Join, UnsafeHashedRelation is > getting added for cleanup in ContextCleaner. This reference of > UnsafeHashedRelation is being held at some other Collection and not becoming > eligible for GC and because of this ContextCleaner is not able to clean it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
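The GC mechanics discussed in this thread, in miniature: a cleaner that tracks objects through weak references can only act once every strong reference is gone, so a broadcast relation pinned by a cached plan is never eligible. A small Python illustration, where the class and the {{plan}} dict are hypothetical stand-ins for {{UnsafeHashedRelation}} and the codegen plan held by {{InMemoryRelation}}:

```python
import gc
import weakref

class HashedRelation:         # stand-in for UnsafeHashedRelation
    pass

cleaned = []

relation = HashedRelation()
# ContextCleaner analogue: it may only reclaim the relation once no strong
# references remain anywhere.
weakref.finalize(relation, lambda: cleaned.append("relation"))

plan = {"codegen": relation}  # cached plan (InMemoryRelation analogue) pins it
relation = None
gc.collect()
print(cleaned)                # -> [] : still strongly referenced by the plan

plan = None                   # "uncaching" drops the last strong reference
gc.collect()
print(cleaned)                # -> ['relation'] : now eligible for cleanup
```

This matches the heap dumps in the comments: as long as the cached {{InMemoryRelation}} holds the generated plan, the relation never becomes weakly reachable and the cleaner cannot fire.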
[jira] [Updated] (SPARK-24983) Collapsing multiple project statements with dependent When-Otherwise statements on the same column can OOM the driver
[ https://issues.apache.org/jira/browse/SPARK-24983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Vogelbacher updated SPARK-24983: -- Description: I noticed that writing a spark job that includes many sequential {{when-otherwise}} statements on the same column can easily OOM the driver while generating the optimized plan because the project node will grow exponentially in size. Example: {noformat} scala> import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._ scala> val df = Seq("a", "b", "c", "1").toDF("text") df: org.apache.spark.sql.DataFrame = [text: string] scala> var dfCaseWhen = df.filter($"text" =!= lit("0")) dfCaseWhen: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [text: string] scala> for( a <- 1 to 5) { | dfCaseWhen = dfCaseWhen.withColumn("text", when($"text" === lit(a.toString), lit("r" + a.toString)).otherwise($"text")) | } scala> dfCaseWhen.queryExecution.analyzed res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = Project [CASE WHEN (text#12 = 5) THEN r5 ELSE text#12 END AS text#14] +- Project [CASE WHEN (text#10 = 4) THEN r4 ELSE text#10 END AS text#12] +- Project [CASE WHEN (text#8 = 3) THEN r3 ELSE text#8 END AS text#10] +- Project [CASE WHEN (text#6 = 2) THEN r2 ELSE text#6 END AS text#8] +- Project [CASE WHEN (text#3 = 1) THEN r1 ELSE text#3 END AS text#6] +- Filter NOT (text#3 = 0) +- Project [value#1 AS text#3] +- LocalRelation [value#1] scala> dfCaseWhen.queryExecution.optimizedPlan res5: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END END = 3) THEN r3 ELSE CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END END END = 4) THEN r4 ELSE CASE WHEN (CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END = 2) THEN r2 
ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END END = 3) THEN r3 ELSE CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END END END END = 5) THEN r5 ELSE CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 ELSE va... {noformat} As one can see the optimized plan grows exponentially in the number of {{when-otherwise}} statements here. I can see that this comes from the {{CollapseProject}} optimizer rule. Maybe we should put a limit on the resulting size of the project node after collapsing and only collapse if we stay under the limit. was: Hi, I noticed that writing a spark job that includes many sequential when-otherwise statements on the same column can easily OOM the driver while generating the optimized plan because the project node will grow exponentially in size. Example: {noformat} scala> import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._ scala> val df = Seq("a", "b", "c", "1").toDF("text") df: org.apache.spark.sql.DataFrame = [text: string] scala> var dfCaseWhen = df.filter($"text" =!= lit("0")) dfCaseWhen: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [text: string] scala> for( a <- 1 to 5) { | dfCaseWhen = dfCaseWhen.withColumn("text", when($"text" === lit(a.toString), lit("r" + a.toString)).otherwise($"text")) | } scala> dfCaseWhen.queryExecution.analyzed res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = Project [CASE WHEN (text#12 = 5) THEN r5 ELSE text#12 END AS text#14] +- Project [CASE WHEN (text#10 = 4) THEN r4 ELSE text#10 END AS text#12] +- Project [CASE WHEN (text#8 = 3) THEN r3 ELSE text#8 END AS text#10] +- Project [CASE WHEN (text#6 = 2) THEN r2 ELSE text#6 END AS text#8] +- Project [CASE WHEN (text#3 = 1) THEN r1 ELSE text#3 END AS text#6] +- Filter NOT (text#3 = 0) +- Project [value#1 AS text#3] +- LocalRelation [value#1] scala> dfCaseWhen.queryExecution.optimizedPlan res5: 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END END = 3) THEN r3 ELSE CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END END END = 4) THEN r4 ELSE CASE WHEN (CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END END = 3) THEN r3 ELSE CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END END END END = 5) THEN r5 ELSE CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (value#1 = 1)
[jira] [Created] (SPARK-24983) Collapsing multiple project statements with dependent When-Otherwise statements on the same column can OOM the driver
David Vogelbacher created SPARK-24983: - Summary: Collapsing multiple project statements with dependent When-Otherwise statements on the same column can OOM the driver Key: SPARK-24983 URL: https://issues.apache.org/jira/browse/SPARK-24983 Project: Spark Issue Type: Bug Components: Optimizer Affects Versions: 2.3.1 Reporter: David Vogelbacher Hi, I noticed that writing a spark job that includes many sequential when-otherwise statements on the same column can easily OOM the driver while generating the optimized plan because the project node will grow exponentially in size. Example: {noformat} scala> import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._ scala> val df = Seq("a", "b", "c", "1").toDF("text") df: org.apache.spark.sql.DataFrame = [text: string] scala> var dfCaseWhen = df.filter($"text" =!= lit("0")) dfCaseWhen: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [text: string] scala> for( a <- 1 to 5) { | dfCaseWhen = dfCaseWhen.withColumn("text", when($"text" === lit(a.toString), lit("r" + a.toString)).otherwise($"text")) | } scala> dfCaseWhen.queryExecution.analyzed res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = Project [CASE WHEN (text#12 = 5) THEN r5 ELSE text#12 END AS text#14] +- Project [CASE WHEN (text#10 = 4) THEN r4 ELSE text#10 END AS text#12] +- Project [CASE WHEN (text#8 = 3) THEN r3 ELSE text#8 END AS text#10] +- Project [CASE WHEN (text#6 = 2) THEN r2 ELSE text#6 END AS text#8] +- Project [CASE WHEN (text#3 = 1) THEN r1 ELSE text#3 END AS text#6] +- Filter NOT (text#3 = 0) +- Project [value#1 AS text#3] +- LocalRelation [value#1] scala> dfCaseWhen.queryExecution.optimizedPlan res5: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END END = 3) THEN r3 ELSE CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END = 2) 
THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END END END = 4) THEN r4 ELSE CASE WHEN (CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END END = 3) THEN r3 ELSE CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END = 2) THEN r2 ELSE CASE WHEN (value#1 = 1) THEN r1 ELSE value#1 END END END END = 5) THEN r5 ELSE CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (value#1 = 1) THEN r1 ELSE va... {noformat} As one can see the optimized plan grows exponentially in the number of {{when-otherwise}} statements here. I can see that this comes from the {{CollapseProject}} optimizer rule. Maybe we should put a limit on the resulting size of the project node after collapsing and only collapse if we stay under the limit. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
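The blow-up is easy to model outside Spark. Treating expressions as plain strings (a simplification; Catalyst works on expression trees, but the substitution pattern is the same), collapsing a chain of dependent when/otherwise projections substitutes the previous expression into both occurrences of the column, so the collapsed expression roughly doubles at every step:

```python
# Model what CollapseProject does to a chain of dependent when/otherwise
# projections: each step rewrites column c as
#   CASE WHEN (c = i) THEN r<i> ELSE c END
# and collapsing substitutes the previous expression for *every* occurrence
# of c. Since c occurs twice in the template, the collapsed expression
# roughly doubles in size at each step.
def collapse(steps: int) -> str:
    expr = "value"
    for i in range(1, steps + 1):
        expr = f"CASE WHEN ({expr} = {i}) THEN r{i} ELSE {expr} END"
    return expr

sizes = [len(collapse(n)) for n in range(1, 8)]
# Every step more than doubles the expression size: exponential growth.
assert all(b > 2 * a for a, b in zip(sizes, sizes[1:]))
```

With 5 steps this is still small, but the size is O(2^n), so a few dozen dependent when/otherwise columns can exhaust driver memory during optimization, which motivates the suggestion above to cap the size of the collapsed project node.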
[jira] [Comment Edited] (SPARK-24957) Decimal arithmetic can lead to wrong values using codegen
[ https://issues.apache.org/jira/browse/SPARK-24957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561277#comment-16561277 ] David Vogelbacher edited comment on SPARK-24957 at 7/29/18 9:55 PM: [~mgaido] thanks for putting up the PR! I wasn't able to reproduce the incorrectness for the specific example I gave with wholestage codegen disabled: {noformat} scala> spark.conf.set("spark.sql.codegen.wholeStage", false) scala> import org.apache.spark.sql.functions import org.apache.spark.sql.functions scala> val df = Seq( | ("a", BigDecimal("12.0")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("11.88")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("11.88")), | ("a", BigDecimal("11.88")) | ).toDF("text", "number") df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)] scala> val df_grouped_1 = df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number")) df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,22)] scala> df_grouped_1.collect() res1: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143]) scala> val df_grouped_2 = df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number")) df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,22)] scala> df_grouped_2.collect() res2: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143]) scala> val df_total_sum = df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number")) df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)] scala> df_total_sum.collect() res3: Array[org.apache.spark.sql.Row] = Array([11.94857142857143]) {noformat} was (Author: dvogelbacher): [~mgaido] I wasn't able to reproduce the incorrectness for the specific example I gave with wholestage codegen disabled, that's what I meant: {noformat} scala> spark.conf.set("spark.sql.codegen.wholeStage", false) scala> import 
org.apache.spark.sql.functions import org.apache.spark.sql.functions scala> val df = Seq( | ("a", BigDecimal("12.0")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("11.88")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("11.88")), | ("a", BigDecimal("11.88")) | ).toDF("text", "number") df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)] scala> val df_grouped_1 = df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number")) df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,22)] scala> df_grouped_1.collect() res1: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143]) scala> val df_grouped_2 = df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number")) df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,22)] scala> df_grouped_2.collect() res2: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143]) scala> val df_total_sum = df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number")) df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)] scala> df_total_sum.collect() res3: Array[org.apache.spark.sql.Row] = Array([11.94857142857143]) {noformat} > Decimal arithmetic can lead to wrong values using codegen > - > > Key: SPARK-24957 > URL: https://issues.apache.org/jira/browse/SPARK-24957 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: David Vogelbacher >Priority: Major > > I noticed a bug when doing arithmetic on a dataframe containing decimal > values with codegen enabled. 
> I tried to narrow it down on a small repro and got this (executed in > spark-shell): > {noformat} > scala> val df = Seq( > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("11.88")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("11.88")), > | ("a", BigDecimal("11.88")) > | ).toDF("text", "number") > df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)] > scala> val df_grouped_1 = > df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number")) > df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: > decimal(38,22)] > scala> df_grouped_1.collect() > res0: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143]) > scala> val df_grouped_2 = >
[jira] [Commented] (SPARK-24957) Decimal arithmetic can lead to wrong values using codegen
[ https://issues.apache.org/jira/browse/SPARK-24957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561277#comment-16561277 ] David Vogelbacher commented on SPARK-24957: --- [~mgaido] I wasn't able to reproduce the incorrectness for the specific example I gave with wholestage codegen disabled, that's what I meant: {noformat} scala> spark.conf.set("spark.sql.codegen.wholeStage", false) scala> import org.apache.spark.sql.functions import org.apache.spark.sql.functions scala> val df = Seq( | ("a", BigDecimal("12.0")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("11.88")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("11.88")), | ("a", BigDecimal("11.88")) | ).toDF("text", "number") df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)] scala> val df_grouped_1 = df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number")) df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,22)] scala> df_grouped_1.collect() res1: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143]) scala> val df_grouped_2 = df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number")) df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,22)] scala> df_grouped_2.collect() res2: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143]) scala> val df_total_sum = df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number")) df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)] scala> df_total_sum.collect() res3: Array[org.apache.spark.sql.Row] = Array([11.94857142857143]) {noformat} > Decimal arithmetic can lead to wrong values using codegen > - > > Key: SPARK-24957 > URL: https://issues.apache.org/jira/browse/SPARK-24957 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: David Vogelbacher >Priority: Major > > I noticed a bug when 
doing arithmetic on a dataframe containing decimal > values with codegen enabled. > I tried to narrow it down on a small repro and got this (executed in > spark-shell): > {noformat} > scala> val df = Seq( > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("11.88")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("11.88")), > | ("a", BigDecimal("11.88")) > | ).toDF("text", "number") > df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)] > scala> val df_grouped_1 = > df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number")) > df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: > decimal(38,22)] > scala> df_grouped_1.collect() > res0: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143]) > scala> val df_grouped_2 = > df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number")) > df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: > decimal(38,22)] > scala> df_grouped_2.collect() > res1: Array[org.apache.spark.sql.Row] = > Array([a,11948571.4285714285714285714286]) > scala> val df_total_sum = > df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number")) > df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)] > scala> df_total_sum.collect() > res2: Array[org.apache.spark.sql.Row] = Array([11.94857142857143]) > {noformat} > The results of {{df_grouped_1}} and {{df_total_sum}} are correct, whereas the > result of {{df_grouped_2}} is clearly incorrect (it is the value of the > correct result times {{10^14}}). > When codegen is disabled all results are correct. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
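A result that is wrong by exactly a power of ten is the signature of a fixed-point value whose unscaled representation was combined under a mismatched scale. The sketch below is a hypothetical illustration of that failure mode in plain Python (the helper names and the specific scales are assumptions for illustration, not Spark's actual codegen path):

```python
from decimal import Decimal

# A fixed-point decimal is stored as an unscaled integer plus a scale:
#   value = unscaled * 10**(-scale)
# to_unscaled/from_unscaled are illustrative names, not Spark internals.
def to_unscaled(value: Decimal, scale: int) -> int:
    return int(value.scaleb(scale))

def from_unscaled(unscaled: int, scale: int) -> Decimal:
    return Decimal(unscaled).scaleb(-scale)

avg = Decimal("11.9485714285714285714286")  # close to the repro's average

# Round-tripping with a consistent scale is lossless.
assert from_unscaled(to_unscaled(avg, 22), 22) == avg

# Reading a scale-22 unscaled value back with a smaller scale inflates the
# result by a power of ten -- the shape of the bug above, where
# df_grouped_2 returned the correct value times 10^14 (here, 22 - 8 = 14).
wrong = from_unscaled(to_unscaled(avg, 22), 8)
assert wrong == avg.scaleb(14)
```

Which intermediate buffer in the generated code actually mixes scales is not established by this sketch; it only shows why a scale mismatch produces exactly this kind of error.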
[jira] [Updated] (SPARK-24957) Decimal arithmetic can lead to wrong values using codegen
[ https://issues.apache.org/jira/browse/SPARK-24957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Vogelbacher updated SPARK-24957: -- Description: I noticed a bug when doing arithmetic on a dataframe containing decimal values with codegen enabled. I tried to narrow it down on a small repro and got this (executed in spark-shell): {noformat} scala> val df = Seq( | ("a", BigDecimal("12.0")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("11.88")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("11.88")), | ("a", BigDecimal("11.88")) | ).toDF("text", "number") df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)] scala> val df_grouped_1 = df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number")) df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,22)] scala> df_grouped_1.collect() res0: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143]) scala> val df_grouped_2 = df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number")) df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,22)] scala> df_grouped_2.collect() res1: Array[org.apache.spark.sql.Row] = Array([a,11948571.4285714285714285714286]) scala> val df_total_sum = df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number")) df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)] scala> df_total_sum.collect() res2: Array[org.apache.spark.sql.Row] = Array([11.94857142857143]) {noformat} The results of {{df_grouped_1}} and {{df_total_sum}} are correct, whereas the result of {{df_grouped_2}} is clearly incorrect (it is the value of the correct result times {{10^14}}). When codegen is disabled all results are correct. was: I noticed a bug when doing arithmetic on a dataframe containing decimal values with codegen enabled. 
I tried to narrow it down on a small repro and got this (executed in spark-shell): {noformat} scala> val df = Seq( | ("a", BigDecimal("12.0")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("11.88")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("11.88")), | ("a", BigDecimal("11.88")) | ).toDF("text", "number") df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)] scala> val df_grouped_1 = df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number")) df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,22)] scala> df_grouped_1.collect() res0: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143]) scala> val df_grouped_2 = df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number")) df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,22)] scala> df_grouped_2.collect() res1: Array[org.apache.spark.sql.Row] = Array([a,11948571.4285714285714285714286]) scala> val df_total_sum = df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number")) df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)] scala> df_total_sum.collect() res2: Array[org.apache.spark.sql.Row] = Array([11.94857142857143]) {noformat} The results of {{df_grouped_1}} and {{df_total_sum}} are correct, whereas the result of {{df_grouped_2}} is clearly incorrect (it is the value of the correct result times {{10^14}}). > Decimal arithmetic can lead to wrong values using codegen > - > > Key: SPARK-24957 > URL: https://issues.apache.org/jira/browse/SPARK-24957 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: David Vogelbacher >Priority: Major > > I noticed a bug when doing arithmetic on a dataframe containing decimal > values with codegen enabled. 
> I tried to narrow it down on a small repro and got this (executed in > spark-shell): > {noformat} > scala> val df = Seq( > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("11.88")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("11.88")), > | ("a", BigDecimal("11.88")) > | ).toDF("text", "number") > df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)] > scala> val df_grouped_1 = > df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number")) > df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: > decimal(38,22)] > scala> df_grouped_1.collect() > res0:
[jira] [Created] (SPARK-24957) Decimal arithmetic can lead to wrong values using codegen
David Vogelbacher created SPARK-24957: - Summary: Decimal arithmetic can lead to wrong values using codegen Key: SPARK-24957 URL: https://issues.apache.org/jira/browse/SPARK-24957 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.1 Reporter: David Vogelbacher I noticed a bug when doing arithmetic on a dataframe containing decimal values with codegen enabled. I tried to narrow it down on a small repro and got this (executed in spark-shell): {noformat} scala> val df = Seq( | ("a", BigDecimal("12.0")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("11.88")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("12.0")), | ("a", BigDecimal("11.88")), | ("a", BigDecimal("11.88")) | ).toDF("text", "number") df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)] scala> val df_grouped_1 = df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number")) df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,22)] scala> df_grouped_1.collect() res0: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143]) scala> val df_grouped_2 = df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number")) df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,22)] scala> df_grouped_2.collect() res1: Array[org.apache.spark.sql.Row] = Array([a,11948571.4285714285714285714286]) scala> val df_total_sum = df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number")) df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)] scala> df_total_sum.collect() res2: Array[org.apache.spark.sql.Row] = Array([11.94857142857143]) {noformat} The results of {{df_grouped_1}} and {{df_total_sum}} are correct, whereas the result of {{df_grouped_2}} is clearly incorrect (it is the value of the correct result times {{10^14}}). 
[jira] [Commented] (SPARK-24934) Should handle missing upper/lower bounds cases in in-memory partition pruning
[ https://issues.apache.org/jira/browse/SPARK-24934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558360#comment-16558360 ] David Vogelbacher commented on SPARK-24934: --- Thanks for opening and making the PR [~hyukjin.kwon]! > Should handle missing upper/lower bounds cases in in-memory partition pruning > - > > Key: SPARK-24934 > URL: https://issues.apache.org/jira/browse/SPARK-24934 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Hyukjin Kwon >Priority: Major > > For example, if an array is used (where the lower and upper bounds for its > column batch are {{null}}), it wrongly filters all data out: > {code} > scala> import org.apache.spark.sql.functions > import org.apache.spark.sql.functions > scala> val df = Seq(Array("a", "b"), Array("c", "d")).toDF("arrayCol") > df: org.apache.spark.sql.DataFrame = [arrayCol: array<string>] > scala> > df.filter(df.col("arrayCol").eqNullSafe(functions.array(functions.lit("a"), > functions.lit("b")))).show() > +--------+ > |arrayCol| > +--------+ > | [a, b]| > +--------+ > scala> > df.cache().filter(df.col("arrayCol").eqNullSafe(functions.array(functions.lit("a"), > functions.lit("b")))).show() > +--------+ > |arrayCol| > +--------+ > +--------+ > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
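The intended fix is conservative pruning: a column batch may only be skipped when its statistics prove no row can match, and missing bounds must mean "cannot prune". A simplified Python sketch of that contract (illustrative only, not Spark's actual implementation):

```python
from typing import Optional

def may_contain(lower: Optional[int], upper: Optional[int], target: int) -> bool:
    """Return True unless the batch statistics *prove* target is absent.

    Missing bounds must conservatively return True; otherwise every batch
    without stats (e.g. array columns) is wrongly filtered out, which is
    exactly the symptom in the repro above.
    """
    if lower is None or upper is None:
        return True  # no stats: cannot prune this batch
    return lower <= target <= upper

assert may_contain(1, 10, 5) is True       # in range: keep the batch
assert may_contain(1, 10, 42) is False     # provably absent: prune
assert may_contain(None, None, 5) is True  # no bounds: must keep
```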
[jira] [Commented] (SPARK-12911) Cacheing a dataframe causes array comparisons to fail (in filter / where) after 1.6
[ https://issues.apache.org/jira/browse/SPARK-12911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556470#comment-16556470 ] David Vogelbacher commented on SPARK-12911: --- Hey [~hyukjin.kwon] [~sdicocco] [~a1ray], I just reproduced this on master. I executed the following in the spark-shell: {noformat} scala> import org.apache.spark.sql.functions import org.apache.spark.sql.functions scala> val df = Seq(Array("a", "b"), Array("c", "d")).toDF("arrayCol") df: org.apache.spark.sql.DataFrame = [arrayCol: array<string>] scala> df.filter(df.col("arrayCol").eqNullSafe(functions.array(functions.lit("a"), functions.lit("b")))).show() +--------+ |arrayCol| +--------+ | [a, b]| +--------+ scala> df.cache().filter(df.col("arrayCol").eqNullSafe(functions.array(functions.lit("a"), functions.lit("b")))).show() +--------+ |arrayCol| +--------+ +--------+ {noformat} This seems to be the same issue? > Cacheing a dataframe causes array comparisons to fail (in filter / where) > after 1.6 > --- > > Key: SPARK-12911 > URL: https://issues.apache.org/jira/browse/SPARK-12911 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 > Environment: OSX 10.11.1, Scala 2.11.7, Spark 1.6.0 >Reporter: Jesse English >Priority: Major > > When doing a *where* operation on a dataframe and testing for equality on an > array type, after 1.6 no valid comparisons are made if the dataframe has been > cached. If it has not been cached, the results are as expected. > This appears to be related to the underlying unsafe array data types. 
> {code:title=test.scala|borderStyle=solid} > test("test array comparison") { > val vectors: Vector[Row] = Vector( > Row.fromTuple("id_1" -> Array(0L, 2L)), > Row.fromTuple("id_2" -> Array(0L, 5L)), > Row.fromTuple("id_3" -> Array(0L, 9L)), > Row.fromTuple("id_4" -> Array(1L, 0L)), > Row.fromTuple("id_5" -> Array(1L, 8L)), > Row.fromTuple("id_6" -> Array(2L, 4L)), > Row.fromTuple("id_7" -> Array(5L, 6L)), > Row.fromTuple("id_8" -> Array(6L, 2L)), > Row.fromTuple("id_9" -> Array(7L, 0L)) > ) > val data: RDD[Row] = sc.parallelize(vectors, 3) > val schema = StructType( > StructField("id", StringType, false) :: > StructField("point", DataTypes.createArrayType(LongType, false), > false) :: > Nil > ) > val sqlContext = new SQLContext(sc) > val dataframe = sqlContext.createDataFrame(data, schema) > val targetPoint:Array[Long] = Array(0L,9L) > //Cacheing is the trigger to cause the error (no cacheing causes no error) > dataframe.cache() > //This is the line where it fails > //java.util.NoSuchElementException: next on empty iterator > //However we know that there is a valid match > val targetRow = dataframe.where(dataframe("point") === > array(targetPoint.map(value => lit(value)): _*)).first() > assert(targetRow != null) > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23825) [K8s] Spark pods should request memory + memoryOverhead as resources
[ https://issues.apache.org/jira/browse/SPARK-23825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16419950#comment-16419950 ] David Vogelbacher commented on SPARK-23825: --- Addressed by https://github.com/apache/spark/pull/20943 > [K8s] Spark pods should request memory + memoryOverhead as resources > > > Key: SPARK-23825 > URL: https://issues.apache.org/jira/browse/SPARK-23825 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 2.3.0 >Reporter: David Vogelbacher >Priority: Major > > We currently request {{spark.[driver,executor].memory}} as memory from > Kubernetes (e.g., > [here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]). > The limit is set to {{spark.[driver,executor].memory + > spark.kubernetes.[driver,executor].memoryOverhead}}. > This seems to be using Kubernetes wrong. > [How Pods with resource limits are > run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run], > states: > {noformat} > If a Container exceeds its memory request, it is likely that its Pod will be > evicted whenever the node runs out of memory. > {noformat} > Thus, if the Spark driver/executor uses {{memory + memoryOverhead}} > memory, it can be evicted. While an executor might get restarted (but it > would still be very bad performance-wise), the driver would be hard to > recover. > I think Spark should be able to run with the requested (and, thus, > guaranteed) resources from Kubernetes without being in danger of termination > without needing to rely on optional available resources. > Thus, we should request {{memory + memoryOverhead}} memory from Kubernetes > (and this should also be the limit). 
[jira] [Commented] (SPARK-23825) [K8s] Spark pods should request memory + memoryOverhead as resources
[ https://issues.apache.org/jira/browse/SPARK-23825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16419857#comment-16419857 ] David Vogelbacher commented on SPARK-23825: --- Will make a PR shortly, cc [~mcheah] > [K8s] Spark pods should request memory + memoryOverhead as resources > > > Key: SPARK-23825 > URL: https://issues.apache.org/jira/browse/SPARK-23825 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 2.3.0 >Reporter: David Vogelbacher >Priority: Major > > We currently request {{spark.[driver,executor].memory}} as memory from > Kubernetes (e.g., > [here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]). > The limit is set to {{spark.[driver,executor].memory + > spark.kubernetes.[driver,executor].memoryOverhead}}. > This seems to be using Kubernetes wrong. > [How Pods with resource limits are > run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run], > states: > {noformat} > If a Container exceeds its memory request, it is likely that its Pod will be > evicted whenever the node runs out of memory. > {noformat} > Thus, if the Spark driver/executor uses {{memory + memoryOverhead}} > memory, it can be evicted. While an executor might get restarted (but it > would still be very bad performance-wise), the driver would be hard to > recover. > I think Spark should be able to run with the requested (and, thus, > guaranteed) resources from Kubernetes without being in danger of termination > without needing to rely on optional available resources. > Thus, we should request {{memory + memoryOverhead}} memory from Kubernetes > (and this should also be the limit). 
[jira] [Updated] (SPARK-23825) [K8s] Spark pods should request memory + memoryOverhead as resources
[ https://issues.apache.org/jira/browse/SPARK-23825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Vogelbacher updated SPARK-23825: -- Description: We currently request {{spark.[driver,executor].memory}} as memory from Kubernetes (e.g., [here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]). The limit is set to {{spark.[driver,executor].memory + spark.kubernetes.[driver,executor].memoryOverhead}}. This seems to be using Kubernetes wrong. [How Pods with resource limits are run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run], states: {noformat} If a Container exceeds its memory request, it is likely that its Pod will be evicted whenever the node runs out of memory. {noformat} Thus, if the Spark driver/executor uses {{memory + memoryOverhead}} memory, it can be evicted. While an executor might get restarted (but it would still be very bad performance-wise), the driver would be hard to recover. I think Spark should be able to run with the requested (and, thus, guaranteed) resources from Kubernetes without being in danger of termination without needing to rely on optional available resources. Thus, we should request {{memory + memoryOverhead}} memory from Kubernetes (and this should also be the limit). was: We currently request {{spark.{driver,executor}.memory}} as memory from Kubernetes (e.g., [here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]). The limit is set to {{spark.{driver,executor}.memory + spark.kubernetes.{driver,executor}.memoryOverhead}}. This seems to be using Kubernetes wrong. 
[How Pods with resource limits are run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run], states" {noformat} If a Container exceeds its memory request, it is likely that its Pod will be evicted whenever the node runs out of memory. {noformat} Thus, if a the spark driver/executor uses {{memory + memoryOverhead}} memory, it can be evicted. While an executor might get restarted (but it would still be very bad performance-wise), the driver would be hard to recover. I think spark should be able to run with the requested (and, thus, guaranteed) resources from Kubernetes. It shouldn't rely on optional resources above the request and, therefore, be in danger of termination on high cluster utilization. Thus, we shoud request {{memory + memoryOverhead}} memory from Kubernetes (and this should also be the limit). > [K8s] Spark pods should request memory + memoryOverhead as resources > > > Key: SPARK-23825 > URL: https://issues.apache.org/jira/browse/SPARK-23825 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 2.3.0 >Reporter: David Vogelbacher >Priority: Major > > We currently request {{spark.[driver,executor].memory}} as memory from > Kubernetes (e.g., > [here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]). > The limit is set to {{spark.[driver,executor].memory + > spark.kubernetes.[driver,executor].memoryOverhead}}. > This seems to be using Kubernetes wrong. > [How Pods with resource limits are > run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run], > states: > {noformat} > If a Container exceeds its memory request, it is likely that its Pod will be > evicted whenever the node runs out of memory. 
> {noformat} > Thus, if the Spark driver/executor uses {{memory + memoryOverhead}} > memory, it can be evicted. While an executor might get restarted (but it > would still be very bad performance-wise), the driver would be hard to > recover. > I think Spark should be able to run with the requested (and, thus, > guaranteed) resources from Kubernetes without being in danger of termination > without needing to rely on optional available resources. > Thus, we should request {{memory + memoryOverhead}} memory from Kubernetes > (and this should also be the limit). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23825) [K8s] Spark pods should request memory + memoryOverhead as resources
[ https://issues.apache.org/jira/browse/SPARK-23825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Vogelbacher updated SPARK-23825: -- Description: We currently request {{spark.{driver,executor}.memory}} as memory from Kubernetes (e.g., [here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]). The limit is set to {{spark.{driver,executor}.memory + spark.kubernetes.{driver,executor}.memoryOverhead}}. This seems to be using Kubernetes wrong. [How Pods with resource limits are run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run], states" {noformat} If a Container exceeds its memory request, it is likely that its Pod will be evicted whenever the node runs out of memory. {noformat} Thus, if a the spark driver/executor uses {{memory + memoryOverhead}} memory, it can be evicted. While an executor might get restarted (but it would still be very bad performance-wise), the driver would be hard to recover. I think spark should be able to run with the requested (and, thus, guaranteed) resources from Kubernetes. It shouldn't rely on optional resources above the request and, therefore, be in danger of termination on high cluster utilization. Thus, we shoud request {{memory + memoryOverhead}} memory from Kubernetes (and this should also be the limit). was: We currently request `spark.{driver,executor}.memory` as memory from Kubernetes (e.g., [here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]). The limit is set to `spark.{driver,executor}.memory + spark.kubernetes.{driver,executor}.memoryOverhead`. This seems to be using Kubernetes wrong. 
[How Pods with resource limits are run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run], states" {noformat} If a Container exceeds its memory request, it is likely that its Pod will be evicted whenever the node runs out of memory. {noformat} Thus, if a the spark driver/executor uses `memory + memoryOverhead` memory, it can be evicted. While an executor might get restarted (but it would still be very bad performance-wise), the driver would be hard to recover. I think spark should be able to run with the requested (and, thus, guaranteed) resources from Kubernetes. It shouldn't rely on optional resources above the request and, therefore, be in danger of termination on high cluster utilization. Thus, we shoud request `memory + memoryOverhead` memory from Kubernetes (and this should also be the limit). > [K8s] Spark pods should request memory + memoryOverhead as resources > > > Key: SPARK-23825 > URL: https://issues.apache.org/jira/browse/SPARK-23825 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 2.3.0 >Reporter: David Vogelbacher >Priority: Major > > We currently request {{spark.{driver,executor}.memory}} as memory from > Kubernetes (e.g., > [here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]). > The limit is set to {{spark.{driver,executor}.memory + > spark.kubernetes.{driver,executor}.memoryOverhead}}. > This seems to be using Kubernetes wrong. > [How Pods with resource limits are > run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run], > states" > {noformat} > If a Container exceeds its memory request, it is likely that its Pod will be > evicted whenever the node runs out of memory. 
> {noformat} > Thus, if the Spark driver/executor uses {{memory + memoryOverhead}} > memory, it can be evicted. While an executor might get restarted (though it > would still be very bad performance-wise), the driver would be hard to > recover. > I think Spark should be able to run with the requested (and thus > guaranteed) resources from Kubernetes. It shouldn't rely on optional > resources above the request and thereby be in danger of termination under > high cluster utilization. > Thus, we should request {{memory + memoryOverhead}} memory from Kubernetes > (and this should also be the limit). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23825) [K8s] Spark pods should request memory + memoryOverhead as resources
David Vogelbacher created SPARK-23825: - Summary: [K8s] Spark pods should request memory + memoryOverhead as resources Key: SPARK-23825 URL: https://issues.apache.org/jira/browse/SPARK-23825 Project: Spark Issue Type: Bug Components: Kubernetes Affects Versions: 2.3.0 Reporter: David Vogelbacher We currently request `spark.{driver,executor}.memory` as memory from Kubernetes (e.g., [here|https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala#L95]). The limit is set to `spark.{driver,executor}.memory + spark.kubernetes.{driver,executor}.memoryOverhead`. This seems to be using Kubernetes wrong. [How Pods with resource limits are run|https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#how-pods-with-resource-limits-are-run] states: {noformat} If a Container exceeds its memory request, it is likely that its Pod will be evicted whenever the node runs out of memory. {noformat} Thus, if the Spark driver/executor uses `memory + memoryOverhead` memory, it can be evicted. While an executor might get restarted (though that would still be very bad performance-wise), the driver would be hard to recover. I think Spark should be able to run with the requested (and thus guaranteed) resources from Kubernetes. It shouldn't rely on optional resources above the request and thereby be in danger of termination under high cluster utilization. Thus, we should request `memory + memoryOverhead` memory from Kubernetes (and this should also be the limit).
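The fix proposed above is simply to make the container's memory request equal to its limit, i.e. both set to {{memory + memoryOverhead}}, so the pod never depends on memory it was not guaranteed. A minimal sketch of that sizing rule (the overhead floor and factor below are illustrative assumptions, not Spark's exact configuration handling):

```java
// Sketch of the proposed pod sizing: request == limit == memory + overhead.
// The 384 MiB floor and 10% factor are assumptions for illustration; the real
// values come from spark.kubernetes.{driver,executor}.memoryOverhead handling.
class PodMemorySketch {
    static final long MIN_OVERHEAD_MIB = 384;   // assumed overhead floor, MiB
    static final double OVERHEAD_FACTOR = 0.10; // assumed overhead factor

    static long overheadMib(long memoryMib) {
        return Math.max(MIN_OVERHEAD_MIB, (long) (memoryMib * OVERHEAD_FACTOR));
    }

    /** Proposed behavior: both request and limit are memory + overhead. */
    static long[] requestAndLimitMib(long memoryMib) {
        long total = memoryMib + overheadMib(memoryMib);
        return new long[] {total, total};
    }

    public static void main(String[] args) {
        long[] rl = requestAndLimitMib(4096);
        System.out.println("request=" + rl[0] + "Mi limit=" + rl[1] + "Mi");
    }
}
```

With request equal to limit, the pod's memory falls into Kubernetes' Guaranteed QoS class, so it is no longer a preferred eviction target when the node runs out of memory.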
[jira] [Comment Edited] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec
[ https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391709#comment-16391709 ] David Vogelbacher edited comment on SPARK-23598 at 3/8/18 6:41 PM: --- [~mgaido] {{HashAggregateExec}} calls {{addNewFunction}}, which calls {{addNewFunctionInternal}} which uses that flag and checks if the current size is bigger than {{GENERATED_CLASS_SIZE_THRESHOLD}} ([see|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L478]) I just compiled develop with {{GENERATED_CLASS_SIZE_THRESHOLD}} set to -1 and was able to reproduce (cc [~hvanhovell]) . I applied the following diff before compiling: {noformat} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 793824b0b0..7fad817d89 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -1167,7 +1167,7 @@ object CodeGenerator extends Logging { // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a // threshold of 1000k bytes to determine when a function should be inlined to a private, inner // class. - final val GENERATED_CLASS_SIZE_THRESHOLD = 100 + final val GENERATED_CLASS_SIZE_THRESHOLD = -1 // This is the threshold for the number of global variables, whose types are primitive type or // complex type (e.g. 
more than one-dimensional array), that will be placed at the outer class (END) {noformat} Then, I executed a simple groupBy-Aggregate in the spark-shell and got the same error: {noformat} ➜ spark git:(master) ✗ ./bin/spark-shell 18/03/08 18:30:24 WARN Utils: Your hostname, dvogelbac resolves to a loopback address: 127.0.0.1; using 10.111.11.111 instead (on interface en0) 18/03/08 18:30:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 18/03/08 18:30:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://10.111.11.111:4040 Spark context available as 'sc' (master = local[*], app id = local-1520533829643). Spark session available as 'spark'. Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.0-SNAPSHOT /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121) Type in expressions to have them evaluated. Type :help for more information. 
scala> spark.conf.set("spark.sql.codegen.wholeStage", true) scala> val df_pet_age = Seq( | (8, "bat"), | (5, "bat"), | (15, "bat"), | (30, "mouse"), | (15, "mouse"), | (23, "mouse"), | (8, "horse"), | (-5, "horse") | ).toDF("age", "name") df_pet_age: org.apache.spark.sql.DataFrame = [age: int, name: string] scala> df_pet_age.groupBy("name").avg("age").show() 18/03/08 18:31:20 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1) java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.stopEarly()Z from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1.agg_doAggregateWithKeys$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:616) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
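The reproduction works because of the size check described in the comment above: {{addNewFunctionInternal}} only inlines a new function into the outer class while the generated class is below {{GENERATED_CLASS_SIZE_THRESHOLD}}, so setting the threshold to -1 forces every function into a private inner class. A simplified model of that placement decision (class and method names here are illustrative, not Spark's actual codegen API):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of the codegen placement decision: once the outer class
// is "too big", new functions go into a private inner class, where protected
// members of the outer class's superclass (e.g. stopEarly()/append()) are no
// longer accessible from the generated bytecode.
class FunctionPlacementSketch {
    final int sizeThreshold; // e.g. -1 forces every function into the inner class
    final StringBuilder outerClassBody = new StringBuilder();
    final List<String> innerClassBody = new ArrayList<>();

    FunctionPlacementSketch(int sizeThreshold) {
        this.sizeThreshold = sizeThreshold;
    }

    /** Returns "outer" or "inner" depending on where the function was placed. */
    String addNewFunction(String code) {
        if (outerClassBody.length() <= sizeThreshold) {
            outerClassBody.append(code);
            return "outer";
        }
        innerClassBody.add(code);
        return "inner";
    }

    public static void main(String[] args) {
        FunctionPlacementSketch forced = new FunctionPlacementSketch(-1);
        // With threshold -1, even the very first function is placed "inner",
        // matching the forced reproduction in the comment above.
        System.out.println(forced.addNewFunction("void f() {}"));
    }
}
```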
[jira] [Commented] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec
[ https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391709#comment-16391709 ] David Vogelbacher commented on SPARK-23598: --- [~mgaido] {{HashAggregateExec}} calls {{addNewFunction}}, which calls {{addNewFunctionInternal}} which uses that flag and checks if the current size is bigger than {{GENERATED_CLASS_SIZE_THRESHOLD}} ([see|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L478]) I just compiled develop with {{GENERATED_CLASS_SIZE_THRESHOLD}} set to -1 and was able to reproduce (cc [~hvanhovell]) . I applied the following diff before compiling: {noformat} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 793824b0b0..7fad817d89 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -1167,7 +1167,7 @@ object CodeGenerator extends Logging { // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a // threshold of 1000k bytes to determine when a function should be inlined to a private, inner // class. - final val GENERATED_CLASS_SIZE_THRESHOLD = 100 + final val GENERATED_CLASS_SIZE_THRESHOLD = -1 // This is the threshold for the number of global variables, whose types are primitive type or // complex type (e.g. 
more than one-dimensional array), that will be placed at the outer class (END) {noformat} Then, I executed a simple groupBy-Aggregate in the spark-shell and got the same error: {noformat} ➜ spark git:(master) ✗ ./bin/spark-shell 18/03/08 18:30:24 WARN Utils: Your hostname, dvogelbac56-mac resolves to a loopback address: 127.0.0.1; using 10.224.86.161 instead (on interface en0) 18/03/08 18:30:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 18/03/08 18:30:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://10.224.86.161:4040 Spark context available as 'sc' (master = local[*], app id = local-1520533829643). Spark session available as 'spark'. Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.0-SNAPSHOT /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121) Type in expressions to have them evaluated. Type :help for more information. 
scala> spark.conf.set("spark.sql.codegen.wholeStage", true) scala> val df_pet_age = Seq( | (8, "bat"), | (5, "bat"), | (15, "bat"), | (30, "mouse"), | (15, "mouse"), | (23, "mouse"), | (8, "horse"), | (-5, "horse") | ).toDF("age", "name") df_pet_age: org.apache.spark.sql.DataFrame = [age: int, name: string] scala> df_pet_age.groupBy("name").avg("age").show() 18/03/08 18:31:20 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1) java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.stopEarly()Z from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1.agg_doAggregateWithKeys$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:616) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
[jira] [Commented] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec
[ https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389745#comment-16389745 ] David Vogelbacher commented on SPARK-23598: --- [~hvanhovell] unfortunately, I can't extract any code for reproducing. It should be possible to come up with code by making a large enough query (one that adds many methods in the codegen stage and contains a HashAggregateExec node). Or, to make it even easier, one could compile Spark with a lowered `final val GENERATED_CLASS_SIZE_THRESHOLD` in [CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L1170] to force the generation of the private inner class. > WholeStageCodegen can lead to IllegalAccessError calling append for > HashAggregateExec > -- > > Key: SPARK-23598 > URL: https://issues.apache.org/jira/browse/SPARK-23598 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: David Vogelbacher >Priority: Major > > Got the following stacktrace for a large QueryPlan using WholeStageCodeGen: > {noformat} > java.lang.IllegalAccessError: tried to access method > org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V > from class > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) > at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat} > After disabling codegen, everything works. > The root cause seems to be that we are trying to call the protected _append_ > method of > [BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68] > from an inner-class of a sub-class that is loaded by a different > class-loader (after codegen compilation). > [https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] > states that a protected method _R_ can be accessed only if one of the > following two conditions is fulfilled: > # R is protected and is declared in a class C, and D is either a subclass of > C or C itself. Furthermore, if R is not static, then the symbolic reference > to R must contain a symbolic reference to a class T, such that T is either a > subclass of D, a superclass of D, or D itself. > # R is either protected or has default access (that is, neither public nor > protected nor private), and is declared by a class in the same run-time > package as D. > 2.) doesn't apply as we have loaded the class with a different class loader > (and are in a different package) and 1.) doesn't apply because we are > apparently trying to call the method from an inner class of a subclass of > _BufferedRowIterator_. 
> Looking at the code path of _WholeStageCodeGen_, the following happens: > # In > [WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527], > we create the subclass of _BufferedRowIterator_, along with a _processNext_ > method for processing the output of the child plan. > # In the child, which is a > [HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517], > we create the method which shows up at the top of the stack trace (called > _doAggregateWithKeysOutput_) > # We add this method to the compiled code by invoking _addNewFunction_ of > [CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460]. > In the generated function body we call the _append_ method. > Now, the _addNewFunction_
[jira] [Updated] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec
[ https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Vogelbacher updated SPARK-23598: -- Description: Got the following stacktrace for a large QueryPlan using WholeStageCodeGen: {noformat} java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat} After disabling codegen, everything works. The root cause seems to be that we are trying to call the protected _append_ method of [BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68] from an inner-class of a sub-class that is loaded by a different class-loader (after codegen compilation). 
[https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] states that a protected method _R_ can be accessed only if one of the following two conditions is fulfilled: # R is protected and is declared in a class C, and D is either a subclass of C or C itself. Furthermore, if R is not static, then the symbolic reference to R must contain a symbolic reference to a class T, such that T is either a subclass of D, a superclass of D, or D itself. # R is either protected or has default access (that is, neither public nor protected nor private), and is declared by a class in the same run-time package as D. 2.) doesn't apply as we have loaded the class with a different class loader (and are in a different package) and 1.) doesn't apply because we are apparently trying to call the method from an inner class of a subclass of _BufferedRowIterator_. Looking at the Code path of _WholeStageCodeGen_, the following happens: # In [WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527], we create the subclass of _BufferedRowIterator_, along with a _processNext_ method for processing the output of the child plan. 
# In the child, which is a [HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517], we create the method which shows up at the top of the stack trace (called _doAggregateWithKeysOutput_) # We add this method to the compiled code by invoking _addNewFunction_ of [CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460]. In the generated function body we call the _append_ method. Now, the _addNewFunction_ method states that: {noformat} If the code for the `OuterClass` grows too large, the function will be inlined into a new private, inner class {noformat} This indeed seems to happen: the _doAggregateWithKeysOutput_ method is put into a new private inner class. Thus, it doesn't have access to the protected _append_ method anymore but still tries to call it, which results in the _IllegalAccessError._ Possible fixes: * Pass in the _inlineToOuterClass_ flag when invoking _addNewFunction_ * Make the _append_ method public * Re-declare the _append_ method in the generated subclass (just invoking _super_). This way, inner classes should have access to it. was: Got the following stacktrace for a large QueryPlan using WholeStageCodeGen: {noformat} java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass at
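The third fix listed above can be illustrated with plain Java classes (these are simplified stand-ins for the generated code, not Spark's actual classes): re-declaring the protected method in the subclass makes the nested class resolve the call against its own enclosing class rather than against _BufferedRowIterator_, which is what breaks once the generated class ends up in a different runtime package.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for BufferedRowIterator with a protected append().
class BufferedRowIteratorLike {
    final List<Object> rows = new ArrayList<>();
    protected void append(Object row) { rows.add(row); }
}

// Simplified stand-in for the generated iterator subclass.
class GeneratedIteratorLike extends BufferedRowIteratorLike {
    // Fix option 3 from the issue: re-declare append so the symbolic
    // reference from the nested class targets GeneratedIteratorLike
    // (same class, same runtime package) instead of the superclass.
    @Override
    protected void append(Object row) { super.append(row); }

    // Stand-in for the private inner class the codegen inlines functions into.
    class NestedAggregate {
        void emit(Object row) {
            append(row); // resolves against the enclosing class, which is accessible
        }
    }

    public static void main(String[] args) {
        GeneratedIteratorLike it = new GeneratedIteratorLike();
        it.new NestedAggregate().emit("row1");
        System.out.println(it.rows);
    }
}
```

When everything is compiled by javac into one package this compiles and runs either way; the access error in the report only appears because the generated bytecode is loaded in a different runtime package, which is exactly why the re-declaration (or making _append_ public) restores access.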
[jira] [Updated] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec
[ https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Vogelbacher updated SPARK-23598: -- Description: Got the following stacktrace for a large QueryPlan using WholeStageCodeGen: {noformat} java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat} After disabling codegen, everything works. The root cause seems to be that we are trying to call the protected _append_ method of [BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68] from an inner-class of a sub-class that is loaded by a different class-loader (after codegen compilation). 
[https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] states that a protected method _R_ can be accessed only if one of the following two conditions is fulfilled: # R is protected and is declared in a class C, and D is either a subclass of C or C itself. Furthermore, if R is not static, then the symbolic reference to R must contain a symbolic reference to a class T, such that T is either a subclass of D, a superclass of D, or D itself. # R is either protected or has default access (that is, neither public nor protected nor private), and is declared by a class in the same run-time package as D. 2.) doesn't apply as we have loaded the class with a different class loader (and are in a different package) and 1.) doesn't apply because we are apparently trying to call the method from an inner class of a subclass of _BufferedRowIterator_. Looking at the code path of _WholeStageCodeGen_, the following happens: # In [WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527], we create the subclass of _BufferedRowIterator_, along with a _processNext_ method for processing the output of the child plan. # In the child, which is a [HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517], we create the method which shows up at the top of the stack trace (called _doAggregateWithKeysOutput_) # We add this method to the compiled code by invoking _addNewFunction_ of [CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460]. In the generated function body we call the _append_ method. 
Now, the _addNewFunction_ method states that: {noformat} If the code for the `OuterClass` grows too large, the function will be inlined into a new private, inner class {noformat} This indeed seems to happen: the _doAggregateWithKeysOutput_ method is put into a new private inner class. Thus, it doesn't have access to the protected _append_ method anymore but still tries to call it, which results in the _IllegalAccessError._ Possible fixes: * Pass in the _inlineToOuterClass_ flag when invoking the _addNewFunction_ * Make the _append_ method public * Re-declare the _append_ method in the generated subclass (just invoking _super_). This way, inner classes should have access to it. was: Got the following stacktrace for a large QueryPlan using WholeStageCodeGen: {noformat} java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass at
[jira] [Updated] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec
[ https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Vogelbacher updated SPARK-23598: -- Description: Got the following stacktrace for a large QueryPlan using WholeStageCodeGen: {noformat} java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat} After disabling codegen, everything works. The root cause seems to be that we are trying to call the protected _append_ method of [BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68] from an inner-class of a sub-class that is loaded by a different class-loader (after codegen compilation). 
[https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] states that a protected method _R_ can be accessed only if one of the following two conditions is fulfilled: # R is protected and is declared in a class C, and D is either a subclass of C or C itself. Furthermore, if R is not static, then the symbolic reference to R must contain a symbolic reference to a class T, such that T is either a subclass of D, a superclass of D, or D itself. # R is either protected or has default access (that is, neither public nor protected nor private), and is declared by a class in the same run-time package as D. 2.) doesn't apply as we have loaded the class with a different class loader (and are in a different package) and 1.) doesn't apply because we are apparently trying to call the method from an inner class of a subclass of _BufferedRowIterator_. Looking at the Code path of _WholeStageCodeGen_, the following happens: # In [WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527], we create the subclass of _BufferedRowIterator_, along with a _processNext_ method for processing the output of the child plan. 
# In the child, which is a [HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517], we create the method which shows up at the top of the stack trace (called _doAggregateWithKeysOutput_) # We add this method to the compiled code by invoking _addNewFunction_ of [CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460]. In the generated function body we call the _append_ method. Now, the _addNewFunction_ method states that: {noformat} If the code for the `OuterClass` grows too large, the function will be inlined into a new private, inner class {noformat} This indeed seems to happen: the _doAggregateWithKeysOutput_ method is put into a new private inner class. Thus, it doesn't have access to the protected _append_ method anymore but still tries to call it, which results in the _IllegalAccessError._ Possible fixes: * Pass in the _inlineToOuterClass_ flag when invoking _addNewFunction_ * Make the _append_ method public * Re-declare the _append_ method in the generated subclass (just invoking _super_). This way, inner classes should have access to it. was: Got the following stacktrace for a large QueryPlan using WholeStageCodeGen: {noformat} java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass at
[jira] [Created] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec
David Vogelbacher created SPARK-23598:
-----------------------------------------

             Summary: WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec
                 Key: SPARK-23598
                 URL: https://issues.apache.org/jira/browse/SPARK-23598
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.4.0
            Reporter: David Vogelbacher

Got the following stacktrace for a large QueryPlan using WholeStageCodeGen:
{code:java}
java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
{code}
After disabling codegen, everything works.
The root cause seems to be that we are trying to call the protected _append_ method of [BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68] from an inner class of a subclass that is loaded by a different class loader (after codegen compilation).

[https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] states that a protected method _R_ can be accessed only if one of the following two conditions is fulfilled:
# R is protected and is declared in a class C, and D is either a subclass of C or C itself. Furthermore, if R is not static, then the symbolic reference to R must contain a symbolic reference to a class T, such that T is either a subclass of D, a superclass of D, or D itself.
# R is either protected or has default access (that is, neither public nor protected nor private), and is declared by a class in the same run-time package as D.

2.) doesn't apply, as we have loaded the class with a different class loader (and are in a different package), and 1.) doesn't apply because we are apparently trying to call the method from an inner class of a subclass of _BufferedRowIterator_.

Looking at the code path of _WholeStageCodeGen_, the following happens:
# In [WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527], we create the subclass of _BufferedRowIterator_, along with a _processNext_ method for processing the output of the child plan.
# In the child, which is a [HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517], we create the method that shows up at the top of the stack trace (called _doAggregateWithKeysOutput_)
# We add this method to the compiled code by invoking _addNewFunction_ of [CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460]

Now, this method states that:
{noformat}
If the code for the `OuterClass` grows too large, the function will be inlined into a new private, inner class
{noformat}
This indeed seems to happen: the _doAggregateWithKeysOutput_ method is put into a new private inner class. It therefore no longer has access to the protected _append_ method, but it still tries to call it, which results in the _IllegalAccessError_.

Possible fixes:
* Pass the _inlineToOuterClass_ flag when invoking _addNewFunction_
* Make the _append_ method public
* Re-declare the _append_ method in the generated subclass (just invoking _super_); this way, inner classes have access to it.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
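The protected-access rule from JVMS 5.4.4 discussed in this issue can be observed in plain Java via reflection. The following is a minimal, hypothetical illustration unrelated to Spark's generated classes (class and method names are made up): {{Object.clone()}} is protected, so a class outside {{java.lang}} may only reach it on instances of its own type, and the reflective access check mirrors that.

```java
import java.lang.reflect.Method;

// Object.clone() is protected. Under JVMS 5.4.4, a class D outside
// java.lang may only access it on instances whose type is D (or a
// subclass of D). Reflection enforces the same rule, so querying
// access on a plain Object from this unrelated class reports false.
class ProtectedAccessDemo {
    static boolean canCallCloneOnPlainObject() {
        try {
            Method clone = Object.class.getDeclaredMethod("clone");
            // Caller is ProtectedAccessDemo; the target is a plain Object,
            // which is not an instance of ProtectedAccessDemo.
            return clone.canAccess(new Object());
        } catch (NoSuchMethodException e) {
            throw new AssertionError(e);
        }
    }
}
```

This is the same shape of restriction the generated {{agg_NestedClass}} runs into: being related to the declaring class by inheritance is not enough, because the symbolic reference at the call site must also resolve through a class the caller is allowed to use.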