[ 
https://issues.apache.org/jira/browse/SPARK-27150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Sean updated SPARK-27150:
------------------------------
    Description: 
I run this (reduced) following code multiples times under the same exact input 
files : 
{code:java}
def myUdf(input : java.lang.String) : Option[Long] = {
  None
}

...

val sparkMain = ... .getOrCreate()
val d = inputPaths.toList.par
val p = new scala.concurrent.forkjoin.ForkJoinPool(12)

try {

   d.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(p)
   d.foreach {
    case (inputPath) => {
      val spark = sparkMain.newSession()
      
      spark.udf.register("myUdf",udf(myUdf _)) 

      val df = spark.read.format("csv").option("inferSchema", 
"false").option("mode", "DROPMALFORMED").schema(mySchema).load(inputPath) 

      df.createOrReplaceTempView("mytable")

      val sql = spark.sql(""" SELECT CAST( myUdf(updated_date) as long) FROM 
mytable """)

      sql.write.parquet( ... ) 
   }
 }
} finally {
  p.shutdown()
}{code}
Once in ten (spark-submit the application), the driver failed with an Exception 
related to Spark SQL and the UDF. However, as you can see, I have reduced the 
UDF to minimum, it now returns None everytime, and the problem still occurs. 
So, I think the problem is more likely related to having the driver submitting 
multiples jobs in parallel, aka "scheduling within apps".

The exception is as follow :
{code:java}
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.spark.sql.AnalysisException: cannot resolve 
'CAST(UDF(updated_date) AS BIGINT)' due to data type mismatch: cannot cast 
struct<> to bigint; line 5 pos 10;
...
at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at 
org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at 
org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
...
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at 
scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at 
scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
So basically, with the exact same inputs, nine of ten times, everything 
succeed. But approximately one of ten, the previous exception occurs, so it's 
very strange, and tend to prove that there are some side effect inside Spark 
core code when using scheduling within app  ...

Thanks for investigating

*EDIT :*

When I do :
{code:java}
val sql = spark.sql(""" SELECT myUdf(updated_date) FROM mytable """)
{code}
Instead of :
{code:java}
val sql = spark.sql(""" SELECT CAST( myUdf(updated_date) as long) FROM mytable 
""")
{code}
I'm no longer ensuring that there is either a long or null returned by the UDF. 
So it fails at the parquet level :
{code:java}
Caused by: org.apache.parquet.schema.InvalidSchemaException: Cannot write a 
schema with an empty group: optional group updated_date {
}
{code}
So it seems that in such case (when using scheduling within app) the UDF is 
returning randomly a struct/group instead of null/None ... 

  was:
I run this (reduced) following code multiples times under the same exact input 
files : 
{code:java}
def myUdf(input : java.lang.String) : Option[Long] = {
  None
}

...

val sparkMain = ... .getOrCreate()
val d = inputPaths.toList.par
val p = new scala.concurrent.forkjoin.ForkJoinPool(12)

try {

   d.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(p)
   d.foreach {
    case (inputPath) => {
      val spark = sparkMain.newSession()
      
      spark.udf.register("myUdf",udf(myUdf _)) 

      val df = spark.read.format("csv").option("inferSchema", 
"false").option("mode", "DROPMALFORMED").schema(mySchema).load(inputPath) 

      df.createOrReplaceTempView("mytable")

      val sql = spark.sql(""" SELECT CAST( myUdf(updated_date) as long) FROM 
mytable """)

      sql.write.parquet( ... ) 
   }
 }
} finally {
  p.shutdown()
}{code}
Once in ten (spark-submit the application), the driver failed with an Exception 
related to Spark SQL and the UDF. However, as you can see, I have reduced the 
UDF to minimum, it now returns None everytime, and the problem still occurs. 
So, I think the problem is more likely related to having the driver submitting 
multiples jobs in parallel, aka "scheduling within apps".

The exception is as follow :
{code:java}
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.spark.sql.AnalysisException: cannot resolve 
'CAST(UDF(updated_date) AS BIGINT)' due to data type mismatch: cannot cast 
struct<> to bigint; line 5 pos 10;
...
at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at 
org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at 
org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
...
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at 
scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at 
scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
So basically, with the exact same inputs, nine of ten times, everything 
succeed. But approximately one of ten, the previous exception occurs, so it's 
very strange, and tend to prove that there are some side effect inside Spark 
core code when using scheduling within app  ...

Thanks for investigating

*EDIT :*

When I do :
{code:java}
val sql = spark.sql(""" SELECT myUdf(updated_date) FROM mytable """)
{code}
Instead of :
{code:java}
val sql = spark.sql(""" SELECT CAST( myUdf(updated_date) as long) FROM mytable 
""")
{code}
I'm no longer ensuring that there is either a long or null returned by the UDF. 
So it fails at the parquet level :

 

 

 

 

 


> Scheduling Within an Application : Spark SQL randomly failed on UDF
> -------------------------------------------------------------------
>
>                 Key: SPARK-27150
>                 URL: https://issues.apache.org/jira/browse/SPARK-27150
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 2.3.1, 2.3.2, 2.3.3, 2.4.0
>            Reporter: Josh Sean
>            Priority: Major
>
> I run this (reduced) following code multiples times under the same exact 
> input files : 
> {code:java}
> def myUdf(input : java.lang.String) : Option[Long] = {
>   None
> }
> ...
> val sparkMain = ... .getOrCreate()
> val d = inputPaths.toList.par
> val p = new scala.concurrent.forkjoin.ForkJoinPool(12)
> try {
>    d.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(p)
>    d.foreach {
>     case (inputPath) => {
>       val spark = sparkMain.newSession()
>       
>       spark.udf.register("myUdf",udf(myUdf _)) 
>       val df = spark.read.format("csv").option("inferSchema", 
> "false").option("mode", "DROPMALFORMED").schema(mySchema).load(inputPath) 
>       df.createOrReplaceTempView("mytable")
>       val sql = spark.sql(""" SELECT CAST( myUdf(updated_date) as long) FROM 
> mytable """)
>       sql.write.parquet( ... ) 
>    }
>  }
> } finally {
>   p.shutdown()
> }{code}
> Once in ten (spark-submit the application), the driver failed with an 
> Exception related to Spark SQL and the UDF. However, as you can see, I have 
> reduced the UDF to minimum, it now returns None everytime, and the problem 
> still occurs. So, I think the problem is more likely related to having the 
> driver submitting multiples jobs in parallel, aka "scheduling within apps".
> The exception is as follow :
> {code:java}
> Exception in thread "main" java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
> at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
> Caused by: org.apache.spark.sql.AnalysisException: cannot resolve 
> 'CAST(UDF(updated_date) AS BIGINT)' due to data type mismatch: cannot cast 
> struct<> to bigint; line 5 pos 10;
> ...
> at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
> at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
> at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
> at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
> at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
> at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
> at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
> at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
> at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
> at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
> at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
> at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
> at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
> at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
> at 
> org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
> at 
> org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
> at 
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
> at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
> ...
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at 
> scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
> at 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
> at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
> at 
> scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
> at 
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
> at 
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
> at 
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
> at 
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
> at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> So basically, with the exact same inputs, nine of ten times, everything 
> succeed. But approximately one of ten, the previous exception occurs, so it's 
> very strange, and tend to prove that there are some side effect inside Spark 
> core code when using scheduling within app  ...
> Thanks for investigating
> *EDIT :*
> When I do :
> {code:java}
> val sql = spark.sql(""" SELECT myUdf(updated_date) FROM mytable """)
> {code}
> Instead of :
> {code:java}
> val sql = spark.sql(""" SELECT CAST( myUdf(updated_date) as long) FROM 
> mytable """)
> {code}
> I'm no longer ensuring that there is either a long or null returned by the 
> UDF. So it fails at the parquet level :
> {code:java}
> Caused by: org.apache.parquet.schema.InvalidSchemaException: Cannot write a 
> schema with an empty group: optional group updated_date {
> }
> {code}
> So it seems that in such case (when using scheduling within app) the UDF is 
> returning randomly a struct/group instead of null/None ... 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to