[ https://issues.apache.org/jira/browse/SPARK-13390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu reassigned SPARK-13390:
------------------------------------

    Assignee: Shixiong Zhu

> Java Spark createDataFrame with List parameter bug
> --------------------------------------------------
>
>                 Key: SPARK-13390
>                 URL: https://issues.apache.org/jira/browse/SPARK-13390
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.0
>         Environment: Java spark, Linux
>            Reporter: mike niemaz
>            Assignee: Shixiong Zhu
>
> I noticed the following bug while testing the DataFrame SQL join capabilities.
> Instructions to reproduce it (a sketch follows the steps):
> - Read a text file from the local file system using the JavaSparkContext#textFile method
> - Create a list of custom objects from the resulting JavaRDD, using the map function
> - Create a DataFrame using the SQLContext#createDataFrame(java.util.List, Class) method
> - Count the DataFrame elements using the DataFrame#count method
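> A minimal sketch of those steps is below; the bean class, file path, and parsing are hypothetical placeholders, not taken from the original test:
>
> import java.io.Serializable;
> import java.util.List;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.sql.DataFrame;
> import org.apache.spark.sql.SQLContext;
>
> public class CreateDataFrameFromListRepro {
>   // Hypothetical bean standing in for the reporter's custom object.
>   public static class Record implements Serializable {
>     private String value;
>     public Record() {}
>     public Record(String value) { this.value = value; }
>     public String getValue() { return value; }
>     public void setValue(String value) { this.value = value; }
>   }
>
>   public static void main(String[] args) {
>     JavaSparkContext jsc = new JavaSparkContext(
>         new SparkConf().setAppName("SPARK-13390-repro").setMaster("local[*]"));
>     SQLContext sqlContext = new SQLContext(jsc);
>
>     // 1. Read a text file from the local file system.
>     JavaRDD<String> lines = jsc.textFile("/tmp/input.txt");
>     // 2. Map each line to a custom bean and collect into a java.util.List.
>     List<Record> records = lines.map(Record::new).collect();
>     // 3. Create the DataFrame from the List (the failing code path).
>     DataFrame df = sqlContext.createDataFrame(records, Record.class);
>     // 4. Counting the DataFrame triggers the error below on 1.6.0.
>     System.out.println(df.count());
>     jsc.stop();
>   }
> }
>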
> It crashes with the following stacktrace error:
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], 
> output=[count#7L])
> +- TungstenExchange SinglePartition, None
>    +- TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#10L])
>       +- LocalTableScan [[empty row],[empty row],[empty row],[empty 
> row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row]]
>       at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
>       at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
>       at 
> org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
>       at 
> org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
>       at 
> org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
>       at 
> org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
>       at 
> org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
>       at 
> org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1554)
>       at 
> org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1553)
>       at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
>       at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1553)
>       at injection.EMATests.joinTest1(EMATests.java:259)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>       at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>       at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
>       at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: 
> execute, tree:
> TungstenExchange SinglePartition, None
> +- TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#10L])
>    +- LocalTableScan [[empty row],[empty row],[empty row],[empty row],[empty 
> row],[empty row],[empty row],[empty row],[empty row],[empty row]]
>       at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
>       at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
>       at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
>       at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
>       at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
>       ... 46 more
> Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: 
> execute, tree:
> TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#10L])
> +- LocalTableScan [[empty row],[empty row],[empty row],[empty row],[empty 
> row],[empty row],[empty row],[empty row],[empty row],[empty row]]
>       at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
>       at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
>       at 
> org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:164)
>       at 
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
>       at 
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
>       at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
>       ... 54 more
> Caused by: org.apache.spark.SparkException: Task not serializable
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
>       at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>       at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>       at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
>       at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
>       at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
>       at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
>       ... 63 more
> Caused by: java.io.NotSerializableException: 
> scala.collection.Iterator$$anon$11
> Serialization stack:
>       - object not serializable (class: scala.collection.Iterator$$anon$11, 
> value: empty iterator)
>       - field (class: scala.collection.Iterator$$anonfun$toStream$1, name: 
> $outer, type: interface scala.collection.Iterator)
>       - object (class scala.collection.Iterator$$anonfun$toStream$1, 
> <function0>)
>       - field (class: scala.collection.immutable.Stream$Cons, name: tl, type: 
> interface scala.Function0)
>       - object (class scala.collection.immutable.Stream$Cons, 
> Stream([TRI1,N,TNW,160000,0006093430000,E,2016-02-01-15.20.31.434000], 
> [TRI2,N,TNW,170000,0006093430000,E,2016-02-01-15.20.31.434000], 
> [TRI3,N,TNW,180000,0006093430000,E,2016-02-01-15.20.31.434000], 
> [TRI4,N,TNW,190000,0006093430000,E,2016-02-01-15.20.31.434000], 
> [CHI1,N,TNY,200000,0006093430000,E,2016-02-01-15.20.31.434000], 
> [CHI2,N,TNY,210000,0006093430000,E,2016-02-01-15.20.31.434000], 
> [CHI3,N,TNY,220000,0006093430000,E,2016-02-01-15.20.31.434000], 
> [CHI4,N,TNY,230000,0006093430000,E,2016-02-01-15.20.31.434000], 
> [CRU1,N,TNY,240000,0006093430000,E,2016-02-01-15.20.31.434000], 
> [CRU2,N,TNY,250000,0006093430000,E,2016-02-01-15.20.31.434000]))
>       - field (class: scala.collection.immutable.Stream$$anonfun$map$1, name: 
> $outer, type: class scala.collection.immutable.Stream)
>       - object (class scala.collection.immutable.Stream$$anonfun$map$1, 
> <function0>)
>       - field (class: scala.collection.immutable.Stream$Cons, name: tl, type: 
> interface scala.Function0)
>       - object (class scala.collection.immutable.Stream$Cons, Stream([empty 
> row], [empty row], [empty row], [empty row], [empty row], [empty row], [empty 
> row], [empty row], [empty row], [empty row]))
>       - field (class: org.apache.spark.sql.execution.LocalTableScan, name: 
> rows, type: interface scala.collection.Seq)
>       - object (class org.apache.spark.sql.execution.LocalTableScan, 
> LocalTableScan [[empty row],[empty row],[empty row],[empty row],[empty 
> row],[empty row],[empty row],[empty row],[empty row],[empty row]]
> )
>       - field (class: 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate, name: child, 
> type: class org.apache.spark.sql.execution.SparkPlan)
>       - object (class 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate, 
> TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#10L])
> +- LocalTableScan [[empty row],[empty row],[empty row],[empty row],[empty 
> row],[empty row],[empty row],[empty row],[empty row],[empty row]]
> )
>       - field (class: 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1,
>  name: $outer, type: class 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate)
>       - object (class 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1,
>  <function0>)
>       - field (class: 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2,
>  name: $outer, type: class 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1)
>       - object (class 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2,
>  <function1>)
>       at 
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>       at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>       at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
>       ... 75 more
> A workaround is to create the DataFrame directly from the JavaRDD instead of the List, for example as sketched below.
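>
> A sketch of the workaround, reusing the hypothetical Record bean and setup from the reproduction above:
>
> // Build the DataFrame from the JavaRDD rather than from a collected List.
> JavaRDD<Record> recordsRdd = lines.map(Record::new);
> DataFrame df2 = sqlContext.createDataFrame(recordsRdd, Record.class);
> System.out.println(df2.count()); // completes without the serialization error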



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
