[jira] [Updated] (SPARK-35511) Spark computes all rows during count() on a parquet file

2021-05-25 Thread Ivan Tsukanov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Tsukanov updated SPARK-35511:
--
Description: 
We expect Spark to use the Parquet metadata to fetch the row count of a Parquet file. But when we execute the following code
{code:java}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}

object Test extends App {
  val sparkConf = new SparkConf()
    .setAppName("test-app")
    .setMaster("local[1]")

  val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
  import sparkSession.implicits._

  val filePath = "./tempFile.parquet"
  (1 to 1000).toDF("c1")
    .repartition(10)
    .write
    .mode("overwrite")
    .parquet(filePath)

  val df = sparkSession.read.parquet(filePath)

  var rowsInHeavyComputation = 0
  def heavyComputation(row: Row): Row = {
    rowsInHeavyComputation += 1
    println(s"rowsInHeavyComputation = $rowsInHeavyComputation")
    Thread.sleep(50)
    row
  }

  implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)
  val cnt = df
    .map(row => heavyComputation(row)) // map operation cannot change number of rows
    .count()
  println(s"counting done, cnt=$cnt")
}
{code}
we see 
{code:java}
rowsInHeavyComputation = 1
rowsInHeavyComputation = 2
...
rowsInHeavyComputation = 999
rowsInHeavyComputation = 1000
counting done, cnt=1000
{code}
 *Expected result*: Spark does not perform heavyComputation at all.
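A quick way to check this (a sketch, under the assumption that a metadata-only count shows up as a physical plan with no per-row work between the scan and the aggregate):
{code:java}
// Sketch: compare the physical plans of a plain count and the mapped count.
// Assumes the implicit Row encoder from the snippet above is in scope.
sparkSession.read.parquet(filePath).groupBy().count().explain()
df.map(row => heavyComputation(row)).groupBy().count().explain()
{code}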

 

P.S. In our real application we:
 - transform data from Parquet files
 - return some example rows (50 rows, and Spark runs heavyComputation only for those 50 rows)
 - return the row count of the whole DataFrame, and here Spark for some reason computes the whole DataFrame even though there are only map operations and the initial row count could be taken from the Parquet metadata (a possible workaround is sketched below)
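A possible workaround sketch for the last point (an assumption on our side, not a confirmed fix: take the count from the source DataFrame before any map is applied, so the Parquet row-group metadata can still be used):
{code:java}
// Hedged workaround sketch: count the untransformed source DataFrame, where the
// Parquet metadata is still usable, and run the map-based transformation only for
// the sample rows that are actually returned.
val source = sparkSession.read.parquet(filePath)
val totalRows = source.count()                                    // no heavyComputation here
val sample = source.map(row => heavyComputation(row)).limit(50).collect()
{code}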

 

 

  was:
We expect spark uses parquet metadata to fetch the rows count of a parquet 
file. But when we execute the following code

 
{code:java}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}

object Test extends App {
  val sparkConf = new SparkConf()
.setAppName("test-app")
.setMaster("local[1]")

  val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
  import sparkSession.implicits._

  val filePath = "./tempFile.parquet"
  (1 to 1000).toDF("c1")
.repartition(10)
.write
.mode("overwrite")
.parquet(filePath)

  val df = sparkSession.read.parquet(filePath)

  var rowsInHeavyComputation = 0
  def heavyComputation(row: Row): Row = {
rowsInHeavyComputation += 1
println(s"rowsInHeavyComputation = $rowsInHeavyComputation")
Thread.sleep(50)
row
  }

  implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)
  val cnt = df
.map(row => heavyComputation(row)) // map operation cannot change number of rows
.count()
  println(s"counting done, cnt=$cnt")
}
{code}
we see

 

 
{code:java}
rowsInHeavyComputation = 1
rowsInHeavyComputation = 2
...
rowsInHeavyComputation = 999
rowsInHeavyComputation = 1000
counting done, cnt=1000
{code}
 

Expected result - spark does not perform heavyComputation at all.

 

P.S. In our real application we
- transform data from parquet files
- return some examples (50 rows and spark does heavyComputation only for 50 
rows)
- return rows count of the whole DataFrame and here spark for some reason 
computes the whole DataFrame despite the fact there are only map operations and 
initial rows count can be gotten from parquet meta

 

 


> Spark computes all rows during count() on a parquet file
> 
>
> Key: SPARK-35511
> URL: https://issues.apache.org/jira/browse/SPARK-35511
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Ivan Tsukanov
>Priority: Major
>
> We expect spark uses parquet metadata to fetch the rows count of a parquet 
> file. But when we execute the following code 
> {code:java}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.{Row, SparkSession}
> import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
> object Test extends App {
>   val sparkConf = new SparkConf()
> .setAppName("test-app")
> .setMaster("local[1]")
>   val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
>   import sparkSession.implicits._
>   val filePath = "./tempFile.parquet"
>   (1 to 1000).toDF("c1")
> .repartition(10)
> .write
> .mode("overwrite")
> .parquet(filePath)
>   val df = sparkSession.read.parquet(filePath)
>   var rowsInHeavyComputation = 0
>   def heavyComputation(row: Row): Row = {
> row

[jira] [Created] (SPARK-35511) Spark computes all rows during count() on a parquet file

2021-05-25 Thread Ivan Tsukanov (Jira)
Ivan Tsukanov created SPARK-35511:
-

 Summary: Spark computes all rows during count() on a parquet file
 Key: SPARK-35511
 URL: https://issues.apache.org/jira/browse/SPARK-35511
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Ivan Tsukanov


We expect spark uses parquet metadata to fetch the rows count of a parquet 
file. But when we execute the following code

 
{code:java}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}

object Test extends App {
  val sparkConf = new SparkConf()
.setAppName("test-app")
.setMaster("local[1]")

  val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
  import sparkSession.implicits._

  val filePath = "./tempFile.parquet"
  (1 to 1000).toDF("c1")
.repartition(10)
.write
.mode("overwrite")
.parquet(filePath)

  val df = sparkSession.read.parquet(filePath)

  var rowsInHeavyComputation = 0
  def heavyComputation(row: Row): Row = {
rowsInHeavyComputation += 1
println(s"rowsInHeavyComputation = $rowsInHeavyComputation")
Thread.sleep(50)
row
  }

  implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)
  val cnt = df
> .map(row => heavyComputation(row)) // map operation cannot change number of rows
.count()
  println(s"counting done, cnt=$cnt")
}
{code}
we see

 

 
{code:java}
rowsInHeavyComputation = 1
rowsInHeavyComputation = 2
...
rowsInHeavyComputation = 999
rowsInHeavyComputation = 1000
counting done, cnt=1000
{code}
 

Expected result - spark does not perform heavyComputation at all.

 

P.S. In our real application we
- transform data from parquet files
- return some examples (50 rows and spark does heavyComputation only for 50 
rows)
- return rows count of the whole DataFrame and here spark for some reason 
computes the whole DataFrame despite the fact there are only map operations and 
initial rows count can be gotten from parquet meta

 

 






[jira] [Updated] (SPARK-32758) Spark ignores limit(1) and starts tasks for all partition

2020-08-31 Thread Ivan Tsukanov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Tsukanov updated SPARK-32758:
--
Description: 
If we run the following code
{code:scala}
  val sparkConf = new SparkConf()
    .setAppName("test-app")
    .setMaster("local[1]")
  val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

  import sparkSession.implicits._

  val df = (1 to 10)
    .toDF("c1")
    .repartition(1000)

  implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)

  df.limit(1)
    .map(identity)
    .collect()

  df.map(identity)
    .limit(1)
    .collect()

  Thread.sleep(10)
{code}
we will see that in the first case Spark started 1002 tasks despite the limit(1):

!image-2020-09-01-10-51-09-417.png!

Expected behavior: both scenarios (limit before and after map) produce the same result, with one or two tasks to get one value from the DataFrame.
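A minimal check sketch (assuming the difference is visible in the physical plans, e.g. a limit over a single partition versus work scheduled for all 1000 partitions):
{code:scala}
// Sketch: print the physical plans of the two orderings and compare how the limit
// is planned relative to the map and the repartition.
df.limit(1).map(identity).explain()
df.map(identity).limit(1).explain()
{code}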

  was:
If we run the following code
{code:scala}
  val sparkConf = new SparkConf()
.setAppName("test-app")
.setMaster("local[1]")
  val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

  import sparkSession.implicits._

  val df = (1 to 10)
.toDF("c1")
.repartition(1000)

  implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)

  df.limit(1)
.map(identity)
.collect()

  df.map(identity)
.limit(1)
.collect()

  Thread.sleep(10)
{code}
we will see that spark started 1002 tasks despite the fact there is limit(1) -

!image-2020-09-01-10-51-09-417.png!

Expected behavior - both scenarios (limit before and after map) will produce 
the same results - one or two tasks to get one value from the DataFrame.


> Spark ignores limit(1) and starts tasks for all partition
> -
>
> Key: SPARK-32758
> URL: https://issues.apache.org/jira/browse/SPARK-32758
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Ivan Tsukanov
>Priority: Major
> Attachments: image-2020-09-01-10-51-09-417.png
>
>
> If we run the following code
> {code:scala}
>   val sparkConf = new SparkConf()
> .setAppName("test-app")
> .setMaster("local[1]")
>   val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
>   import sparkSession.implicits._
>   val df = (1 to 10)
> .toDF("c1")
> .repartition(1000)
>   implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)
>   df.limit(1)
> .map(identity)
> .collect()
>   df.map(identity)
> .limit(1)
> .collect()
>   Thread.sleep(10)
> {code}
> we will see that in the first case spark started 1002 tasks despite the fact 
> there is limit(1) -
> !image-2020-09-01-10-51-09-417.png!
> Expected behavior - both scenarios (limit before and after map) will produce 
> the same results - one or two tasks to get one value from the DataFrame.






[jira] [Updated] (SPARK-32758) Spark ignores limit(1) and starts tasks for all partition

2020-08-31 Thread Ivan Tsukanov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Tsukanov updated SPARK-32758:
--
Description: 
If we run the following code
{code:scala}
  val sparkConf = new SparkConf()
.setAppName("test-app")
.setMaster("local[1]")
  val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

  import sparkSession.implicits._

  val df = (1 to 10)
.toDF("c1")
.repartition(1000)

  implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)

  df.limit(1)
.map(identity)
.collect()

  df.map(identity)
.limit(1)
.collect()

  Thread.sleep(10)
{code}
we will see that spark started 1002 tasks despite the fact there is limit(1) -

!image-2020-09-01-10-51-09-417.png!

Expected behavior - both scenarios (limit before and after map) will produce 
the same results - one or two tasks to get one value from the DataFrame.

  was:
If we run the following code
{code:scala}
  val sparkConf = new SparkConf()
.setAppName("test-app")
.setMaster("local[1]")
  val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

  import sparkSession.implicits._

  val df = (1 to 10)
.toDF("c1")
.repartition(1000)

  implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)

  df.limit(1)
.map(identity)
.collect()

  df.map(identity)
.limit(1)
.collect()

  Thread.sleep(10)
{code}
we will see that spark started 1002 tasks despite the fact there is limit(1) -

!image-2020-09-01-10-34-47-580.png!  

Expected behavior - both scenarios (limit before and after map) will produce 
the same results - one or two tasks to get one value from the DataFrame.


> Spark ignores limit(1) and starts tasks for all partition
> -
>
> Key: SPARK-32758
> URL: https://issues.apache.org/jira/browse/SPARK-32758
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
> Environment: должен (Russian: "must")
>Reporter: Ivan Tsukanov
>Priority: Major
> Attachments: image-2020-09-01-10-51-09-417.png
>
>
> If we run the following code
> {code:scala}
>   val sparkConf = new SparkConf()
> .setAppName("test-app")
> .setMaster("local[1]")
>   val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
>   import sparkSession.implicits._
>   val df = (1 to 10)
> .toDF("c1")
> .repartition(1000)
>   implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)
>   df.limit(1)
> .map(identity)
> .collect()
>   df.map(identity)
> .limit(1)
> .collect()
>   Thread.sleep(10)
> {code}
> we will see that spark started 1002 tasks despite the fact there is limit(1) -
> !image-2020-09-01-10-51-09-417.png!
> Expected behavior - both scenarios (limit before and after map) will produce 
> the same results - one or two tasks to get one value from the DataFrame.






[jira] [Updated] (SPARK-32758) Spark ignores limit(1) and starts tasks for all partition

2020-08-31 Thread Ivan Tsukanov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Tsukanov updated SPARK-32758:
--
Environment: (was: должен (Russian: "must"))

> Spark ignores limit(1) and starts tasks for all partition
> -
>
> Key: SPARK-32758
> URL: https://issues.apache.org/jira/browse/SPARK-32758
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Ivan Tsukanov
>Priority: Major
> Attachments: image-2020-09-01-10-51-09-417.png
>
>
> If we run the following code
> {code:scala}
>   val sparkConf = new SparkConf()
> .setAppName("test-app")
> .setMaster("local[1]")
>   val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
>   import sparkSession.implicits._
>   val df = (1 to 10)
> .toDF("c1")
> .repartition(1000)
>   implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)
>   df.limit(1)
> .map(identity)
> .collect()
>   df.map(identity)
> .limit(1)
> .collect()
>   Thread.sleep(10)
> {code}
> we will see that spark started 1002 tasks despite the fact there is limit(1) -
> !image-2020-09-01-10-51-09-417.png!
> Expected behavior - both scenarios (limit before and after map) will produce 
> the same results - one or two tasks to get one value from the DataFrame.






[jira] [Updated] (SPARK-32758) Spark ignores limit(1) and starts tasks for all partition

2020-08-31 Thread Ivan Tsukanov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Tsukanov updated SPARK-32758:
--
Attachment: image-2020-09-01-10-51-09-417.png

> Spark ignores limit(1) and starts tasks for all partition
> -
>
> Key: SPARK-32758
> URL: https://issues.apache.org/jira/browse/SPARK-32758
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
> Environment: должен (Russian: "must")
>Reporter: Ivan Tsukanov
>Priority: Major
> Attachments: image-2020-09-01-10-51-09-417.png
>
>
> If we run the following code
> {code:scala}
>   val sparkConf = new SparkConf()
> .setAppName("test-app")
> .setMaster("local[1]")
>   val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
>   import sparkSession.implicits._
>   val df = (1 to 10)
> .toDF("c1")
> .repartition(1000)
>   implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)
>   df.limit(1)
> .map(identity)
> .collect()
>   df.map(identity)
> .limit(1)
> .collect()
>   Thread.sleep(10)
> {code}
> we will see that spark started 1002 tasks despite the fact there is limit(1) -
> !image-2020-09-01-10-34-47-580.png!  
> Expected behavior - both scenarios (limit before and after map) will produce 
> the same results - one or two tasks to get one value from the DataFrame.






[jira] [Created] (SPARK-32758) Spark ignores limit(1) and starts tasks for all partition

2020-08-31 Thread Ivan Tsukanov (Jira)
Ivan Tsukanov created SPARK-32758:
-

 Summary: Spark ignores limit(1) and starts tasks for all partition
 Key: SPARK-32758
 URL: https://issues.apache.org/jira/browse/SPARK-32758
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0
 Environment: должен (Russian: "must")
Reporter: Ivan Tsukanov


If we run the following code
{code:scala}
  val sparkConf = new SparkConf()
.setAppName("test-app")
.setMaster("local[1]")
  val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

  import sparkSession.implicits._

  val df = (1 to 10)
.toDF("c1")
.repartition(1000)

  implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)

  df.limit(1)
.map(identity)
.collect()

  df.map(identity)
.limit(1)
.collect()

  Thread.sleep(10)
{code}
we will see that spark started 1002 tasks despite the fact there is limit(1) -

!image-2020-09-01-10-34-47-580.png!  

Expected behavior - both scenarios (limit before and after map) will produce 
the same results - one or two tasks to get one value from the DataFrame.






[jira] [Updated] (SPARK-28742) StackOverflowError when using otherwise(col()) in a loop

2019-08-15 Thread Ivan Tsukanov (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Tsukanov updated SPARK-28742:
--
Description: 
The following code
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

(1 to 9).foldLeft(df) { case (acc, _) =>
  val res = acc.withColumn("c1", column)
  res.take(1)
  res
}
{code}
fails with
{code:java}
java.lang.StackOverflowError
   at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395)
   ...{code}
Probably, the problem is that Spark generates an inexplicably large physical plan:
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

val result = (1 to 9).foldLeft(df) { case (acc, _) =>
  acc.withColumn("c1", column)
}
result.explain()
{code}
it shows a plan that is 18936 symbols long
{code:java}
== Physical Plan ==
*(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE ...  18936 symbols
+- Scan ExistingRDD[c1#1]  {code}
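A workaround sketch we can use in the meantime (an assumption, not a fix of the root cause: cutting the lineage between iterations keeps the nested CASE WHEN expression from growing on every fold step):
{code:java}
// Hedged workaround sketch. Assumption: Dataset.localCheckpoint() (Spark 2.3+)
// materializes the plan at each step, so code generation compiles one small
// expression per iteration instead of nine nested CASE WHEN layers.
val result = (1 to 9).foldLeft(df) { case (acc, _) =>
  acc.withColumn("c1", column).localCheckpoint()
}
result.explain()
{code}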

  was:
The following code
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

(1 to 9).foldLeft(df) { case (acc, _) =>
  val res = acc.withColumn("c1", column)
  res.take(1)
  res
}
{code}
falls with
{code:java}
java.lang.StackOverflowError
   at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395)
   ...{code}
Probably, the problem is spark generates unexplainable big Physical Plan - 
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

val result = (1 to 9).foldLeft(df) { case (acc, _) =>
  acc.withColumn("c1", column)
}
result.explain()
{code}
it shows a plan 18936 symbols length
{code:java}
== Physical Plan ==
*(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN 
(CASE  
+- Scan ExistingRDD[c1#1]  {code}


> StackOverflowError when using otherwise(col()) in a loop
> 
>
> Key: SPARK-28742
> URL: https://issues.apache.org/jira/browse/SPARK-28742
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0, 2.4.3
>Reporter: Ivan Tsukanov
>Priority: Major
>
> The following code
> {code:java}
> val rdd = sparkContext.makeRDD(Seq(Row("1")))
> val schema = StructType(Seq(
>   StructField("c1", StringType)
> ))
> val df = sparkSession.createDataFrame(rdd, schema)
> val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))
> (1 to 9).foldLeft(df) { case (acc, _) =>
>   val res = acc.withColumn("c1", column)
>   res.take(1)
>   res
> }
> {code}
> falls with
> {code:java}
> java.lang.StackOverflowError
>at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395)
>...{code}
> Probably, the problem is spark generates unexplainable big Physical Plan - 
> {code:java}
> val rdd = sparkContext.makeRDD(Seq(Row("1")))
> val schema = StructType(Seq(
>   StructField("c1", StringType)
> ))
> val df = sparkSession.createDataFrame(rdd, schema)
> val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))
> val result = (1 to 9).foldLeft(df) { case (acc, _) =>
>   acc.withColumn("c1", column)
> }
> result.explain()
> {code}
> it shows a plan 18936 symbols length
> {code:java}
> == Physical Plan ==
> *(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE 
> WHEN (CASE  18936 symbols
> +- Scan ExistingRDD[c1#1]  {code}






[jira] [Updated] (SPARK-28742) StackOverflowError when using otherwise(col()) in a loop

2019-08-15 Thread Ivan Tsukanov (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Tsukanov updated SPARK-28742:
--
Description: 
The following code
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

(1 to 9).foldLeft(df) { case (acc, _) =>
  val res = acc.withColumn("c1", column)
  res.take(1)
  res
}
{code}
falls with
{code:java}
java.lang.StackOverflowError
   at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395)
   ...{code}
Probably, the problem is spark generates unexplainable big Physical Plan - 
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

val result = (1 to 9).foldLeft(df) { case (acc, _) =>
  acc.withColumn("c1", column)
}
result.explain()
{code}
it shows a plan 18936 symbols length
{code:java}
== Physical Plan ==
*(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN 
(CASE  
+- Scan ExistingRDD[c1#1]  {code}

  was:
The following code
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

(1 to 9).foldLeft(df) { case (acc, _) =>
  val res = acc.withColumn("c1", column)
  res.take(1)
  res
}
{code}
falls with
{code:java}
java.lang.StackOverflowError
   at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395)
   ...{code}
Probably, the problem is spark generates unexplainable big Physical Plan -

 
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

val result = (1 to 9).foldLeft(df) { case (acc, _) =>
  acc.withColumn("c1", column)
}
result.explain()
{code}
it shows a plan 18936 symbols length
{code:java}
== Physical Plan ==
*(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN 
(CASE  
+- Scan ExistingRDD[c1#1] {code}
 


> StackOverflowError when using otherwise(col()) in a loop
> 
>
> Key: SPARK-28742
> URL: https://issues.apache.org/jira/browse/SPARK-28742
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0, 2.4.3
>Reporter: Ivan Tsukanov
>Priority: Major
>
> The following code
> {code:java}
> val rdd = sparkContext.makeRDD(Seq(Row("1")))
> val schema = StructType(Seq(
>   StructField("c1", StringType)
> ))
> val df = sparkSession.createDataFrame(rdd, schema)
> val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))
> (1 to 9).foldLeft(df) { case (acc, _) =>
>   val res = acc.withColumn("c1", column)
>   res.take(1)
>   res
> }
> {code}
> falls with
> {code:java}
> java.lang.StackOverflowError
>at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395)
>...{code}
> Probably, the problem is spark generates unexplainable big Physical Plan - 
> {code:java}
> val rdd = sparkContext.makeRDD(Seq(Row("1")))
> val schema = StructType(Seq(
>   StructField("c1", StringType)
> ))
> val df = sparkSession.createDataFrame(rdd, schema)
> val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))
> val result = (1 to 9).foldLeft(df) { case (acc, _) =>
>   acc.withColumn("c1", column)
> }
> result.explain()
> {code}
> it shows a plan 18936 symbols length
> {code:java}
> == Physical Plan ==
> *(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE 
> WHEN (CASE  
> +- Scan ExistingRDD[c1#1]  {code}






[jira] [Updated] (SPARK-28742) StackOverflowError when using otherwise(col()) in a loop

2019-08-15 Thread Ivan Tsukanov (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Tsukanov updated SPARK-28742:
--
Description: 
The following code
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

(1 to 9).foldLeft(df) { case (acc, _) =>
  val res = acc.withColumn("c1", column)
  res.take(1)
  res
}
{code}
falls with
{code:java}
java.lang.StackOverflowError
   at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395)
   ...{code}
Probably, the problem is spark generates unexplainable big Physical Plan -

 
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

val result = (1 to 9).foldLeft(df) { case (acc, _) =>
  acc.withColumn("c1", column)
}
result.explain()
{code}
it shows a plan 18936 symbols length
{code:java}
== Physical Plan ==
*(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN 
(CASE  
+- Scan ExistingRDD[c1#1] {code}
 

  was:
The following code
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

(1 to 9).foldLeft(df) { case (acc, _) =>
  val res = acc.withColumn("c1", column)
  res.take(1)
  res
}
{code}
falls with
{code:java}
java.lang.StackOverflowError
   at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395)
   ...{code}
Probably, the problem is spark generates unexplainable big Physical Plan -

 
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

val result = (1 to 9).foldLeft(df) { case (acc, _) =>
  acc.withColumn("c1", column)
}
result.explain()
{code}
it shows a plan 18936 symbols length
{code:java}
== Physical Plan ==
*(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN 
(CASE  
+- Scan ExistingRDD[c1#1]{code}
 

 

 


> StackOverflowError when using otherwise(col()) in a loop
> 
>
> Key: SPARK-28742
> URL: https://issues.apache.org/jira/browse/SPARK-28742
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0, 2.4.3
>Reporter: Ivan Tsukanov
>Priority: Major
>
> The following code
> {code:java}
> val rdd = sparkContext.makeRDD(Seq(Row("1")))
> val schema = StructType(Seq(
>   StructField("c1", StringType)
> ))
> val df = sparkSession.createDataFrame(rdd, schema)
> val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))
> (1 to 9).foldLeft(df) { case (acc, _) =>
>   val res = acc.withColumn("c1", column)
>   res.take(1)
>   res
> }
> {code}
> falls with
> {code:java}
> java.lang.StackOverflowError
>at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395)
>...{code}
> Probably, the problem is spark generates unexplainable big Physical Plan -
>  
> {code:java}
> val rdd = sparkContext.makeRDD(Seq(Row("1")))
> val schema = StructType(Seq(
>   StructField("c1", StringType)
> ))
> val df = sparkSession.createDataFrame(rdd, schema)
> val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))
> val result = (1 to 9).foldLeft(df) { case (acc, _) =>
>   acc.withColumn("c1", column)
> }
> result.explain()
> {code}
> it shows a plan 18936 symbols length
> {code:java}
> == Physical Plan ==
> *(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE 
> WHEN (CASE  
> +- Scan ExistingRDD[c1#1] {code}
>  






[jira] [Updated] (SPARK-28742) StackOverflowError when using otherwise(col()) in a loop

2019-08-15 Thread Ivan Tsukanov (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Tsukanov updated SPARK-28742:
--
Description: 
The following code
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

(1 to 9).foldLeft(df) { case (acc, _) =>
  val res = acc.withColumn("c1", column)
  res.take(1)
  res
}
{code}
falls with
{code:java}
java.lang.StackOverflowError
   at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395)
   ...{code}
Probably, the problem is spark generates unexplainable big Physical Plan -

 
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

val result = (1 to 9).foldLeft(df) { case (acc, _) =>
  acc.withColumn("c1", column)
}
result.explain()
{code}
it shows a plan 18936 symbols length
{code:java}
== Physical Plan ==
*(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN 
(CASE  
+- Scan ExistingRDD[c1#1]{code}
 

 

 

  was:
The following code

 
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

(1 to 9).foldLeft(df) { case (acc, _) =>
  val res = acc.withColumn("c1", column)
  res.take(1)
  res
}
{code}
falls with

 
{code:java}
java.lang.StackOverflowError
   at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395)
   ...{code}
 

Probably, the problem is spark generates unexplainable big Physical Plan -

!image-2019-08-15-15-10-13-397.png!

 

 


> StackOverflowError when using otherwise(col()) in a loop
> 
>
> Key: SPARK-28742
> URL: https://issues.apache.org/jira/browse/SPARK-28742
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0, 2.4.3
>Reporter: Ivan Tsukanov
>Priority: Major
>
> The following code
> {code:java}
> val rdd = sparkContext.makeRDD(Seq(Row("1")))
> val schema = StructType(Seq(
>   StructField("c1", StringType)
> ))
> val df = sparkSession.createDataFrame(rdd, schema)
> val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))
> (1 to 9).foldLeft(df) { case (acc, _) =>
>   val res = acc.withColumn("c1", column)
>   res.take(1)
>   res
> }
> {code}
> falls with
> {code:java}
> java.lang.StackOverflowError
>at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395)
>...{code}
> Probably, the problem is spark generates unexplainable big Physical Plan -
>  
> {code:java}
> val rdd = sparkContext.makeRDD(Seq(Row("1")))
> val schema = StructType(Seq(
>   StructField("c1", StringType)
> ))
> val df = sparkSession.createDataFrame(rdd, schema)
> val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))
> val result = (1 to 9).foldLeft(df) { case (acc, _) =>
>   acc.withColumn("c1", column)
> }
> result.explain()
> {code}
> it shows a plan 18936 symbols length
> {code:java}
> == Physical Plan ==
> *(1) Project [CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE WHEN (CASE 
> WHEN (CASE  
> +- Scan ExistingRDD[c1#1]{code}
>  
>  
>  






[jira] [Updated] (SPARK-28742) StackOverflowError when using otherwise(col()) in a loop

2019-08-15 Thread Ivan Tsukanov (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Tsukanov updated SPARK-28742:
--
Attachment: image-2019-08-15-15-19-33-319.png

> StackOverflowError when using otherwise(col()) in a loop
> 
>
> Key: SPARK-28742
> URL: https://issues.apache.org/jira/browse/SPARK-28742
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0, 2.4.3
>Reporter: Ivan Tsukanov
>Priority: Major
>
> The following code
>  
> {code:java}
> val rdd = sparkContext.makeRDD(Seq(Row("1")))
> val schema = StructType(Seq(
>   StructField("c1", StringType)
> ))
> val df = sparkSession.createDataFrame(rdd, schema)
> val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))
> (1 to 9).foldLeft(df) { case (acc, _) =>
>   val res = acc.withColumn("c1", column)
>   res.take(1)
>   res
> }
> {code}
> falls with
>  
> {code:java}
> java.lang.StackOverflowError
>at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395)
>...{code}
>  
> Probably, the problem is spark generates unexplainable big Physical Plan -
> !image-2019-08-15-15-10-13-397.png!
>  
>  






[jira] [Updated] (SPARK-28742) StackOverflowError when using otherwise(col()) in a loop

2019-08-15 Thread Ivan Tsukanov (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Tsukanov updated SPARK-28742:
--
Attachment: (was: image-2019-08-15-15-19-33-319.png)

> StackOverflowError when using otherwise(col()) in a loop
> 
>
> Key: SPARK-28742
> URL: https://issues.apache.org/jira/browse/SPARK-28742
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0, 2.4.3
>Reporter: Ivan Tsukanov
>Priority: Major
>
> The following code
>  
> {code:java}
> val rdd = sparkContext.makeRDD(Seq(Row("1")))
> val schema = StructType(Seq(
>   StructField("c1", StringType)
> ))
> val df = sparkSession.createDataFrame(rdd, schema)
> val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))
> (1 to 9).foldLeft(df) { case (acc, _) =>
>   val res = acc.withColumn("c1", column)
>   res.take(1)
>   res
> }
> {code}
> falls with
>  
> {code:java}
> java.lang.StackOverflowError
>at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395)
>...{code}
>  
> Probably, the problem is spark generates unexplainable big Physical Plan -
> !image-2019-08-15-15-10-13-397.png!
>  
>  






[jira] [Created] (SPARK-28742) StackOverflowError when using otherwise(col()) in a loop

2019-08-15 Thread Ivan Tsukanov (JIRA)
Ivan Tsukanov created SPARK-28742:
-

 Summary: StackOverflowError when using otherwise(col()) in a loop
 Key: SPARK-28742
 URL: https://issues.apache.org/jira/browse/SPARK-28742
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.3, 2.4.0
Reporter: Ivan Tsukanov


The following code

 
{code:java}
val rdd = sparkContext.makeRDD(Seq(Row("1")))
val schema = StructType(Seq(
  StructField("c1", StringType)
))

val df = sparkSession.createDataFrame(rdd, schema)
val column = when(col("c1").isin("1"), "1").otherwise(col("c1"))

(1 to 9).foldLeft(df) { case (acc, _) =>
  val res = acc.withColumn("c1", column)
  res.take(1)
  res
}
{code}
falls with

 
{code:java}
java.lang.StackOverflowError
   at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:395)
   ...{code}
 

Probably, the problem is spark generates unexplainable big Physical Plan -

!image-2019-08-15-15-10-13-397.png!

 

 






[jira] [Commented] (SPARK-28480) Types of input parameters of a UDF affect the ability to cache the result

2019-07-22 Thread Ivan Tsukanov (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16890694#comment-16890694
 ] 

Ivan Tsukanov commented on SPARK-28480:
---

ok, let's close the ticket. [~shivuson...@gmail.com], thanks for the help!

> Types of input parameters of a UDF affect the ability to cache the result
> -
>
> Key: SPARK-28480
> URL: https://issues.apache.org/jira/browse/SPARK-28480
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: Ivan Tsukanov
>Priority: Major
> Fix For: 2.4.3
>
> Attachments: image-2019-07-23-10-58-45-768.png
>
>
> When I define a parameter in a UDF as Boolean or Int the result DataFrame 
> can't be cached 
> {code:java}
> import org.apache.spark.sql.functions.{lit, udf}
> val empty = sparkSession.emptyDataFrame
> val table = "table"
> def test(customUDF: UserDefinedFunction, col: Column): Unit = {
>   val df = empty.select(customUDF(col))
>   df.cache()
>   df.createOrReplaceTempView(table)
>   println(sparkSession.catalog.isCached(table))
> }
> test(udf { _: String => 42 }, lit("")) // true
> test(udf { _: Any => 42 }, lit("")) // true
> test(udf { _: Int => 42 }, lit(42)) // false
> test(udf { _: Boolean => 42 }, lit(false)) // false
> {code}
> or sparkSession.catalog.isCached gives irrelevant information.






[jira] [Updated] (SPARK-28480) Types of input parameters of a UDF affect the ability to cache the result

2019-07-22 Thread Ivan Tsukanov (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Tsukanov updated SPARK-28480:
--
Fix Version/s: 2.4.3

> Types of input parameters of a UDF affect the ability to cache the result
> -
>
> Key: SPARK-28480
> URL: https://issues.apache.org/jira/browse/SPARK-28480
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: Ivan Tsukanov
>Priority: Major
> Fix For: 2.4.3
>
> Attachments: image-2019-07-23-10-58-45-768.png
>
>
> When I define a parameter in a UDF as Boolean or Int the result DataFrame 
> can't be cached 
> {code:java}
> import org.apache.spark.sql.functions.{lit, udf}
> val empty = sparkSession.emptyDataFrame
> val table = "table"
> def test(customUDF: UserDefinedFunction, col: Column): Unit = {
>   val df = empty.select(customUDF(col))
>   df.cache()
>   df.createOrReplaceTempView(table)
>   println(sparkSession.catalog.isCached(table))
> }
> test(udf { _: String => 42 }, lit("")) // true
> test(udf { _: Any => 42 }, lit("")) // true
> test(udf { _: Int => 42 }, lit(42)) // false
> test(udf { _: Boolean => 42 }, lit(false)) // false
> {code}
> or sparkSession.catalog.isCached gives irrelevant information.






[jira] [Created] (SPARK-28480) Types of input parameters of a UDF affect the ability to cache the result

2019-07-22 Thread Ivan Tsukanov (JIRA)
Ivan Tsukanov created SPARK-28480:
-

 Summary: Types of input parameters of a UDF affect the ability to 
cache the result
 Key: SPARK-28480
 URL: https://issues.apache.org/jira/browse/SPARK-28480
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.1
Reporter: Ivan Tsukanov


When I define a parameter in a UDF as Boolean or Int the result DataFrame can't 
be cached

 

 
{code:java}
import org.apache.spark.sql.functions.{lit, udf}
val empty = sparkSession.emptyDataFrame
val table = "table"

def test(customUDF: UserDefinedFunction, col: Column): Unit = {
  val df = empty.select(customUDF(col))
  df.cache()
  df.createOrReplaceTempView(table)
  println(sparkSession.catalog.isCached(table))
}

test(udf { _: String => 42 }, lit("")) // true
test(udf { _: Any => 42 }, lit("")) // true
test(udf { _: Int => 42 }, lit(42)) // false
test(udf { _: Boolean => 42 }, lit(false)) // false
{code}
or sparkSession.catalog.isCached gives irrelevant information.






[jira] [Updated] (SPARK-28480) Types of input parameters of a UDF affect the ability to cache the result

2019-07-22 Thread Ivan Tsukanov (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Tsukanov updated SPARK-28480:
--
Description: 
When I define a parameter in a UDF as Boolean or Int, the resulting DataFrame can't be cached
{code:java}
import org.apache.spark.sql.functions.{lit, udf}
val empty = sparkSession.emptyDataFrame
val table = "table"

def test(customUDF: UserDefinedFunction, col: Column): Unit = {
  val df = empty.select(customUDF(col))
  df.cache()
  df.createOrReplaceTempView(table)
  println(sparkSession.catalog.isCached(table))
}

test(udf { _: String => 42 }, lit("")) // true
test(udf { _: Any => 42 }, lit("")) // true
test(udf { _: Int => 42 }, lit(42)) // false
test(udf { _: Boolean => 42 }, lit(false)) // false
{code}
or sparkSession.catalog.isCached reports misleading information.
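A diagnostic sketch, under assumptions (a boxed java.lang.Integer parameter instead of a primitive Int, and Dataset.storageLevel as a cache check that bypasses the catalog); this is not a confirmed explanation of the behaviour:
{code:java}
// Hedged sketch. Assumptions: a boxed java.lang.Integer input avoids the extra
// null-handling wrapper generated for primitive UDF parameters, and
// Dataset.storageLevel reports the cache state independently of the catalog.
import org.apache.spark.storage.StorageLevel

val boxed = empty.select(udf { _: java.lang.Integer => 42 }.apply(lit(42)))
boxed.cache()
boxed.createOrReplaceTempView(table)
println(sparkSession.catalog.isCached(table))    // expected: true with the boxed type
println(boxed.storageLevel != StorageLevel.NONE) // cross-check that does not use the catalog
{code}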

  was:
When I define a parameter in a UDF as Boolean or Int the result DataFrame can't 
be cached

 

 
{code:java}
import org.apache.spark.sql.functions.{lit, udf}
val empty = sparkSession.emptyDataFrame
val table = "table"

def test(customUDF: UserDefinedFunction, col: Column): Unit = {
  val df = empty.select(customUDF(col))
  df.cache()
  df.createOrReplaceTempView(table)
  println(sparkSession.catalog.isCached(table))
}

test(udf { _: String => 42 }, lit("")) // true
test(udf { _: Any => 42 }, lit("")) // true
test(udf { _: Int => 42 }, lit(42)) // false
test(udf { _: Boolean => 42 }, lit(false)) // false
{code}
or sparkSession.catalog.isCached gives irrelevant information.


> Types of input parameters of a UDF affect the ability to cache the result
> -
>
> Key: SPARK-28480
> URL: https://issues.apache.org/jira/browse/SPARK-28480
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: Ivan Tsukanov
>Priority: Major
>
> When I define a parameter in a UDF as Boolean or Int the result DataFrame 
> can't be cached 
> {code:java}
> import org.apache.spark.sql.functions.{lit, udf}
> val empty = sparkSession.emptyDataFrame
> val table = "table"
> def test(customUDF: UserDefinedFunction, col: Column): Unit = {
>   val df = empty.select(customUDF(col))
>   df.cache()
>   df.createOrReplaceTempView(table)
>   println(sparkSession.catalog.isCached(table))
> }
> test(udf { _: String => 42 }, lit("")) // true
> test(udf { _: Any => 42 }, lit("")) // true
> test(udf { _: Int => 42 }, lit(42)) // false
> test(udf { _: Boolean => 42 }, lit(false)) // false
> {code}
> or sparkSession.catalog.isCached gives irrelevant information.






[jira] [Updated] (SPARK-25987) StackOverflowError when executing many operations on a table with many columns

2018-11-08 Thread Ivan Tsukanov (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Tsukanov updated SPARK-25987:
--
Description: 
When I execute
{code:java}
val columnsCount = 100
val columns = (1 to columnsCount).map(i => s"col$i")
val initialData = (1 to columnsCount).map(i => s"val$i")

val df = sparkSession.createDataFrame(
  rowRDD = sparkSession.sparkContext.makeRDD(Seq(Row.fromSeq(initialData))),
  schema = StructType(columns.map(StructField(_, StringType, true)))
)

val addSuffixUDF = udf(
  (str: String) => str + "_added"
)

implicit class DFOps(df: DataFrame) {
  def addSuffix() = {
df.select(columns.map(col =>
  addSuffixUDF(df(col)).as(col)
): _*)
  }
}

df
  .addSuffix()
  .addSuffix()
  .addSuffix()
  .show()
{code}
I get
{code:java}
An exception or error caused a run to abort.
java.lang.StackOverflowError
 at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:385)
 at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:553)
...
{code}
If I reduce the number of columns (to 10, for example) or call `addSuffix` only once, it works fine.
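A workaround sketch under an assumption (not a fix of the underlying code-generation limit): truncating the lineage between passes keeps each generated code unit small.
{code:java}
// Hedged workaround sketch. Assumption: Dataset.localCheckpoint() (Spark 2.3+)
// materializes the intermediate result, so each addSuffix pass is compiled on its
// own instead of as one deeply nested expression tree over 100 columns.
df
  .addSuffix().localCheckpoint()
  .addSuffix().localCheckpoint()
  .addSuffix()
  .show()
{code}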

 

  was:
When I execute

 
{code:java}
val columnsCount = 100
val columns = (1 to columnsCount).map(i => s"col$i")
val initialData = (1 to columnsCount).map(i => s"val$i")

val df = sparkSession.createDataFrame(
  rowRDD = sparkSession.sparkContext.makeRDD(Seq(Row.fromSeq(initialData))),
  schema = StructType(columns.map(StructField(_, StringType, true)))
)

val addSuffixUDF = udf(
  (str: String) => str + "_added"
)

implicit class DFOps(df: DataFrame) {
  def addSuffix() = {
df.select(columns.map(col =>
  addSuffixUDF(df(col)).as(col)
): _*)
  }
}

df
  .addSuffix()
  .addSuffix()
  .addSuffix()
  .show()
{code}
I get

 

 
{code:java}
An exception or error caused a run to abort.
java.lang.StackOverflowError
 at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:385)
 at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:553)
...
{code}
If I reduce columns number (to 10 for example) or do `addSuffix` only once - it 
works fine.

 


> StackOverflowError when executing many operations on a table with many columns
> --
>
> Key: SPARK-25987
> URL: https://issues.apache.org/jira/browse/SPARK-25987
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.1, 2.2.2, 2.3.0, 2.3.2
> Environment: Ubuntu 18.04.1 LTS, openjdk "1.8.0_181"
>Reporter: Ivan Tsukanov
>Priority: Major
>
> When I execute
> {code:java}
> val columnsCount = 100
> val columns = (1 to columnsCount).map(i => s"col$i")
> val initialData = (1 to columnsCount).map(i => s"val$i")
> val df = sparkSession.createDataFrame(
>   rowRDD = sparkSession.sparkContext.makeRDD(Seq(Row.fromSeq(initialData))),
>   schema = StructType(columns.map(StructField(_, StringType, true)))
> )
> val addSuffixUDF = udf(
>   (str: String) => str + "_added"
> )
> implicit class DFOps(df: DataFrame) {
>   def addSuffix() = {
> df.select(columns.map(col =>
>   addSuffixUDF(df(col)).as(col)
> ): _*)
>   }
> }
> df
>   .addSuffix()
>   .addSuffix()
>   .addSuffix()
>   .show()
> {code}
> I get
> {code:java}
> An exception or error caused a run to abort.
> java.lang.StackOverflowError
>  at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:385)
>  at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:553)
> ...
> {code}
> If I reduce columns number (to 10 for example) or do `addSuffix` only once - 
> it works fine.
>  






[jira] [Updated] (SPARK-25987) StackOverflowError when executing many operations on a table with many columns

2018-11-08 Thread Ivan Tsukanov (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Tsukanov updated SPARK-25987:
--
Description: 
When I execute
{code:java}
val columnsCount = 100
val columns = (1 to columnsCount).map(i => s"col$i")
val initialData = (1 to columnsCount).map(i => s"val$i")

val df = sparkSession.createDataFrame(
  rowRDD = sparkSession.sparkContext.makeRDD(Seq(Row.fromSeq(initialData))),
  schema = StructType(columns.map(StructField(_, StringType, true)))
)

val addSuffixUDF = udf(
  (str: String) => str + "_added"
)

implicit class DFOps(df: DataFrame) {
  def addSuffix() = {
df.select(columns.map(col =>
  addSuffixUDF(df(col)).as(col)
): _*)
  }
}

df
  .addSuffix()
  .addSuffix()
  .addSuffix()
  .show()
{code}
I get
{code:java}
An exception or error caused a run to abort.
java.lang.StackOverflowError
 at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:385)
 at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:553)
...
{code}
If I reduce columns number (to 10 for example) or do `addSuffix` only once - it 
works fine.

  was:
When I execute
{code:java}
val columnsCount = 100
val columns = (1 to columnsCount).map(i => s"col$i")
val initialData = (1 to columnsCount).map(i => s"val$i")

val df = sparkSession.createDataFrame(
  rowRDD = sparkSession.sparkContext.makeRDD(Seq(Row.fromSeq(initialData))),
  schema = StructType(columns.map(StructField(_, StringType, true)))
)

val addSuffixUDF = udf(
  (str: String) => str + "_added"
)

implicit class DFOps(df: DataFrame) {
  def addSuffix() = {
df.select(columns.map(col =>
  addSuffixUDF(df(col)).as(col)
): _*)
  }
}

df
  .addSuffix()
  .addSuffix()
  .addSuffix()
  .show()
{code}
I get
{code:java}
An exception or error caused a run to abort.
java.lang.StackOverflowError
 at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:385)
 at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:553)
...
{code}
If I reduce columns number (to 10 for example) or do `addSuffix` only once - it 
works fine.

 


> StackOverflowError when executing many operations on a table with many columns
> --
>
> Key: SPARK-25987
> URL: https://issues.apache.org/jira/browse/SPARK-25987
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.1, 2.2.2, 2.3.0, 2.3.2
> Environment: Ubuntu 18.04.1 LTS, openjdk "1.8.0_181"
>Reporter: Ivan Tsukanov
>Priority: Major
>
> When I execute
> {code:java}
> val columnsCount = 100
> val columns = (1 to columnsCount).map(i => s"col$i")
> val initialData = (1 to columnsCount).map(i => s"val$i")
> val df = sparkSession.createDataFrame(
>   rowRDD = sparkSession.sparkContext.makeRDD(Seq(Row.fromSeq(initialData))),
>   schema = StructType(columns.map(StructField(_, StringType, true)))
> )
> val addSuffixUDF = udf(
>   (str: String) => str + "_added"
> )
> implicit class DFOps(df: DataFrame) {
>   def addSuffix() = {
> df.select(columns.map(col =>
>   addSuffixUDF(df(col)).as(col)
> ): _*)
>   }
> }
> df
>   .addSuffix()
>   .addSuffix()
>   .addSuffix()
>   .show()
> {code}
> I get
> {code:java}
> An exception or error caused a run to abort.
> java.lang.StackOverflowError
>  at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:385)
>  at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:553)
> ...
> {code}
> If I reduce columns number (to 10 for example) or do `addSuffix` only once - 
> it works fine.






[jira] [Created] (SPARK-25987) StackOverflowError when executing many operations on a table with many columns

2018-11-08 Thread Ivan Tsukanov (JIRA)
Ivan Tsukanov created SPARK-25987:
-

 Summary: StackOverflowError when executing many operations on a 
table with many columns
 Key: SPARK-25987
 URL: https://issues.apache.org/jira/browse/SPARK-25987
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.2, 2.3.0, 2.2.2, 2.2.1
 Environment: Ubuntu 18.04.1 LTS, openjdk "1.8.0_181"
Reporter: Ivan Tsukanov


When I execute

 
{code:java}
val columnsCount = 100
val columns = (1 to columnsCount).map(i => s"col$i")
val initialData = (1 to columnsCount).map(i => s"val$i")

val df = sparkSession.createDataFrame(
  rowRDD = sparkSession.sparkContext.makeRDD(Seq(Row.fromSeq(initialData))),
  schema = StructType(columns.map(StructField(_, StringType, true)))
)

val addSuffixUDF = udf(
  (str: String) => str + "_added"
)

implicit class DFOps(df: DataFrame) {
  def addSuffix() = {
df.select(columns.map(col =>
  addSuffixUDF(df(col)).as(col)
): _*)
  }
}

df
  .addSuffix()
  .addSuffix()
  .addSuffix()
  .show()
{code}
I get

 

 
{code:java}
An exception or error caused a run to abort.
java.lang.StackOverflowError
 at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:385)
 at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:553)
...
{code}
If I reduce columns number (to 10 for example) or do `addSuffix` only once - it 
works fine.

 


