[jira] [Created] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
Joao created SPARK-17381:

Summary: Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
Key: SPARK-17381
URL: https://issues.apache.org/jira/browse/SPARK-17381
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.0.0
Environment: EMR 5.0.0 (submitted as yarn-client)
Java Version: 1.8.0_101 (Oracle Corporation)
Scala Version: 2.11.8
Problem also happens when I run locally with similar versions of java/scala.
OS: Ubuntu 16.04
Reporter: Joao
Priority: Blocker

I am running a Spark Streaming application from a Kinesis stream. After running for some hours it runs out of memory. In a driver heap dump I found two problems:

1) A huge amount of org.apache.spark.sql.execution.ui.SQLTaskMetrics (it seems this was a problem before: https://issues.apache.org/jira/browse/SPARK-11192). To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak I just needed to run the code below:

{code}
val dstream = ssc.union(kinesisStreams)
dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => {
  // load data
  val toyDF = streamInfo.map(_ => (1, "data", "more data "))
    .toDF("Num", "Data", "MoreData")
  toyDF.agg(sum("Num")).first().get(0)
})
{code}

2) A huge amount of Array[Byte] (9 GB+). After some analysis, I noticed that most of the Array[Byte] were being referenced by objects that were being referenced by SQLTaskMetrics. The strangest thing is that those Array[Byte] were basically text that was loaded in the executors, so it should never be in the driver at all! I still could not replicate the 2nd problem with simple code (the original was complex, with data coming from S3, DynamoDB and other databases). However, when I debug the application I can see that in Executor.scala, during reportHeartBeat(), data that should not be sent to the driver is being added to "accumUpdates", which, as I understand, will be sent to the driver for reporting. To be more precise, one of the taskRunners in the loop "for (taskRunner <- runningTasks.values().asScala)" contains a GenericInternalRow with a lot of data that should not go to the driver. In my case the path is taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if not identical) to what I see in a driver heap dump. I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is fixed I would have less of this undesirable data in the driver and could run my streaming app for a long period of time, but I think there will always be some performance loss.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
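Not part of the report, but a mitigation worth trying while the leak is investigated is to shrink how much the driver-side SQL/UI listeners retain. A minimal sketch; the property names are assumed from the Spark 2.0 configuration docs and the values are only illustrative, so this would at best limit the growth rather than fix the underlying leak:

{code}
import org.apache.spark.SparkConf

// Keep fewer finished SQL executions (and the SQLTaskMetrics hanging off them) and
// fewer jobs/stages in the driver-side UI listeners. Values are illustrative.
val conf = new SparkConf()
  .setAppName("kinesis-streaming-app") // hypothetical app name
  .set("spark.sql.ui.retainedExecutions", "50")
  .set("spark.ui.retainedJobs", "100")
  .set("spark.ui.retainedStages", "100")
{code}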
[jira] [Updated] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
[ https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joao updated SPARK-17381: - Description: I am running a Spark Streaming application from a Kinesis stream. After some hours running it gets out of memory. After a driver heap dump I found two problems: 1) huge amount of org.apache.spark.sql.execution.ui.SQLTaskMetrics (It seems this was a problem before: https://issues.apache.org/jira/browse/SPARK-11192); To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak just needed to run the code below: {code} val dstream = ssc.union(kinesisStreams) dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => { //load data val toyDF = streamInfo.map(_ => (1, "data","more data " )) .toDF("Num", "Data", "MoreData" ) toyDF.agg(sum("Num")).first().get(0) } ) {code} 2) huge amount of Array[Byte] (9Gb+) After some analysis, I noticed that most of the Array[Byte] where being referenced by objects that were bring referenced by SQLTaskMetrics. The strangest thing is that those Array[Byte] were basically text that were loaded in the executors so they never should be in the driver at all! Still could not replicate the 2nd problem with a simple code (the original was complex with data coming from S3, DynamoDB and other databases). However, when I debug the application I can see that in Executor.scala, during reportHeartBeat(), I noticed that the data that should not be sent to the driver is being added to "accumUpdates" which, as I understand, will be sent to the driver for reporting. To be more precise, one of the taskRunner in the loop "for (taskRunner <- runningTasks.values().asScala)" contains a GenericInternalRow with a lot of data that should not go to the driver. The path would be in my case: taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if not the same) that I see when I do a driver heap dump. I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is fixed I would have less of this undesirable data in the driver and that I could run my streaming app for a long period of time, but I think there will be always some performance lost. was: I am running a Spark Streaming application from a Kinesis stream. After some hours running it gets out of memory. After a driver heap dump I found two problems: 1) huge amount of org.apache.spark.sql.execution.ui.SQLTaskMetrics (It seems this was a problem before: https://issues.apache.org/jira/browse/SPARK-11192); To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak just needed to run the code below: {code} val dstream = ssc.union(kinesisStreams) dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => { //load data val toyDF = streamInfo.map(_ => (1, "data","more data " )) .toDF("Num", "Data", "MoreData" ) toyDF.agg(sum("Num")).first().get(0) } ) {code} 2) huge amount of Array[Byte] (9Gb+) After some analysis, I noticed that most of the Array[Byte] where being referenced by objects that were bring referenced by SQLTaskMetrics. The strangest thing is that those Array[Byte] were basically text that were loaded in the executors so they never should be in the driver at all! Still could not replicate the 2nd problem with a simple code (the original was complex with data coming from S3, DynamoDB and other databases). 
However, when I debug the application I can see that in Executor.scala, during reportHeartBeat(), I noticed that the data that should not be sent to the driver is being added to "accumUpdates" which, as I understand, will be sent to the driver for reporting. To be more precise, one of the taskRunner in the loop "for (taskRunner <- runningTasks.values().asScala)" contains a GenericInternalRow with a lot of data that should not go to the driver. The path would be in my case taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if not the same) that I see when I do a driver heap dump. I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is fixed I would have less of this undesirable data in the driver and that I could run my streaming app for a long period of time, but I think there will be always some performance lost. > Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics > - > > Key: SPARK-17381 > URL: https://issues.apache.org/jira/browse/SPARK-17381 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 > Environment: EMR 5.0.0 (submitted as yarn-client) > Java Version 1.8.0_101 (Oracle Corporation) > Scala Version version 2.11.8 > Problem also happens when I run locally with similar versions of java/scala. > OS: Ub
[jira] [Updated] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
[ https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joao updated SPARK-17381: - Description: I am running a Spark Streaming application from a Kinesis stream. After some hours running it gets out of memory. After a driver heap dump I found two problems: 1) huge amount of org.apache.spark.sql.execution.ui.SQLTaskMetrics (It seems this was a problem before: https://issues.apache.org/jira/browse/SPARK-11192); To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak just needed to run the code below: {code} val dstream = ssc.union(kinesisStreams) dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => { //load data val toyDF = streamInfo.map(_ => (1, "data","more data " )) .toDF("Num", "Data", "MoreData" ) toyDF.agg(sum("Num")).first().get(0) } ) {code} 2) huge amount of Array[Byte] (9Gb+) After some analysis, I noticed that most of the Array[Byte] where being referenced by objects that were bring referenced by SQLTaskMetrics. The strangest thing is that those Array[Byte] were basically text that were loaded in the executors so they should never be in the driver at all! Still could not replicate the 2nd problem with a simple code (the original was complex with data coming from S3, DynamoDB and other databases). However, when I debug the application I can see that in Executor.scala, during reportHeartBeat(), the data that should not be sent to the driver is being added to "accumUpdates" which, as I understand, will be sent to the driver for reporting. To be more precise, one of the taskRunner in the loop "for (taskRunner <- runningTasks.values().asScala)" contains a GenericInternalRow with a lot of data that should not go to the driver. The path would be in my case: taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if not the same) to the data I see when I do a driver heap dump. I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is fixed I would have less of this undesirable data in the driver and I could run my streaming app for a long period of time, but I think there will be always some performance lost. was: I am running a Spark Streaming application from a Kinesis stream. After some hours running it gets out of memory. After a driver heap dump I found two problems: 1) huge amount of org.apache.spark.sql.execution.ui.SQLTaskMetrics (It seems this was a problem before: https://issues.apache.org/jira/browse/SPARK-11192); To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak just needed to run the code below: {code} val dstream = ssc.union(kinesisStreams) dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => { //load data val toyDF = streamInfo.map(_ => (1, "data","more data " )) .toDF("Num", "Data", "MoreData" ) toyDF.agg(sum("Num")).first().get(0) } ) {code} 2) huge amount of Array[Byte] (9Gb+) After some analysis, I noticed that most of the Array[Byte] where being referenced by objects that were bring referenced by SQLTaskMetrics. The strangest thing is that those Array[Byte] were basically text that were loaded in the executors so they never should be in the driver at all! Still could not replicate the 2nd problem with a simple code (the original was complex with data coming from S3, DynamoDB and other databases). 
However, when I debug the application I can see that in Executor.scala, during reportHeartBeat(), I noticed that the data that should not be sent to the driver is being added to "accumUpdates" which, as I understand, will be sent to the driver for reporting. To be more precise, one of the taskRunner in the loop "for (taskRunner <- runningTasks.values().asScala)" contains a GenericInternalRow with a lot of data that should not go to the driver. The path would be in my case: taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if not the same) that I see when I do a driver heap dump. I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is fixed I would have less of this undesirable data in the driver and that I could run my streaming app for a long period of time, but I think there will be always some performance lost. > Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics > - > > Key: SPARK-17381 > URL: https://issues.apache.org/jira/browse/SPARK-17381 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 > Environment: EMR 5.0.0 (submitted as yarn-client) > Java Version 1.8.0_101 (Oracle Corporation) > Scala Version version 2.11.8 > Problem also happens when I run locally with similar versions of java/scala. > OS: Ubuntu 16.04
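An aside on working around this for a long-running stream (my suggestion, not from the thread): keeping the per-batch work in plain RDD operations avoids registering a SQL execution, and hence any SQLTaskMetrics, per micro-batch. A sketch under the same assumptions as the repro above (an existing ssc and kinesisStreams):

{code}
// Same toy aggregation as the repro, but without creating a DataFrame per batch,
// so the driver-side SQL listener has nothing new to accumulate each interval.
val dstream = ssc.union(kinesisStreams)
dstream.foreachRDD { rdd: RDD[Array[Byte]] =>
  val total = rdd.count()  // stands in for toyDF.agg(sum("Num")).first().get(0)
  println(s"records in this batch: $total")
}
{code}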
[jira] [Created] (SPARK-12878) Dataframe fails with nested User Defined Types
Joao created SPARK-12878:

Summary: Dataframe fails with nested User Defined Types
Key: SPARK-12878
URL: https://issues.apache.org/jira/browse/SPARK-12878
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 1.6.0
Reporter: Joao
Priority: Blocker

Spark 1.6.0 crashes when using nested User Defined Types in a Dataframe. In version 1.5.2 the code below worked just fine:

{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[AUDT])
case class A(list: Seq[B])

class AUDT extends UserDefinedType[A] {
  override def sqlType: DataType =
    StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true)))
  override def userClass: Class[A] = classOf[A]
  override def serialize(obj: Any): Any = obj match {
    case A(list) =>
      val row = new GenericMutableRow(1).update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
      row
  }
  override def deserialize(datum: Any): A = {
    datum match {
      case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
    }
  }
}

object AUDT extends AUDT

@SQLUserDefinedType(udt = classOf[BUDT])
case class B(text: Int)

class BUDT extends UserDefinedType[B] {
  override def sqlType: DataType =
    StructType(Seq(StructField("num", IntegerType, nullable = false)))
  override def userClass: Class[B] = classOf[B]
  override def serialize(obj: Any): Any = obj match {
    case B(text) =>
      val row = new GenericMutableRow(1).setInt(0, text)
      row
  }
  override def deserialize(datum: Any): B = {
    datum match {
      case row: InternalRow => new B(row.getInt(0))
    }
  }
}

object BUDT extends BUDT

object Test {
  def main(args: Array[String]) = {
    val col = Seq(new A(Seq(new B(1), new B(2))), new A(Seq(new B(3), new B(4))))
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    val df = sc.parallelize(1 to 2 zip col).toDF("id", "b")
    df.select("b").show()
    df.collect().foreach(println)
  }
}
{code}

In the new version (1.6.0) I needed to include the following import:

{code}
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
{code}

However, Spark crashes at runtime:

{noformat}
16/01/18 14:36:22 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.spark.sql.catalyst.InternalRow
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getStruct(rows.scala:51)
    at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getStruct(rows.scala:248)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor
{noformat}
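One detail worth noting when comparing this snippet with the updated description below: in the created issue the row construction inside both serialize methods is chained, and GenericMutableRow.update / setInt return Unit, so the chained form binds () rather than a row, which is consistent with the BoxedUnit ClassCastException in the trace. A minimal sketch of the two forms (my illustration; the values parameter is just a stand-in):

{code}
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.util.GenericArrayData

// Chained form, as posted above: `update` returns Unit, so this returns (), not a row.
def serializeChained(values: Array[Any]) =
  new GenericMutableRow(1).update(0, new GenericArrayData(values))

// Split form, as in the updated description below: build the row, mutate it, return it.
def serializeSplit(values: Array[Any]): GenericMutableRow = {
  val row = new GenericMutableRow(1)
  row.update(0, new GenericArrayData(values))
  row
}
{code}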
[jira] [Updated] (SPARK-12878) Dataframe fails with nested User Defined Types
[ https://issues.apache.org/jira/browse/SPARK-12878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joao updated SPARK-12878: - Description: Spark 1.6.0 crashes when using nested User Defined Types in a Dataframe. In version 1.5.2 the code below worked just fine: import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericMutableRow import org.apache.spark.sql.types._ @SQLUserDefinedType(udt = classOf[AUDT]) case class A(list:Seq[B]) class AUDT extends UserDefinedType[A] { override def sqlType: DataType = StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true))) override def userClass: Class[A] = classOf[A] override def serialize(obj: Any): Any = obj match { case A(list) => val row = new GenericMutableRow(1) row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray)) row } override def deserialize(datum: Any): A = { datum match { case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq) } } } object AUDT extends AUDT @SQLUserDefinedType(udt = classOf[BUDT]) case class B(text:Int) class BUDT extends UserDefinedType[B] { override def sqlType: DataType = StructType(Seq(StructField("num", IntegerType, nullable = false))) override def userClass: Class[B] = classOf[B] override def serialize(obj: Any): Any = obj match { case B(text) => val row = new GenericMutableRow(1) row.setInt(0, text) row } override def deserialize(datum: Any): B = { datum match { case row: InternalRow => new B(row.getInt(0)) } } } object BUDT extends BUDT object Test { def main(args:Array[String]) = { val col = Seq(new A(Seq(new B(1), new B(2))), new A(Seq(new B(3), new B(4 val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark")) val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ val df = sc.parallelize(1 to 2 zip col).toDF("id","b") df.select("b").show() df.collect().foreach(println) } } In the new version (1.6.0) I needed to include the following import: import org.apache.spark.sql.catalyst.expressions.GenericMutableRow However, Spark crashes in runtime: 16/01/18 14:36:22 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.spark.sql.catalyst.InternalRow at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getStruct(rows.scala:51) at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getStruct(rows.scala:248) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at 
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPool
[jira] [Created] (SPARK-10632) Cannot save DataFrame with User Defined Types
Joao created SPARK-10632:

Summary: Cannot save DataFrame with User Defined Types
Key: SPARK-10632
URL: https://issues.apache.org/jira/browse/SPARK-10632
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 1.5.0
Reporter: Joao

Cannot save DataFrames that contain user-defined types. At first I thought it was a problem with my UDT class, then I tried the Vector class from mllib and the error was the same. The code below should reproduce the error.

{noformat}
val df = sc.parallelize(Seq((1, Vectors.dense(1, 1, 1)), (2, Vectors.dense(2, 2, 2)))).toDF()
df.write.format("json").mode(SaveMode.Overwrite).save(path)
{noformat}

The error log is below:

{noformat}
15/09/16 09:58:27 ERROR DefaultWriterContainer: Aborting task.
scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class org.apache.spark.sql.catalyst.expressions.GenericMutableRow)
    at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194)
    at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179)
    at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103)
    at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89)
    at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:126)
    at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89)
    at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$.apply(JacksonGenerator.scala:133)
    at org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.writeInternal(JSONRelation.scala:185)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:243)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6' closed. Now beginning upload
15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6' upload complete
15/09/16 09:58:28 ERROR DefaultWriterContainer: Task attempt attempt_201509160958__m_00_0 aborted.
15/09/16 09:58:28 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:251)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class org.apache.spark.sql.catalyst.expressions.GenericMutableRow)
    at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194)
    at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179)
    at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$o
{noformat}
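Not part of the report, but a possible workaround while the UDT write path is broken (my own sketch, untested on 1.5.0): keep the data in plain Scala types so that writing JSON never has to call VectorUDT.serialize.

{code}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SaveMode

// Assumes an existing SparkContext `sc` and an output `path`, as in the snippet above.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// Store the vectors as Array[Double] so the written schema only uses built-in types
// (int + array<double>) instead of the Vector UDT.
val plainDF = sc.parallelize(Seq((1, Vectors.dense(1, 1, 1)), (2, Vectors.dense(2, 2, 2))))
  .map { case (id, v) => (id, v.toArray) }
  .toDF("id", "values")

plainDF.write.format("json").mode(SaveMode.Overwrite).save(path)
{code}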
[jira] [Updated] (SPARK-10632) Cannot save DataFrame with User Defined Types
[ https://issues.apache.org/jira/browse/SPARK-10632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joao updated SPARK-10632: - Description: Cannot save DataFrames that contain user-defined types. At first I thought it was a problem with my udt class, then tried the Vector class from mlib and the error was the same. The code below should reproduce the error. {noformat} val df = sc.parallelize(Seq((1,Vectors.dense(1,1,1)), (2,Vectors.dense(2,2,2.toDF() df.write.format("json").mode(SaveMode.Overwrite).save(path) {noformat} The error log is below {noformat} 15/09/16 09:58:27 ERROR DefaultWriterContainer: Aborting task. scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class org.apache.spark.sql.catalyst.expressions.GenericMutableRow) at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194) at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:126) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$.apply(JacksonGenerator.scala:133) at org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.writeInternal(JSONRelation.scala:185) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:243) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6' closed. Now beginning upload 15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6' upload complete 15/09/16 09:58:28 ERROR DefaultWriterContainer: Task attempt attempt_201509160958__m_00_0 aborted. 15/09/16 09:58:28 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.spark.SparkException: Task failed while writing rows. 
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:251) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class org.apache.spark.sql.catalyst.expressions.GenericMutableRow) at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194) at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103) at org.apache.spark.sql.execution.datasources
[jira] [Updated] (SPARK-10632) Cannot save DataFrame with User Defined Types
[ https://issues.apache.org/jira/browse/SPARK-10632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joao updated SPARK-10632: - Description: Cannot save DataFrames that contain user-defined types. I tried to save a dataframe with instances of the Vector class from mlib and got the error. The code below should reproduce the error. {noformat} val df = sc.parallelize(Seq((1,Vectors.dense(1,1,1)), (2,Vectors.dense(2,2,2.toDF() df.write.format("json").mode(SaveMode.Overwrite).save(path) {noformat} The error log is below {noformat} 15/09/16 09:58:27 ERROR DefaultWriterContainer: Aborting task. scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class org.apache.spark.sql.catalyst.expressions.GenericMutableRow) at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194) at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:126) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$.apply(JacksonGenerator.scala:133) at org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.writeInternal(JSONRelation.scala:185) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:243) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6' closed. Now beginning upload 15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6' upload complete 15/09/16 09:58:28 ERROR DefaultWriterContainer: Task attempt attempt_201509160958__m_00_0 aborted. 15/09/16 09:58:28 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.spark.SparkException: Task failed while writing rows. 
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:251) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class org.apache.spark.sql.catalyst.expressions.GenericMutableRow) at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194) at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfu
[jira] [Created] (SPARK-10637) DataFrames: saving with nested User Data Types
Joao created SPARK-10637:

Summary: DataFrames: saving with nested User Data Types
Key: SPARK-10637
URL: https://issues.apache.org/jira/browse/SPARK-10637
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 1.5.0
Reporter: Joao

Cannot save DataFrames that use nested UserDefinedTypes. I wrote a simple example to show the error. It causes the following error:

java.lang.IllegalArgumentException: Nested type should be repeated: required group array { required int32 num; }

{code:java}
import org.apache.spark.sql.SaveMode
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[AUDT])
case class A(list: Seq[B])

class AUDT extends UserDefinedType[A] {
  override def sqlType: DataType =
    StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true)))
  override def userClass: Class[A] = classOf[A]
  override def serialize(obj: Any): Any = obj match {
    case A(list) =>
      val row = new GenericMutableRow(1)
      row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
      row
  }
  override def deserialize(datum: Any): A = {
    datum match {
      case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
    }
  }
}

object AUDT extends AUDT

@SQLUserDefinedType(udt = classOf[BUDT])
case class B(num: Int)

class BUDT extends UserDefinedType[B] {
  override def sqlType: DataType =
    StructType(Seq(StructField("num", IntegerType, nullable = false)))
  override def userClass: Class[B] = classOf[B]
  override def serialize(obj: Any): Any = obj match {
    case B(num) =>
      val row = new GenericMutableRow(1)
      row.setInt(0, num)
      row
  }
  override def deserialize(datum: Any): B = {
    datum match {
      case row: InternalRow => new B(row.getInt(0))
    }
  }
}

object BUDT extends BUDT

object TestNested {
  def main(args: Array[String]) = {
    val col = Seq(new A(Seq(new B(1), new B(2))), new A(Seq(new B(3), new B(4))))
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    val df = sc.parallelize(1 to 2 zip col).toDF()
    df.show()
    df.write.mode(SaveMode.Overwrite).save(...)
  }
}
{code}

The error log is shown below:

{noformat}
15/09/16 16:44:36 WARN : Your hostname, X resolves to a loopback/non-reachable address: fe80:0:0:0:c4c7:8c4b:4a24:f8a1%14, but we couldn't find any external IP address!
15/09/16 16:44:38 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/09/16 16:44:38 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/09/16 16:44:38 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/09/16 16:44:38 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/09/16 16:44:38 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/09/16 16:44:38 INFO ParquetRelation: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
15/09/16 16:44:38 INFO DefaultWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
15/09/16 16:44:38 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:50986 in memory (size: 1402.0 B, free: 973.6 MB)
15/09/16 16:44:38 INFO ContextCleaner: Cleaned accumulator 1
15/09/16 16:44:39 INFO SparkContext: Starting job: save at TestNested.scala:73
15/09/16 16:44:39 INFO DAGScheduler: Got job 1 (save at TestNested.scala:73) with 1 output partitions
15/09/16 16:44:39 INFO DAGScheduler: Final stage: ResultStage 1(save at TestNested.scala:73)
15/09/16 16:44:39 INFO DAGScheduler: Parents of final stage: List()
15/09/16 16:44:39 INFO DAGScheduler: Missing parents: List()
15/09/16 16:44:39 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[1] at rddToDataFrameHolder at TestNested.scala:69), which has no missing parents
15/09/16 16:44:39 INFO MemoryStore: ensureFreeSpace(59832) called with curMem=0, maxMem=1020914565
15/09/16 16:44:39 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 58.4 KB, free 973.6 MB)
15/09/16 16:44:39 INFO MemoryStore: ensureFreeSpace(20794) called with curMem=59832, maxMem=1020914565
15/09/16 16:44:39 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 20.3 KB, free 973.5 MB)
15/09/16 16:44:39 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:50986 (size: 20.3 KB, free: 973.6 MB)
15/09/16 16:44:39 INFO SparkContext: Created broadcast 1 from broadcast at
{noformat}
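This is not from the report, but while nested UDTs fail on the Parquet write path, one workaround sketch would be to flatten the nested type into built-in collections before writing, so the schema Parquet sees is array<int> rather than an array of a user-defined type. It reuses sc, sqlContext.implicits._ and col from the example above; the output path is illustrative:

{code}
import org.apache.spark.sql.SaveMode

// Map Seq[B] down to Seq[Int] so no UDT appears in the written schema (id int, nums array<int>).
val plainDF = sc.parallelize(1 to 2 zip col)
  .map { case (id, a) => (id, a.list.map(_.num)) }
  .toDF("id", "nums")

plainDF.write.mode(SaveMode.Overwrite).save("/tmp/nested-udt-workaround")
{code}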
[jira] [Updated] (SPARK-10637) DataFrames: saving with nested User Data Types
[ https://issues.apache.org/jira/browse/SPARK-10637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joao updated SPARK-10637: - Description: Cannot save data frames using nested UserDefinedType I wrote a simple example to show the error. It causes the following error java.lang.IllegalArgumentException: Nested type should be repeated: required group array { required int32 num; } {code:java} import org.apache.spark.sql.SaveMode import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericMutableRow import org.apache.spark.sql.types._ @SQLUserDefinedType(udt = classOf[AUDT]) case class A(list:Seq[B]) class AUDT extends UserDefinedType[A] { override def sqlType: DataType = StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true))) override def userClass: Class[A] = classOf[A] override def serialize(obj: Any): Any = obj match { case A(list) => val row = new GenericMutableRow(1) row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray)) row } override def deserialize(datum: Any): A = { datum match { case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq) } } } object AUDT extends AUDT @SQLUserDefinedType(udt = classOf[BUDT]) case class B(num:Int) class BUDT extends UserDefinedType[B] { override def sqlType: DataType = StructType(Seq(StructField("num", IntegerType, nullable = false))) override def userClass: Class[B] = classOf[B] override def serialize(obj: Any): Any = obj match { case B(num) => val row = new GenericMutableRow(1) row.setInt(0, num) row } override def deserialize(datum: Any): B = { datum match { case row: InternalRow => new B(row.getInt(0)) } } } object BUDT extends BUDT object TestNested { def main(args:Array[String]) = { val col = Seq(new A(Seq(new B(1), new B(2))), new A(Seq(new B(3), new B(4 val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark")) val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ val df = sc.parallelize(1 to 2 zip col).toDF() df.show() df.write.mode(SaveMode.Overwrite).save(...) } } {code} The error log is shown below: {noformat} 15/09/16 16:44:36 WARN : Your hostname, X resolves to a loopback/non-reachable address: fe80:0:0:0:c4c7:8c4b:4a24:f8a1%14, but we couldn't find any external IP address! 15/09/16 16:44:38 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id 15/09/16 16:44:38 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id 15/09/16 16:44:38 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id 15/09/16 16:44:38 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap 15/09/16 16:44:38 INFO deprecation: mapred.task.partition is deprecated. 
Instead, use mapreduce.task.partition 15/09/16 16:44:38 INFO ParquetRelation: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter 15/09/16 16:44:38 INFO DefaultWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter 15/09/16 16:44:38 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:50986 in memory (size: 1402.0 B, free: 973.6 MB) 15/09/16 16:44:38 INFO ContextCleaner: Cleaned accumulator 1 15/09/16 16:44:39 INFO SparkContext: Starting job: save at TestNested.scala:73 15/09/16 16:44:39 INFO DAGScheduler: Got job 1 (save at TestNested.scala:73) with 1 output partitions 15/09/16 16:44:39 INFO DAGScheduler: Final stage: ResultStage 1(save at TestNested.scala:73) 15/09/16 16:44:39 INFO DAGScheduler: Parents of final stage: List() 15/09/16 16:44:39 INFO DAGScheduler: Missing parents: List() 15/09/16 16:44:39 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[1] at rddToDataFrameHolder at TestNested.scala:69), which has no missing parents 15/09/16 16:44:39 INFO MemoryStore: ensureFreeSpace(59832) called with curMem=0, maxMem=1020914565 15/09/16 16:44:39 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 58.4 KB, free 973.6 MB) 15/09/16 16:44:39 INFO MemoryStore: ensureFreeSpace(20794) called with curMem=59832, maxMem=1020914565 15/09/16 16:44:39 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 20.3 KB, free 973.5 MB) 15/09/16 16:44:39 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:50986 (size: 20.3 KB, free: 973.6 MB) 15/09/16 16:44:39 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861 15/09/16 16:44:39 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[1] at rddToDataFrameHolder at TestNested.sc
[jira] [Updated] (SPARK-10632) Cannot save DataFrame with User Defined Types
[ https://issues.apache.org/jira/browse/SPARK-10632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joao updated SPARK-10632: - Description: Cannot save DataFrames that contain user-defined types. I tried to save a dataframe with instances of the Vector class from mlib and got the error. The code below should reproduce the error. {noformat} val df = sc.parallelize(Seq((1,Vectors.dense(1,1,1)), (2,Vectors.dense(2,2,2.toDF() df.write.format("json").mode(SaveMode.Overwrite).save(path) {noformat} The error log is below {noformat} 15/09/16 09:58:27 ERROR DefaultWriterContainer: Aborting task. scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class org.apache.spark.sql.catalyst.expressions.GenericMutableRow) at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194) at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:126) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$.apply(JacksonGenerator.scala:133) at org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.writeInternal(JSONRelation.scala:185) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:243) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6' closed. Now beginning upload 15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6' upload complete 15/09/16 09:58:28 ERROR DefaultWriterContainer: Task attempt attempt_201509160958__m_00_0 aborted. 15/09/16 09:58:28 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.spark.SparkException: Task failed while writing rows. 
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:251) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class org.apache.spark.sql.catalyst.expressions.GenericMutableRow) at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194) at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103) at org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfu
[jira] [Commented] (SPARK-10632) Cannot save DataFrame with User Defined Types
[ https://issues.apache.org/jira/browse/SPARK-10632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14804314#comment-14804314 ] Joao commented on SPARK-10632: -- Thanks Joseph. I tried with Spark 1.5.0 in Windows 7, standalone mode. Do you think it may be a platform issue? I will be out from office until Tuesday, but I will try it on Linux. > Cannot save DataFrame with User Defined Types > - > > Key: SPARK-10632 > URL: https://issues.apache.org/jira/browse/SPARK-10632 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.0 >Reporter: Joao > > Cannot save DataFrames that contain user-defined types. > I tried to save a dataframe with instances of the Vector class from mlib and > got the error. > The code below should reproduce the error. > {noformat} > val df = sc.parallelize(Seq((1,Vectors.dense(1,1,1)), > (2,Vectors.dense(2,2,2.toDF() > df.write.format("json").mode(SaveMode.Overwrite).save(path) > {noformat} > The error log is below > {noformat} > 15/09/16 09:58:27 ERROR DefaultWriterContainer: Aborting task. > scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class > org.apache.spark.sql.catalyst.expressions.GenericMutableRow) > at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194) > at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179) > at > org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103) > at > org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89) > at > org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:126) > at > org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89) > at > org.apache.spark.sql.execution.datasources.json.JacksonGenerator$.apply(JacksonGenerator.scala:133) > at > org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.writeInternal(JSONRelation.scala:185) > at > org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:243) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > 15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key > 'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6' > closed. 
Now beginning upload > 15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key > 'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6' > upload complete > 15/09/16 09:58:28 ERROR DefaultWriterContainer: Task attempt > attempt_201509160958__m_00_0 aborted. > 15/09/16 09:58:28 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) > org.apache.spark.SparkException: Task failed while writing rows. > at > org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:251) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(Th