Stephen Jenkins created LIVY-720:
------------------------------------

             Summary: NoSuchElementException when reading from HDFS in a job submitted via the Livy programmatic API
                 Key: LIVY-720
                 URL: https://issues.apache.org/jira/browse/LIVY-720
             Project: Livy
          Issue Type: Bug
          Components: RSC
    Affects Versions: 0.6.0
         Environment: Using a Docker container on Windows 10: https://hub.docker.com/r/cheathwood/hadoop-spark-livy

            Reporter: Stephen Jenkins


Hi,

 

I've been using the Livy programmatic API to submit Spark jobs written in Scala, and I've run into a strange issue. I use case classes to wrap the parameters I want to send over to Spark, then manipulate them within the job for its different parts. However, whenever I try to read and collect data from HDFS, I get the following error:


{code:java}
java.util.NoSuchElementException: head of empty list
        at scala.collection.immutable.Nil$.head(List.scala:420)
        at scala.collection.immutable.Nil$.head(List.scala:417)
        at scala.collection.immutable.List.map(List.scala:277)
        at scala.reflect.internal.Symbols$Symbol.parentSymbols(Symbols.scala:2117)
        at scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:301)
        at scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:341)
        at scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply$mcV$sp(SymbolLoaders.scala:74)
        at scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply(SymbolLoaders.scala:71)
        at scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply(SymbolLoaders.scala:71)
        at scala.reflect.internal.SymbolTable.slowButSafeEnteringPhaseNotLaterThan(SymbolTable.scala:263)
        at scala.reflect.runtime.SymbolLoaders$LazyPackageType.complete(SymbolLoaders.scala:71)
        at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
        at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:174)
        at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
        at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
        at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
        at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
        at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
        at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:174)
        at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127)
        at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1.info(SynchronizedSymbols.scala:174)
        at scala.reflect.internal.Types$TypeRef.thisInfo(Types.scala:2194)
        at scala.reflect.internal.Types$TypeRef.baseClasses(Types.scala:2199)
        at scala.reflect.internal.tpe.FindMembers$FindMemberBase.<init>(FindMembers.scala:17)
        at scala.reflect.internal.tpe.FindMembers$FindMember.<init>(FindMembers.scala:219)
        at scala.reflect.internal.Types$Type.scala$reflect$internal$Types$Type$$findMemberInternal$1(Types.scala:1014)
        at scala.reflect.internal.Types$Type.findMember(Types.scala:1016)
        at scala.reflect.internal.Types$Type.memberBasedOnName(Types.scala:631)
        at scala.reflect.internal.Types$Type.member(Types.scala:600)
        at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
        at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:66)
        at scala.reflect.internal.Mirrors$RootsBase.staticPackage(Mirrors.scala:204)
        at scala.reflect.runtime.JavaMirrors$JavaMirror.staticPackage(JavaMirrors.scala:82)
        at scala.reflect.internal.Mirrors$RootsBase.init(Mirrors.scala:263)
        at scala.reflect.runtime.JavaMirrors$class.scala$reflect$runtime$JavaMirrors$$createMirror(JavaMirrors.scala:32)
        at scala.reflect.runtime.JavaMirrors$$anonfun$runtimeMirror$1.apply(JavaMirrors.scala:49)
        at scala.reflect.runtime.JavaMirrors$$anonfun$runtimeMirror$1.apply(JavaMirrors.scala:47)
        at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
        at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
        at scala.reflect.runtime.JavaMirrors$class.runtimeMirror(JavaMirrors.scala:46)
        at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:16)
        at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:16)
        at org.apache.spark.sql.catalyst.ScalaReflection$.mirror(ScalaReflection.scala:45)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:48)
        at org.apache.spark.sql.Encoders$.STRING(Encoders.scala:96)
        at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.createBaseDataset(CSVDataSource.scala:189)
        at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:147)
        at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:63)
        at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:57)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:202)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:202)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:201)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:392)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
        at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:594)
        at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:473)
        at test.FailingJob.theStuffToDo(FailingJob.scala:28)
        at test.FailingJob.call(FailingJob.scala:11)
        at test.Service$$anonfun$testProvider$2$$anonfun$apply$1.apply(Service.scala:22)
        at test.Service$$anonfun$testProvider$2$$anonfun$apply$1.apply(Service.scala:22)
        at org.apache.livy.scalaapi.LivyScalaClient$$anon$1.call(LivyScalaClient.scala:54)
        at org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:40)
        at org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:27)
        at org.apache.livy.rsc.driver.JobWrapper.call(JobWrapper.java:64)
        at org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:45)
        at org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:27)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}
  

As you can see, it is a Scala reflection error that fails while resolving class definitions, which makes me think it has something to do with Kryo serialization/deserialization. I've managed to isolate the issue and reproduce it in a small [example|https://github.com/steviej08/FailingLivySparkJob].

It doesn't seem to matter what I do with the collection inside the function, but it must be used: as soon as the job reads the CSV, it fails. The key detail is that `someSeq` is of type `Seq[String]`; the same job works when I pass it in as an `Array[String]` instead. If I pass the parameter in but never reference it, the job does not fail. The relevant body of the job:


{code:java}
import org.apache.spark.sql.{DataFrame, SparkSession}

// someSeq (Seq[String]) and filePath arrive via the case class parameter.
// Merely referencing someSeq here is enough to trigger the failure below.
val mappedArray = someSeq.map(s => s.toUpperCase())

val ss: SparkSession = scalaJobContext.sparkSession

// The NoSuchElementException is thrown here, as soon as the CSV is read.
val csv: DataFrame = ss
  .read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .csv(filePath)

println(csv.count())
{code}
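
For completeness, the submission side looks roughly like this (a sketch only; `TestParams`, `livyUrl`, and the `FailingJob` constructor shape are illustrative, the exact code is in the linked repo):

{code:java}
import java.net.URI
import org.apache.livy.LivyClientBuilder
import org.apache.livy.scalaapi.LivyScalaClient

// Illustrative parameter wrapper; the real case class lives in the repo.
case class TestParams(someSeq: Seq[String], filePath: String)

val livyUrl = "http://localhost:8998" // adjust to your setup
val client = new LivyScalaClient(
  new LivyClientBuilder().setURI(new URI(livyUrl)).build())

val params = TestParams(Seq("a", "b"), "hdfs:///data/test.csv")

// (uploading the job jar is omitted here for brevity)
// The closure captures the case class instance, which is what gets
// serialized over to the RSC driver before the job body runs.
val handle = client.submit { context =>
  new FailingJob(params).call(context)
}
{code}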

I appreciate that the exception is thrown inside Spark, but I can't reproduce it without going through the Livy programmatic API (RSC).

Other things to note: I am using Spark `2.3`, but upgrading to `2.4` made no difference. Generating the function from an object, without using a case class, did not help either. Referencing the value as a property on an object did work, but that is no good for me, as I need (well, would much prefer) to pass it in as a parameter.
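
To make the shapes concrete (an illustrative sketch, not the exact repo code):

{code:java}
// Fails: a Seq[String] field that is referenced inside the job, followed
// by any HDFS read, triggers the NoSuchElementException above.
case class SeqParams(someSeq: Seq[String], filePath: String)

// Works: the identical job body with an Array[String] field runs fine.
case class ArrayParams(someSeq: Array[String], filePath: String)
{code}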

I know I can just use an `Array`, but that means transforming all my objects for transmission, which is a bit of a pain. Also, although the workaround works in my small example, I am struggling to apply the same transformation in production; I am still working on that.
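
For now the workaround amounts to converting at the submission boundary, roughly (sketch, reusing the illustrative types above):

{code:java}
// Map every Seq-typed field to an Array before submitting, so the job
// closure only ever captures Array[String].
val safeParams = ArrayParams(params.someSeq.toArray, params.filePath)
{code}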

 

Any help or insight on this would be amazing.

 

Thanks,

Stephen

 

 


