[jira] [Updated] (SPARK-35511) Spark computes all rows during count() on a parquet file
[ https://issues.apache.org/jira/browse/SPARK-35511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Tsukanov updated SPARK-35511: -- Description: We expect spark uses parquet metadata to fetch the rows count of a parquet file. But when we execute the following code {code:java} import org.apache.spark.SparkConf import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} object Test extends App { val sparkConf = new SparkConf() .setAppName("test-app") .setMaster("local[1]") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() import sparkSession.implicits._ val filePath = "./tempFile.parquet" (1 to 1000).toDF("c1") .repartition(10) .write .mode("overwrite") .parquet(filePath) val df = sparkSession.read.parquet(filePath) var rowsInHeavyComputation = 0 def heavyComputation(row: Row): Row = { rowsInHeavyComputation += 1 println(s"rowsInHeavyComputation = $rowsInHeavyComputation") Thread.sleep(50) row } implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema) val cnt = df .map(row => heavyComputation(row)) // map operation cannot change number of rows .count() println(s"counting done, cnt=$cnt") } {code} we see {code:java} rowsInHeavyComputation = 1 rowsInHeavyComputation = 2 ... rowsInHeavyComputation = 999 rowsInHeavyComputation = 1000 counting done, cnt=1000 {code} *Expected result* - spark does not perform heavyComputation at all. P.S. In our real application we: - transform data from parquet files - return some examples (50 rows and spark does heavyComputation only for 50 rows) - return rows count of the whole DataFrame and here spark for some reason computes the whole DataFrame despite the fact there are only map operations and initial rows count can be gotten from parquet meta was: We expect spark uses parquet metadata to fetch the rows count of a parquet file. But when we execute the following code {code:java} import org.apache.spark.SparkConf import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} object Test extends App { val sparkConf = new SparkConf() .setAppName("test-app") .setMaster("local[1]") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() import sparkSession.implicits._ val filePath = "./tempFile.parquet" (1 to 1000).toDF("c1") .repartition(10) .write .mode("overwrite") .parquet(filePath) val df = sparkSession.read.parquet(filePath) var rowsInHeavyComputation = 0 def heavyComputation(row: Row): Row = { rowsInHeavyComputation += 1 println(s"rowsInHeavyComputation = $rowsInHeavyComputation") Thread.sleep(50) row } implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema) val cnt = df .map(row => heavyComputation(row)) // map operation cannot change number of rows .count() println(s"counting done, cnt=$cnt") } {code} we see {code:java} rowsInHeavyComputation = 1 rowsInHeavyComputation = 2 ... rowsInHeavyComputation = 999 rowsInHeavyComputation = 1000 counting done, cnt=1000 {code} Expected result - spark does not perform heavyComputation at all. P.S. 
In our real application we - transform data from Parquet files - return some examples (50 rows, and Spark runs heavyComputation for only those 50 rows) - return the row count of the whole DataFrame, and here Spark for some reason computes the whole DataFrame even though there are only map operations and the initial row count could be taken from the Parquet metadata > Spark computes all rows during count() on a parquet file > > > Key: SPARK-35511 > URL: https://issues.apache.org/jira/browse/SPARK-35511 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Ivan Tsukanov >Priority: Major
[jira] [Created] (SPARK-35511) Spark computes all rows during count() on a parquet file
Ivan Tsukanov created SPARK-35511: - Summary: Spark computes all rows during count() on a parquet file Key: SPARK-35511 URL: https://issues.apache.org/jira/browse/SPARK-35511 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.0.0 Reporter: Ivan Tsukanov We expect Spark to use Parquet metadata to fetch the row count of a Parquet file, but when we execute the following code {code:java} import org.apache.spark.SparkConf import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} object Test extends App { val sparkConf = new SparkConf() .setAppName("test-app") .setMaster("local[1]") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() import sparkSession.implicits._ val filePath = "./tempFile.parquet" (1 to 1000).toDF("c1") .repartition(10) .write .mode("overwrite") .parquet(filePath) val df = sparkSession.read.parquet(filePath) var rowsInHeavyComputation = 0 def heavyComputation(row: Row): Row = { rowsInHeavyComputation += 1 println(s"rowsInHeavyComputation = $rowsInHeavyComputation") Thread.sleep(50) row } implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema) val cnt = df .map(row => heavyComputation(row)) // map operation cannot change number of rows .count() println(s"counting done, cnt=$cnt") } {code} we see {code:java} rowsInHeavyComputation = 1 rowsInHeavyComputation = 2 ... rowsInHeavyComputation = 999 rowsInHeavyComputation = 1000 counting done, cnt=1000 {code} Expected result - Spark does not perform heavyComputation at all. P.S. In our real application we - transform data from Parquet files - return some examples (50 rows, and Spark runs heavyComputation for only those 50 rows) - return the row count of the whole DataFrame, and here Spark for some reason computes the whole DataFrame even though there are only map operations and the initial row count could be taken from the Parquet metadata -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
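The P.S. above points at a workaround that does not depend on a Spark change: take the count from the DataFrame exactly as it is read from Parquet, and apply the expensive map only to the rows that are actually returned as examples. The snippet below is a sketch of that idea, reusing sparkSession, filePath and heavyComputation from the report; it is an assumption about usage, not part of the ticket.

{code:scala}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}

// Assumes sparkSession, filePath and heavyComputation as defined in the report above.
val df = sparkSession.read.parquet(filePath)
implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)

// count() on the untransformed DataFrame never calls heavyComputation; Spark can
// prune every column, so the scan is effectively driven by the Parquet row counts.
val totalRows = df.count()

// The expensive map is evaluated only for the rows that are actually fetched.
val examples = df.map(row => heavyComputation(row)).take(50)

println(s"total = $totalRows, examples = ${examples.length}")
{code}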
[jira] [Updated] (SPARK-32758) Spark ignores limit(1) and starts tasks for all partitions
[ https://issues.apache.org/jira/browse/SPARK-32758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Tsukanov updated SPARK-32758: -- Description: If we run the following code {code:scala} val sparkConf = new SparkConf() .setAppName("test-app") .setMaster("local[1]") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() import sparkSession.implicits._ val df = (1 to 10) .toDF("c1") .repartition(1000) implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema) df.limit(1) .map(identity) .collect() df.map(identity) .limit(1) .collect() Thread.sleep(10) {code} we will see that in the first case spark started 1002 tasks despite the fact there is limit(1) - !image-2020-09-01-10-51-09-417.png! Expected behavior - both scenarios (limit before and after map) will produce the same results - one or two tasks to get one value from the DataFrame. was: If we run the following code {code:scala} val sparkConf = new SparkConf() .setAppName("test-app") .setMaster("local[1]") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() import sparkSession.implicits._ val df = (1 to 10) .toDF("c1") .repartition(1000) implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema) df.limit(1) .map(identity) .collect() df.map(identity) .limit(1) .collect() Thread.sleep(10) {code} we will see that spark started 1002 tasks despite the fact there is limit(1) - !image-2020-09-01-10-51-09-417.png! Expected behavior - both scenarios (limit before and after map) will produce the same results - one or two tasks to get one value from the DataFrame. > Spark ignores limit(1) and starts tasks for all partition > - > > Key: SPARK-32758 > URL: https://issues.apache.org/jira/browse/SPARK-32758 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Ivan Tsukanov >Priority: Major > Attachments: image-2020-09-01-10-51-09-417.png > > > If we run the following code > {code:scala} > val sparkConf = new SparkConf() > .setAppName("test-app") > .setMaster("local[1]") > val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() > import sparkSession.implicits._ > val df = (1 to 10) > .toDF("c1") > .repartition(1000) > implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema) > df.limit(1) > .map(identity) > .collect() > df.map(identity) > .limit(1) > .collect() > Thread.sleep(10) > {code} > we will see that in the first case spark started 1002 tasks despite the fact > there is limit(1) - > !image-2020-09-01-10-51-09-417.png! > Expected behavior - both scenarios (limit before and after map) will produce > the same results - one or two tasks to get one value from the DataFrame. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-32758) Spark ignores limit(1) and starts tasks for all partitions
[ https://issues.apache.org/jira/browse/SPARK-32758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Tsukanov updated SPARK-32758: -- Description: If we run the following code {code:scala} val sparkConf = new SparkConf() .setAppName("test-app") .setMaster("local[1]") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() import sparkSession.implicits._ val df = (1 to 10) .toDF("c1") .repartition(1000) implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema) df.limit(1) .map(identity) .collect() df.map(identity) .limit(1) .collect() Thread.sleep(10) {code} we will see that spark started 1002 tasks despite the fact there is limit(1) - !image-2020-09-01-10-51-09-417.png! Expected behavior - both scenarios (limit before and after map) will produce the same results - one or two tasks to get one value from the DataFrame. was: If we run the following code {code:scala} val sparkConf = new SparkConf() .setAppName("test-app") .setMaster("local[1]") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() import sparkSession.implicits._ val df = (1 to 10) .toDF("c1") .repartition(1000) implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema) df.limit(1) .map(identity) .collect() df.map(identity) .limit(1) .collect() Thread.sleep(10) {code} we will see that spark started 1002 tasks despite the fact there is limit(1) - !image-2020-09-01-10-34-47-580.png! Expected behavior - both scenarios (limit before and after map) will produce the same results - one or two tasks to get one value from the DataFrame. > Spark ignores limit(1) and starts tasks for all partition > - > > Key: SPARK-32758 > URL: https://issues.apache.org/jira/browse/SPARK-32758 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 > Environment: > > должен > >Reporter: Ivan Tsukanov >Priority: Major > Attachments: image-2020-09-01-10-51-09-417.png > > > If we run the following code > {code:scala} > val sparkConf = new SparkConf() > .setAppName("test-app") > .setMaster("local[1]") > val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() > import sparkSession.implicits._ > val df = (1 to 10) > .toDF("c1") > .repartition(1000) > implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema) > df.limit(1) > .map(identity) > .collect() > df.map(identity) > .limit(1) > .collect() > Thread.sleep(10) > {code} > we will see that spark started 1002 tasks despite the fact there is limit(1) - > !image-2020-09-01-10-51-09-417.png! > Expected behavior - both scenarios (limit before and after map) will produce > the same results - one or two tasks to get one value from the DataFrame. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-32758) Spark ignores limit(1) and starts tasks for all partitions
[ https://issues.apache.org/jira/browse/SPARK-32758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Tsukanov updated SPARK-32758: -- Environment: (was: должен ) > Spark ignores limit(1) and starts tasks for all partition > - > > Key: SPARK-32758 > URL: https://issues.apache.org/jira/browse/SPARK-32758 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Ivan Tsukanov >Priority: Major > Attachments: image-2020-09-01-10-51-09-417.png > > > If we run the following code > {code:scala} > val sparkConf = new SparkConf() > .setAppName("test-app") > .setMaster("local[1]") > val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() > import sparkSession.implicits._ > val df = (1 to 10) > .toDF("c1") > .repartition(1000) > implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema) > df.limit(1) > .map(identity) > .collect() > df.map(identity) > .limit(1) > .collect() > Thread.sleep(10) > {code} > we will see that spark started 1002 tasks despite the fact there is limit(1) - > !image-2020-09-01-10-51-09-417.png! > Expected behavior - both scenarios (limit before and after map) will produce > the same results - one or two tasks to get one value from the DataFrame. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-32758) Spark ignores limit(1) and starts tasks for all partitions
[ https://issues.apache.org/jira/browse/SPARK-32758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Tsukanov updated SPARK-32758: -- Attachment: image-2020-09-01-10-51-09-417.png > Spark ignores limit(1) and starts tasks for all partition > - > > Key: SPARK-32758 > URL: https://issues.apache.org/jira/browse/SPARK-32758 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 > Environment: > > должен > >Reporter: Ivan Tsukanov >Priority: Major > Attachments: image-2020-09-01-10-51-09-417.png > > > If we run the following code > {code:scala} > val sparkConf = new SparkConf() > .setAppName("test-app") > .setMaster("local[1]") > val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() > import sparkSession.implicits._ > val df = (1 to 10) > .toDF("c1") > .repartition(1000) > implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema) > df.limit(1) > .map(identity) > .collect() > df.map(identity) > .limit(1) > .collect() > Thread.sleep(10) > {code} > we will see that spark started 1002 tasks despite the fact there is limit(1) - > !image-2020-09-01-10-34-47-580.png! > Expected behavior - both scenarios (limit before and after map) will produce > the same results - one or two tasks to get one value from the DataFrame. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-32758) Spark ignores limit(1) and starts tasks for all partitions
Ivan Tsukanov created SPARK-32758: - Summary: Spark ignores limit(1) and starts tasks for all partitions Key: SPARK-32758 URL: https://issues.apache.org/jira/browse/SPARK-32758 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.4.0 Environment: должен Reporter: Ivan Tsukanov If we run the following code {code:scala} val sparkConf = new SparkConf() .setAppName("test-app") .setMaster("local[1]") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() import sparkSession.implicits._ val df = (1 to 10) .toDF("c1") .repartition(1000) implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema) df.limit(1) .map(identity) .collect() df.map(identity) .limit(1) .collect() Thread.sleep(10) {code} we will see that Spark starts 1002 tasks even though there is limit(1) - !image-2020-09-01-10-34-47-580.png! Expected behavior - both scenarios (limit before and after map) produce the same result - one or two tasks to get one value from the DataFrame. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
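For reference, the ordering that does not blow up the task count in the report is the one where limit(1) is the last transformation before collect(): Spark can then plan a CollectLimit and start from a single partition. The snippet below sketches that usage under the same setup as the report (sparkSession, 1000 partitions); it is not a fix for the planner behaviour being reported.

{code:scala}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}

// Assumes sparkSession as defined in the report above.
import sparkSession.implicits._

val df = (1 to 10)
  .toDF("c1")
  .repartition(1000)

implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)

// With limit(1) applied last, collect() typically runs as a CollectLimit that
// reads one partition first and only scales up if it needs more rows.
val oneRow = df.map(identity).limit(1).collect()
println(oneRow.mkString(", "))
{code}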
[jira] [Updated] (SPARK-28742) StackOverflowError when using otherwise(col()) in a loop
[ https://issues.apache.org/jira/browse/SPARK-28742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Tsukanov updated SPARK-28742: -- Description: The following code {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) (1 to 9).foldLeft(df) { case (acc, _) => val res = acc.withColumn("c1", column) res.take(1) res } {code} falls with {code:java} java.lang.StackOverflowError at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395) ...{code} Probably, the problem is spark generates unexplainable big Physical Plan - {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) val result = (1 to 9).foldLeft(df) { case (acc, _) => acc.withColumn("c1", column) } result.explain() {code} it shows a plan 18936 symbols length {code:java} == Physical Plan == *(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE 18936 symbols +- Scan ExistingRDD[c1#1] {code} was: The following code {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) (1 to 9).foldLeft(df) { case (acc, _) => val res = acc.withColumn("c1", column) res.take(1) res } {code} falls with {code:java} java.lang.StackOverflowError at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395) ...{code} Probably, the problem is spark generates unexplainable big Physical Plan - {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) val result = (1 to 9).foldLeft(df) { case (acc, _) => acc.withColumn("c1", column) } result.explain() {code} it shows a plan 18936 symbols length {code:java} == Physical Plan == *(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE +- Scan ExistingRDD[c1#1] {code} > StackOverflowError when using otherwise(col()) in a loop > > > Key: SPARK-28742 > URL: https://issues.apache.org/jira/browse/SPARK-28742 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0, 2.4.3 >Reporter: Ivan Tsukanov >Priority: Major > > The following code > {code:java} > val rdd = sparkContext.makeRDD(Seq(Row("1"))) > val schema = StructType(Seq( > StructField("c1", StringType) > )) > val df = sparkSession.createDataFrame(rdd, schema) > val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) > (1 to 9).foldLeft(df) { case (acc, _) => > val res = acc.withColumn("c1", column) > res.take(1) > res > } > {code} > falls with > {code:java} > java.lang.StackOverflowError >at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395) >...{code} > Probably, the problem is spark generates unexplainable big Physical Plan - > {code:java} > val rdd = sparkContext.makeRDD(Seq(Row("1"))) > val schema = StructType(Seq( > StructField("c1", StringType) > )) > val df = sparkSession.createDataFrame(rdd, schema) > val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) > val result = (1 to 9).foldLeft(df) 
{ case (acc, _) => > acc.withColumn("c1", column) > } > result.explain() > {code} > it shows a plan 18936 symbols length > {code:java} > == Physical Plan == > *(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE > WHEN (CASE 18936 symbols > +- Scan ExistingRDD[c1#1] {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-28742) StackOverflowError when using otherwise(col()) in a loop
[ https://issues.apache.org/jira/browse/SPARK-28742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Tsukanov updated SPARK-28742: -- Description: The following code {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) (1 to 9).foldLeft(df) { case (acc, _) => val res = acc.withColumn("c1", column) res.take(1) res } {code} falls with {code:java} java.lang.StackOverflowError at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395) ...{code} Probably, the problem is spark generates unexplainable big Physical Plan - {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) val result = (1 to 9).foldLeft(df) { case (acc, _) => acc.withColumn("c1", column) } result.explain() {code} it shows a plan 18936 symbols length {code:java} == Physical Plan == *(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE +- Scan ExistingRDD[c1#1] {code} was: The following code {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) (1 to 9).foldLeft(df) { case (acc, _) => val res = acc.withColumn("c1", column) res.take(1) res } {code} falls with {code:java} java.lang.StackOverflowError at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395) ...{code} Probably, the problem is spark generates unexplainable big Physical Plan - {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) val result = (1 to 9).foldLeft(df) { case (acc, _) => acc.withColumn("c1", column) } result.explain() {code} it shows a plan 18936 symbols length {code:java} == Physical Plan == *(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE +- Scan ExistingRDD[c1#1] {code} > StackOverflowError when using otherwise(col()) in a loop > > > Key: SPARK-28742 > URL: https://issues.apache.org/jira/browse/SPARK-28742 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0, 2.4.3 >Reporter: Ivan Tsukanov >Priority: Major > > The following code > {code:java} > val rdd = sparkContext.makeRDD(Seq(Row("1"))) > val schema = StructType(Seq( > StructField("c1", StringType) > )) > val df = sparkSession.createDataFrame(rdd, schema) > val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) > (1 to 9).foldLeft(df) { case (acc, _) => > val res = acc.withColumn("c1", column) > res.take(1) > res > } > {code} > falls with > {code:java} > java.lang.StackOverflowError >at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395) >...{code} > Probably, the problem is spark generates unexplainable big Physical Plan - > {code:java} > val rdd = sparkContext.makeRDD(Seq(Row("1"))) > val schema = StructType(Seq( > StructField("c1", StringType) > )) > val df = sparkSession.createDataFrame(rdd, schema) > val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) > val result = (1 to 9).foldLeft(df) { case (acc, 
_) => > acc.withColumn("c1", column) > } > result.explain() > {code} > it shows a plan 18936 symbols length > {code:java} > == Physical Plan == > *(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE > WHEN (CASE > +- Scan ExistingRDD[c1#1] {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-28742) StackOverflowError when using otherwise(col()) in a loop
[ https://issues.apache.org/jira/browse/SPARK-28742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Tsukanov updated SPARK-28742: -- Description: The following code {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) (1 to 9).foldLeft(df) { case (acc, _) => val res = acc.withColumn("c1", column) res.take(1) res } {code} falls with {code:java} java.lang.StackOverflowError at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395) ...{code} Probably, the problem is spark generates unexplainable big Physical Plan - {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) val result = (1 to 9).foldLeft(df) { case (acc, _) => acc.withColumn("c1", column) } result.explain() {code} it shows a plan 18936 symbols length {code:java} == Physical Plan == *(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE +- Scan ExistingRDD[c1#1] {code} was: The following code {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) (1 to 9).foldLeft(df) { case (acc, _) => val res = acc.withColumn("c1", column) res.take(1) res } {code} falls with {code:java} java.lang.StackOverflowError at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395) ...{code} Probably, the problem is spark generates unexplainable big Physical Plan - {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) val result = (1 to 9).foldLeft(df) { case (acc, _) => acc.withColumn("c1", column) } result.explain() {code} it shows a plan 18936 symbols length {code:java} == Physical Plan == *(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE +- Scan ExistingRDD[c1#1]{code} > StackOverflowError when using otherwise(col()) in a loop > > > Key: SPARK-28742 > URL: https://issues.apache.org/jira/browse/SPARK-28742 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0, 2.4.3 >Reporter: Ivan Tsukanov >Priority: Major > > The following code > {code:java} > val rdd = sparkContext.makeRDD(Seq(Row("1"))) > val schema = StructType(Seq( > StructField("c1", StringType) > )) > val df = sparkSession.createDataFrame(rdd, schema) > val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) > (1 to 9).foldLeft(df) { case (acc, _) => > val res = acc.withColumn("c1", column) > res.take(1) > res > } > {code} > falls with > {code:java} > java.lang.StackOverflowError >at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395) >...{code} > Probably, the problem is spark generates unexplainable big Physical Plan - > > {code:java} > val rdd = sparkContext.makeRDD(Seq(Row("1"))) > val schema = StructType(Seq( > StructField("c1", StringType) > )) > val df = sparkSession.createDataFrame(rdd, schema) > val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) > val result = (1 to 9).foldLeft(df) { case (acc, 
_) => > acc.withColumn("c1", column) > } > result.explain() > {code} > it shows a plan 18936 symbols length > {code:java} > == Physical Plan == > *(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE > WHEN (CASE > +- Scan ExistingRDD[c1#1] {code} > -- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-28742) StackOverflowError when using otherwise(col()) in a loop
[ https://issues.apache.org/jira/browse/SPARK-28742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Tsukanov updated SPARK-28742: -- Description: The following code {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) (1 to 9).foldLeft(df) { case (acc, _) => val res = acc.withColumn("c1", column) res.take(1) res } {code} falls with {code:java} java.lang.StackOverflowError at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395) ...{code} Probably, the problem is spark generates unexplainable big Physical Plan - {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) val result = (1 to 9).foldLeft(df) { case (acc, _) => acc.withColumn("c1", column) } result.explain() {code} it shows a plan 18936 symbols length {code:java} == Physical Plan == *(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE +- Scan ExistingRDD[c1#1]{code} was: The following code {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) (1 to 9).foldLeft(df) { case (acc, _) => val res = acc.withColumn("c1", column) res.take(1) res } {code} falls with {code:java} java.lang.StackOverflowError at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395) ...{code} Probably, the problem is spark generates unexplainable big Physical Plan - !image-2019-08-15-15-10-13-397.png! > StackOverflowError when using otherwise(col()) in a loop > > > Key: SPARK-28742 > URL: https://issues.apache.org/jira/browse/SPARK-28742 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0, 2.4.3 >Reporter: Ivan Tsukanov >Priority: Major > > The following code > {code:java} > val rdd = sparkContext.makeRDD(Seq(Row("1"))) > val schema = StructType(Seq( > StructField("c1", StringType) > )) > val df = sparkSession.createDataFrame(rdd, schema) > val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) > (1 to 9).foldLeft(df) { case (acc, _) => > val res = acc.withColumn("c1", column) > res.take(1) > res > } > {code} > falls with > {code:java} > java.lang.StackOverflowError >at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395) >...{code} > Probably, the problem is spark generates unexplainable big Physical Plan - > > {code:java} > val rdd = sparkContext.makeRDD(Seq(Row("1"))) > val schema = StructType(Seq( > StructField("c1", StringType) > )) > val df = sparkSession.createDataFrame(rdd, schema) > val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) > val result = (1 to 9).foldLeft(df) { case (acc, _) => > acc.withColumn("c1", column) > } > result.explain() > {code} > it shows a plan 18936 symbols length > {code:java} > == Physical Plan == > *(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE > WHEN (CASE > +- Scan ExistingRDD[c1#1]{code} > > > -- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-28742) StackOverflowError when using otherwise(col()) in a loop
[ https://issues.apache.org/jira/browse/SPARK-28742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Tsukanov updated SPARK-28742: -- Attachment: image-2019-08-15-15-19-33-319.png > StackOverflowError when using otherwise(col()) in a loop > > > Key: SPARK-28742 > URL: https://issues.apache.org/jira/browse/SPARK-28742 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0, 2.4.3 >Reporter: Ivan Tsukanov >Priority: Major > > The following code > > {code:java} > val rdd = sparkContext.makeRDD(Seq(Row("1"))) > val schema = StructType(Seq( > StructField("c1", StringType) > )) > val df = sparkSession.createDataFrame(rdd, schema) > val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) > (1 to 9).foldLeft(df) { case (acc, _) => > val res = acc.withColumn("c1", column) > res.take(1) > res > } > {code} > falls with > > {code:java} > java.lang.StackOverflowError >at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395) >...{code} > > Probably, the problem is spark generates unexplainable big Physical Plan - > !image-2019-08-15-15-10-13-397.png! > > -- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-28742) StackOverflowError when using otherwise(col()) in a loop
[ https://issues.apache.org/jira/browse/SPARK-28742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Tsukanov updated SPARK-28742: -- Attachment: (was: image-2019-08-15-15-19-33-319.png) > StackOverflowError when using otherwise(col()) in a loop > > > Key: SPARK-28742 > URL: https://issues.apache.org/jira/browse/SPARK-28742 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0, 2.4.3 >Reporter: Ivan Tsukanov >Priority: Major > > The following code > > {code:java} > val rdd = sparkContext.makeRDD(Seq(Row("1"))) > val schema = StructType(Seq( > StructField("c1", StringType) > )) > val df = sparkSession.createDataFrame(rdd, schema) > val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) > (1 to 9).foldLeft(df) { case (acc, _) => > val res = acc.withColumn("c1", column) > res.take(1) > res > } > {code} > falls with > > {code:java} > java.lang.StackOverflowError >at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395) >...{code} > > Probably, the problem is spark generates unexplainable big Physical Plan - > !image-2019-08-15-15-10-13-397.png! > > -- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-28742) StackOverflowError when using otherwise(col()) in a loop
Ivan Tsukanov created SPARK-28742: - Summary: StackOverflowError when using otherwise(col()) in a loop Key: SPARK-28742 URL: https://issues.apache.org/jira/browse/SPARK-28742 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.4.3, 2.4.0 Reporter: Ivan Tsukanov The following code {code:java} val rdd = sparkContext.makeRDD(Seq(Row("1"))) val schema = StructType(Seq( StructField("c1", StringType) )) val df = sparkSession.createDataFrame(rdd, schema) val column = when(col("c1").isin("1"), "1").otherwise(col("c1")) (1 to 9).foldLeft(df) { case (acc, _) => val res = acc.withColumn("c1", column) res.take(1) res } {code} fails with {code:java} java.lang.StackOverflowError at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395) ...{code} Probably the problem is that Spark generates an inexplicably large physical plan - !image-2019-08-15-15-10-13-397.png! -- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
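A workaround sometimes used for this kind of ever-growing plan (an assumption, not something from the ticket) is to cut the lineage every few iterations so the nested CASE WHEN expressions never get deep enough to break code generation. localCheckpoint() materializes the intermediate result and replaces the accumulated plan with a plain scan:

{code:scala}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, when}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Assumes sparkContext and sparkSession as defined in the report above.
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(StructField("c1", StringType)))
val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

// Checkpointing every few iterations keeps the expression tree shallow, so the
// generated code stays small regardless of how many times the loop runs.
val result = (1 to 9).foldLeft(df) { case (acc, i) =>
  val next = acc.withColumn("c1", column)
  if (i % 3 == 0) next.localCheckpoint() else next
}
result.explain()
{code}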
[jira] [Commented] (SPARK-28480) Types of input parameters of a UDF affect the ability to cache the result
[ https://issues.apache.org/jira/browse/SPARK-28480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16890694#comment-16890694 ] Ivan Tsukanov commented on SPARK-28480: --- ok, let's close the ticket. [~shivuson...@gmail.com], thanks for the help! > Types of input parameters of a UDF affect the ability to cache the result > - > > Key: SPARK-28480 > URL: https://issues.apache.org/jira/browse/SPARK-28480 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: Ivan Tsukanov >Priority: Major > Fix For: 2.4.3 > > Attachments: image-2019-07-23-10-58-45-768.png > > > When I define a parameter in a UDF as Boolean or Int the result DataFrame > can't be cached > {code:java} > import org.apache.spark.sql.functions.{lit, udf} > val empty = sparkSession.emptyDataFrame > val table = "table" > def test(customUDF: UserDefinedFunction, col: Column): Unit = { > val df = empty.select(customUDF(col)) > df.cache() > df.createOrReplaceTempView(table) > println(sparkSession.catalog.isCached(table)) > } > test(udf { _: String => 42 }, lit("")) // true > test(udf { _: Any => 42 }, lit("")) // true > test(udf { _: Int => 42 }, lit(42)) // false > test(udf { _: Boolean => 42 }, lit(false)) // false > {code} > or sparkSession.catalog.isCached gives irrelevant information. -- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-28480) Types of input parameters of a UDF affect the ability to cache the result
[ https://issues.apache.org/jira/browse/SPARK-28480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Tsukanov updated SPARK-28480: -- Fix Version/s: 2.4.3 > Types of input parameters of a UDF affect the ability to cache the result > - > > Key: SPARK-28480 > URL: https://issues.apache.org/jira/browse/SPARK-28480 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: Ivan Tsukanov >Priority: Major > Fix For: 2.4.3 > > Attachments: image-2019-07-23-10-58-45-768.png > > > When I define a parameter in a UDF as Boolean or Int the result DataFrame > can't be cached > {code:java} > import org.apache.spark.sql.functions.{lit, udf} > val empty = sparkSession.emptyDataFrame > val table = "table" > def test(customUDF: UserDefinedFunction, col: Column): Unit = { > val df = empty.select(customUDF(col)) > df.cache() > df.createOrReplaceTempView(table) > println(sparkSession.catalog.isCached(table)) > } > test(udf { _: String => 42 }, lit("")) // true > test(udf { _: Any => 42 }, lit("")) // true > test(udf { _: Int => 42 }, lit(42)) // false > test(udf { _: Boolean => 42 }, lit(false)) // false > {code} > or sparkSession.catalog.isCached gives irrelevant information. -- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-28480) Types of input parameters of a UDF affect the ability to cache the result
Ivan Tsukanov created SPARK-28480: - Summary: Types of input parameters of a UDF affect the ability to cache the result Key: SPARK-28480 URL: https://issues.apache.org/jira/browse/SPARK-28480 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.3.1 Reporter: Ivan Tsukanov When I define a UDF parameter as Boolean or Int, the resulting DataFrame can't be cached {code:java} import org.apache.spark.sql.functions.{lit, udf} val empty = sparkSession.emptyDataFrame val table = "table" def test(customUDF: UserDefinedFunction, col: Column): Unit = { val df = empty.select(customUDF(col)) df.cache() df.createOrReplaceTempView(table) println(sparkSession.catalog.isCached(table)) } test(udf { _: String => 42 }, lit("")) // true test(udf { _: Any => 42 }, lit("")) // true test(udf { _: Int => 42 }, lit(42)) // false test(udf { _: Boolean => 42 }, lit(false)) // false {code} or sparkSession.catalog.isCached reports misleading information. -- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-28480) Types of input parameters of a UDF affect the ability to cache the result
[ https://issues.apache.org/jira/browse/SPARK-28480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Tsukanov updated SPARK-28480: -- Description: When I define a parameter in a UDF as Boolean or Int the result DataFrame can't be cached {code:java} import org.apache.spark.sql.functions.{lit, udf} val empty = sparkSession.emptyDataFrame val table = "table" def test(customUDF: UserDefinedFunction, col: Column): Unit = { val df = empty.select(customUDF(col)) df.cache() df.createOrReplaceTempView(table) println(sparkSession.catalog.isCached(table)) } test(udf { _: String => 42 }, lit("")) // true test(udf { _: Any => 42 }, lit("")) // true test(udf { _: Int => 42 }, lit(42)) // false test(udf { _: Boolean => 42 }, lit(false)) // false {code} or sparkSession.catalog.isCached gives irrelevant information. was: When I define a parameter in a UDF as Boolean or Int the result DataFrame can't be cached {code:java} import org.apache.spark.sql.functions.{lit, udf} val empty = sparkSession.emptyDataFrame val table = "table" def test(customUDF: UserDefinedFunction, col: Column): Unit = { val df = empty.select(customUDF(col)) df.cache() df.createOrReplaceTempView(table) println(sparkSession.catalog.isCached(table)) } test(udf { _: String => 42 }, lit("")) // true test(udf { _: Any => 42 }, lit("")) // true test(udf { _: Int => 42 }, lit(42)) // false test(udf { _: Boolean => 42 }, lit(false)) // false {code} or sparkSession.catalog.isCached gives irrelevant information. > Types of input parameters of a UDF affect the ability to cache the result > - > > Key: SPARK-28480 > URL: https://issues.apache.org/jira/browse/SPARK-28480 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: Ivan Tsukanov >Priority: Major > > When I define a parameter in a UDF as Boolean or Int the result DataFrame > can't be cached > {code:java} > import org.apache.spark.sql.functions.{lit, udf} > val empty = sparkSession.emptyDataFrame > val table = "table" > def test(customUDF: UserDefinedFunction, col: Column): Unit = { > val df = empty.select(customUDF(col)) > df.cache() > df.createOrReplaceTempView(table) > println(sparkSession.catalog.isCached(table)) > } > test(udf { _: String => 42 }, lit("")) // true > test(udf { _: Any => 42 }, lit("")) // true > test(udf { _: Int => 42 }, lit(42)) // false > test(udf { _: Boolean => 42 }, lit(false)) // false > {code} > or sparkSession.catalog.isCached gives irrelevant information. -- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
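Since the unreliable part here is the temp-view check, one way to sidestep it (an assumption on my part, not part of the ticket) is to ask the Dataset itself whether it is registered in the cache manager. Dataset.storageLevel is available from Spark 2.4 onwards:

{code:scala}
import org.apache.spark.sql.Column
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{lit, udf}
import org.apache.spark.storage.StorageLevel

// Assumes sparkSession as defined in the report above; requires Spark 2.4+.
val empty = sparkSession.emptyDataFrame

def isCachedDirectly(customUDF: UserDefinedFunction, col: Column): Boolean = {
  val df = empty.select(customUDF(col))
  df.cache()
  // storageLevel is resolved against this exact plan in the cache manager,
  // so it does not depend on how the temp view is re-analyzed.
  df.storageLevel != StorageLevel.NONE
}

println(isCachedDirectly(udf { _: Int => 42 }, lit(42)))
println(isCachedDirectly(udf { _: Boolean => 42 }, lit(false)))
{code}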
[jira] [Updated] (SPARK-25987) StackOverflowError when executing many operations on a table with many columns
[ https://issues.apache.org/jira/browse/SPARK-25987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Tsukanov updated SPARK-25987: -- Description: When I execute {code:java} val columnsCount = 100 val columns = (1 to columnsCount).map(i => s"col$i") val initialData = (1 to columnsCount).map(i => s"val$i") val df = sparkSession.createDataFrame( rowRDD = sparkSession.sparkContext.makeRDD(Seq(Row.fromSeq(initialData))), schema = StructType(columns.map(StructField(_, StringType, true))) ) val addSuffixUDF = udf( (str: String) => str + "_added" ) implicit class DFOps(df: DataFrame) { def addSuffix() = { df.select(columns.map(col => addSuffixUDF(df(col)).as(col) ): _*) } } df .addSuffix() .addSuffix() .addSuffix() .show() {code} I get {code:java} An exception or error caused a run to abort. java.lang.StackOverflowError at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:385) at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:553) ... {code} If I reduce columns number (to 10 for example) or do `addSuffix` only once - it works fine. was: When I execute {code:java} val columnsCount = 100 val columns = (1 to columnsCount).map(i => s"col$i") val initialData = (1 to columnsCount).map(i => s"val$i") val df = sparkSession.createDataFrame( rowRDD = sparkSession.sparkContext.makeRDD(Seq(Row.fromSeq(initialData))), schema = StructType(columns.map(StructField(_, StringType, true))) ) val addSuffixUDF = udf( (str: String) => str + "_added" ) implicit class DFOps(df: DataFrame) { def addSuffix() = { df.select(columns.map(col => addSuffixUDF(df(col)).as(col) ): _*) } } df .addSuffix() .addSuffix() .addSuffix() .show() {code} I get {code:java} An exception or error caused a run to abort. java.lang.StackOverflowError at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:385) at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:553) ... {code} If I reduce columns number (to 10 for example) or do `addSuffix` only once - it works fine. > StackOverflowError when executing many operations on a table with many columns > -- > > Key: SPARK-25987 > URL: https://issues.apache.org/jira/browse/SPARK-25987 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.1, 2.2.2, 2.3.0, 2.3.2 > Environment: Ubuntu 18.04.1 LTS, openjdk "1.8.0_181" >Reporter: Ivan Tsukanov >Priority: Major > > When I execute > {code:java} > val columnsCount = 100 > val columns = (1 to columnsCount).map(i => s"col$i") > val initialData = (1 to columnsCount).map(i => s"val$i") > val df = sparkSession.createDataFrame( > rowRDD = sparkSession.sparkContext.makeRDD(Seq(Row.fromSeq(initialData))), > schema = StructType(columns.map(StructField(_, StringType, true))) > ) > val addSuffixUDF = udf( > (str: String) => str + "_added" > ) > implicit class DFOps(df: DataFrame) { > def addSuffix() = { > df.select(columns.map(col => > addSuffixUDF(df(col)).as(col) > ): _*) > } > } > df > .addSuffix() > .addSuffix() > .addSuffix() > .show() > {code} > I get > {code:java} > An exception or error caused a run to abort. > java.lang.StackOverflowError > at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:385) > at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:553) > ... > {code} > If I reduce columns number (to 10 for example) or do `addSuffix` only once - > it works fine. 
> -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-25987) StackOverflowError when executing many operations on a table with many columns
[ https://issues.apache.org/jira/browse/SPARK-25987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Tsukanov updated SPARK-25987: -- Description: When I execute {code:java} val columnsCount = 100 val columns = (1 to columnsCount).map(i => s"col$i") val initialData = (1 to columnsCount).map(i => s"val$i") val df = sparkSession.createDataFrame( rowRDD = sparkSession.sparkContext.makeRDD(Seq(Row.fromSeq(initialData))), schema = StructType(columns.map(StructField(_, StringType, true))) ) val addSuffixUDF = udf( (str: String) => str + "_added" ) implicit class DFOps(df: DataFrame) { def addSuffix() = { df.select(columns.map(col => addSuffixUDF(df(col)).as(col) ): _*) } } df .addSuffix() .addSuffix() .addSuffix() .show() {code} I get {code:java} An exception or error caused a run to abort. java.lang.StackOverflowError at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:385) at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:553) ... {code} If I reduce columns number (to 10 for example) or do `addSuffix` only once - it works fine. was: When I execute {code:java} val columnsCount = 100 val columns = (1 to columnsCount).map(i => s"col$i") val initialData = (1 to columnsCount).map(i => s"val$i") val df = sparkSession.createDataFrame( rowRDD = sparkSession.sparkContext.makeRDD(Seq(Row.fromSeq(initialData))), schema = StructType(columns.map(StructField(_, StringType, true))) ) val addSuffixUDF = udf( (str: String) => str + "_added" ) implicit class DFOps(df: DataFrame) { def addSuffix() = { df.select(columns.map(col => addSuffixUDF(df(col)).as(col) ): _*) } } df .addSuffix() .addSuffix() .addSuffix() .show() {code} I get {code:java} An exception or error caused a run to abort. java.lang.StackOverflowError at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:385) at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:553) ... {code} If I reduce columns number (to 10 for example) or do `addSuffix` only once - it works fine. > StackOverflowError when executing many operations on a table with many columns > -- > > Key: SPARK-25987 > URL: https://issues.apache.org/jira/browse/SPARK-25987 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.1, 2.2.2, 2.3.0, 2.3.2 > Environment: Ubuntu 18.04.1 LTS, openjdk "1.8.0_181" >Reporter: Ivan Tsukanov >Priority: Major > > When I execute > {code:java} > val columnsCount = 100 > val columns = (1 to columnsCount).map(i => s"col$i") > val initialData = (1 to columnsCount).map(i => s"val$i") > val df = sparkSession.createDataFrame( > rowRDD = sparkSession.sparkContext.makeRDD(Seq(Row.fromSeq(initialData))), > schema = StructType(columns.map(StructField(_, StringType, true))) > ) > val addSuffixUDF = udf( > (str: String) => str + "_added" > ) > implicit class DFOps(df: DataFrame) { > def addSuffix() = { > df.select(columns.map(col => > addSuffixUDF(df(col)).as(col) > ): _*) > } > } > df > .addSuffix() > .addSuffix() > .addSuffix() > .show() > {code} > I get > {code:java} > An exception or error caused a run to abort. > java.lang.StackOverflowError > at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:385) > at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:553) > ... > {code} > If I reduce columns number (to 10 for example) or do `addSuffix` only once - > it works fine. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-25987) StackOverflowError when executing many operations on a table with many columns
Ivan Tsukanov created SPARK-25987: - Summary: StackOverflowError when executing many operations on a table with many columns Key: SPARK-25987 URL: https://issues.apache.org/jira/browse/SPARK-25987 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.3.2, 2.3.0, 2.2.2, 2.2.1 Environment: Ubuntu 18.04.1 LTS, openjdk "1.8.0_181" Reporter: Ivan Tsukanov When I execute {code:java} val columnsCount = 100 val columns = (1 to columnsCount).map(i => s"col$i") val initialData = (1 to columnsCount).map(i => s"val$i") val df = sparkSession.createDataFrame( rowRDD = sparkSession.sparkContext.makeRDD(Seq(Row.fromSeq(initialData))), schema = StructType(columns.map(StructField(_, StringType, true))) ) val addSuffixUDF = udf( (str: String) => str + "_added" ) implicit class DFOps(df: DataFrame) { def addSuffix() = { df.select(columns.map(col => addSuffixUDF(df(col)).as(col) ): _*) } } df .addSuffix() .addSuffix() .addSuffix() .show() {code} I get {code:java} An exception or error caused a run to abort. java.lang.StackOverflowError at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:385) at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:553) ... {code} If I reduce the number of columns (to 10, for example) or call `addSuffix` only once, it works fine. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
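As with SPARK-28742 above, a possible mitigation (my assumption, not from the ticket) is to truncate the lineage between the chained selects so each generated projection stays one UDF deep instead of accumulating across calls:

{code:scala}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Assumes sparkSession as defined in the report above.
val columnsCount = 100
val columns = (1 to columnsCount).map(i => s"col$i")
val initialData = (1 to columnsCount).map(i => s"val$i")

val df = sparkSession.createDataFrame(
  rowRDD = sparkSession.sparkContext.makeRDD(Seq(Row.fromSeq(initialData))),
  schema = StructType(columns.map(StructField(_, StringType, true)))
)

val addSuffixUDF = udf((str: String) => str + "_added")

implicit class DFOps(df: DataFrame) {
  // localCheckpoint() after each pass materializes the result, so every select
  // is planned over a plain scan instead of the previous select's UDF calls.
  def addSuffix(): DataFrame =
    df.select(columns.map(c => addSuffixUDF(df(c)).as(c)): _*).localCheckpoint()
}

df.addSuffix().addSuffix().addSuffix().show()
{code}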