I am not able to reproduce the error either.

On Thu, Feb 22, 2018 at 2:51 AM, Michael Artz <michaelea...@gmail.com> wrote:

> I am not able to reproduce your error. You should do something before that
> last function and maybe get more help from the exception it returns; just
> add a csv.show(1) on the line before, for example. Also, can you post the
> different exception you got when you took out the "return", as Bryan
> suggested?
>
> It's getting to this bit of code:
>
>     private[spark] class ReturnStatementInClosureException
>       extends SparkException("Return statements aren't allowed in Spark closures")
>
>     private class ReturnStatementFinder extends ClassVisitor(ASM5) {
>       override def visitMethod(access: Int, name: String, desc: String,
>           sig: String, exceptions: Array[String]): MethodVisitor = {
>         if (name.contains("apply")) {
>           new MethodVisitor(ASM5) {
>             override def visitTypeInsn(op: Int, tp: String) {
>               if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
>                 throw new ReturnStatementInClosureException
>               }
>             }
>           }
>         } else {
>           new MethodVisitor(ASM5) {}
>         }
>       }
>     }
>
> and it must see the NonLocalReturnControl exception. My first guess is
> that the "queryYahoo" function is doing something that is causing an
> exception, but for some reason (a networking thing, maybe?) it works ok in
> spark-shell.
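>
> For what it's worth, you can see where that NonLocalReturnControl comes
> from without Spark at all. A minimal sketch in plain Scala 2 (the function
> and names here are just illustrative):
>
>     // 'return' inside the lambda is a non-local return: scalac compiles it
>     // into throwing scala.runtime.NonLocalReturnControl, which the enclosing
>     // method catches. That throw is where the NEW
>     // scala/runtime/NonLocalReturnControl instruction comes from, which is
>     // exactly what ReturnStatementFinder flags in a closure's apply() method.
>     def firstNegative(xs: Seq[Int]): Option[Int] = {
>       xs.foreach { x => if (x < 0) return Some(x) }
>       None
>     }
>
> Locally this is fine, but hand a closure like that to an RDD operation and
> the ClosureCleaner rejects it before the job even starts.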
> On Feb 21, 2018 10:47 PM, "Lian Jiang" <jiangok2...@gmail.com> wrote:
>
>> Sorry Bryan. Unfortunately, this is not the root cause.
>>
>> Any other ideas? This is blocking my scenario. Thanks.
>>
>> On Wed, Feb 21, 2018 at 4:26 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
>>
>>> Lian,
>>>
>>> You're writing Scala. Just remove the 'return'. There's no need for it
>>> in Scala.
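>>>
>>> For example, a minimal sketch of the posted function without it (a Scala
>>> method returns the last expression of its body, so the behavior is
>>> unchanged):
>>>
>>>     // equivalent to { return 10; }: the final expression is the result
>>>     def queryYahoo(row: Row): Int = 10
>>>
>>> If you need an early exit, restructure it as an if/else expression
>>> instead of a 'return'.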
>>>
>>> Get Outlook for Android <https://aka.ms/ghei36>
>>>
>>> ------------------------------
>>> *From:* Lian Jiang <jiangok2...@gmail.com>
>>> *Sent:* Wednesday, February 21, 2018 4:16:08 PM
>>> *To:* user
>>> *Subject:* Return statements aren't allowed in Spark closures
>>>
>>> I can run the code below in spark-shell using yarn client mode:
>>>
>>>     val csv = spark.read.option("header", "true").csv("my.csv")
>>>
>>>     def queryYahoo(row: Row) : Int = { return 10; }
>>>
>>>     csv.repartition(5).rdd.foreachPartition { p => p.foreach(r => { queryYahoo(r) }) }
>>>
>>> However, the same code fails when run with spark-submit in yarn client
>>> or cluster mode, with this error:
>>>
>>>     18/02/21 21:00:12 ERROR ApplicationMaster: User class threw exception: org.apache.spark.util.ReturnStatementInClosureException: Return statements aren't allowed in Spark closures
>>>     org.apache.spark.util.ReturnStatementInClosureException: Return statements aren't allowed in Spark closures
>>>         at org.apache.spark.util.ReturnStatementFinder$$anon$1.visitTypeInsn(ClosureCleaner.scala:371)
>>>         at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
>>>         at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
>>>         at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
>>>         at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
>>>         at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:243)
>>>         at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:306)
>>>         at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:292)
>>>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>>>         at scala.collection.immutable.List.foreach(List.scala:381)
>>>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>>>         at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:292)
>>>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
>>>         at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
>>>         at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)
>>>         at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
>>>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>>>         at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
>>>
>>> Any idea? Thanks.