[jira] [Created] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics

2016-09-02 Thread Joao (JIRA)
Joao created SPARK-17381:


 Summary: Memory leak  
org.apache.spark.sql.execution.ui.SQLTaskMetrics
 Key: SPARK-17381
 URL: https://issues.apache.org/jira/browse/SPARK-17381
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.0
 Environment: EMR 5.0.0 (submitted as yarn-client)
Java Version: 1.8.0_101 (Oracle Corporation)
Scala Version: 2.11.8

Problem also happens when I run locally with similar versions of java/scala. 
OS: Ubuntu 16.04
Reporter: Joao
Priority: Blocker


I am running a Spark Streaming application that reads from a Kinesis stream. After a few hours of running it goes out of memory. From a driver heap dump I found two problems:

1) a huge number of org.apache.spark.sql.execution.ui.SQLTaskMetrics objects (it seems this was a problem before: https://issues.apache.org/jira/browse/SPARK-11192);

To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak, I just needed to run the code below:

{code}
// assumes the usual streaming setup plus:
//   import org.apache.spark.rdd.RDD
//   import org.apache.spark.sql.functions.sum
//   and the SQL implicits in scope (for .toDF on an RDD of tuples)
val dstream = ssc.union(kinesisStreams)
dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => {
  // load some toy data
  val toyDF = streamInfo
    .map(_ => (1, "data", "more data "))
    .toDF("Num", "Data", "MoreData")
  // run a small SQL aggregation on every micro-batch
  toyDF.agg(sum("Num")).first().get(0)
})
{code}
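
A roughly equivalent standalone sketch, without the Kinesis dependency, would just re-run the same toy aggregation in a loop (untested as written; it assumes Spark 2.0.x and a local master, and only illustrates the shape of the workload):

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

object ToyLeakRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SQLTaskMetricsLeakRepro")
      .getOrCreate()
    import spark.implicits._

    // Each iteration stands in for one micro-batch: build a small DataFrame and
    // run a SQL aggregation, so the driver-side SQL listener records its metrics.
    for (_ <- 1 to 100000) {
      val toyDF = spark.sparkContext
        .parallelize(1 to 1000)
        .map(_ => (1, "data", "more data "))
        .toDF("Num", "Data", "MoreData")
      toyDF.agg(sum("Num")).first().get(0)
    }

    spark.stop()
  }
}
{code}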


2) a huge amount of Array[Byte] (9 GB+)

After some analysis, I noticed that most of the Array[Byte] were being referenced by objects that were themselves referenced by SQLTaskMetrics. The strangest thing is that those Array[Byte] were basically text that was loaded in the executors, so they should never be in the driver at all!

I still could not replicate the 2nd problem with a simple piece of code (the original is complex, with data coming from S3, DynamoDB and other databases). However, when I debug the application I can see in Executor.scala, during reportHeartBeat(), that data that should not be sent to the driver is being added to "accumUpdates", which, as I understand it, is then sent to the driver for reporting.

To be more precise, one of the taskRunners in the loop "for (taskRunner <- runningTasks.values().asScala)" contains a GenericInternalRow with a lot of data that should not go to the driver. In my case the path is taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if not identical) to what I see in a driver heap dump.
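
For context, the loop I am referring to looks roughly like the sketch below (my paraphrase of Spark 2.0's Executor.reportHeartBeat(), not an exact copy; runningTasks and taskRunner are internal to the executor):

{code}
// Paraphrased from Executor.scala (Spark 2.0.x) -- not an exact copy.
val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
for (taskRunner <- runningTasks.values().asScala) {
  if (taskRunner.task != null) {
    taskRunner.task.metrics.mergeShuffleReadMetrics()
    // accumulators() returns the internal task metrics plus externalAccums,
    // which is where the SQL metrics (and, in my case, the GenericInternalRow data) live.
    accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators()))
  }
}
// accumUpdates is then wrapped in a Heartbeat message and sent to the driver's
// HeartbeatReceiver, which is how this data ends up on the driver side.
{code}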

I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak were fixed I would have less of this undesirable data in the driver and could run my streaming app for a long period of time, though I think there would still be some performance loss.
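
(Side note, not verified: the driver-side SQL listener keeps metrics for a bounded number of executions, controlled by spark.sql.ui.retainedExecutions. Lowering it, e.g. as below, might bound the growth if the problem is only in how long old executions are retained, but it would not help if the entries are never cleaned up at all.)

{code}
// Hypothetical mitigation only -- spark.sql.ui.retainedExecutions is a real setting,
// but I have not verified that it limits the SQLTaskMetrics growth described above.
val spark = org.apache.spark.sql.SparkSession.builder()
  .config("spark.sql.ui.retainedExecutions", "50")
  .getOrCreate()
{code}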










[jira] [Updated] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics

2016-09-02 Thread Joao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joao updated SPARK-17381:
-
Description: 
I am running a Spark Streaming application from a Kinesis stream. After some 
hours running it gets out of memory. After a driver heap dump I found two 
problems:
1) huge amount of org.apache.spark.sql.execution.ui.SQLTaskMetrics (It seems 
this was a problem before: 
https://issues.apache.org/jira/browse/SPARK-11192);

To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak just 
needed to run the code below:

{code}
val dstream = ssc.union(kinesisStreams)
dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => {
  //load data
  val toyDF = streamInfo.map(_ =>
(1, "data","more data "
))
.toDF("Num", "Data", "MoreData" )
  toyDF.agg(sum("Num")).first().get(0)
}
)
{code}


2) huge amount of Array[Byte] (9Gb+)

After some analysis, I noticed that most of the Array[Byte] were being referenced by objects that were themselves referenced by SQLTaskMetrics. The strangest thing is that those Array[Byte] were basically text that was loaded in the executors, so they should never be in the driver at all!

I still could not replicate the 2nd problem with a simple piece of code (the original is complex, with data coming from S3, DynamoDB and other databases). However, when I debug the application I can see in Executor.scala, during reportHeartBeat(), that data that should not be sent to the driver is being added to "accumUpdates", which, as I understand it, is then sent to the driver for reporting.

To be more precise, one of the taskRunners in the loop "for (taskRunner <- runningTasks.values().asScala)" contains a GenericInternalRow with a lot of data that should not go to the driver. In my case the path is: taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if not identical) to what I see when I do a driver heap dump.

I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is fixed I would have less of this undesirable data in the driver and could run my streaming app for a long period of time, but I think there will always be some performance loss.





  was:
I am running a Spark Streaming application from a Kinesis stream. After some 
hours running it gets out of memory. After a driver heap dump I found two 
problems:
1) huge amount of org.apache.spark.sql.execution.ui.SQLTaskMetrics (It seems 
this was a problem before: 
https://issues.apache.org/jira/browse/SPARK-11192);

To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak just 
needed to run the code below:

{code}
val dstream = ssc.union(kinesisStreams)
dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => {
  //load data
  val toyDF = streamInfo.map(_ =>
(1, "data","more data "
))
.toDF("Num", "Data", "MoreData" )
  toyDF.agg(sum("Num")).first().get(0)
}
)
{code}


2) huge amount of Array[Byte] (9Gb+)

After some analysis, I noticed that most of the Array[Byte] where being 
referenced by objects that were bring referenced by SQLTaskMetrics. The 
strangest thing is that those Array[Byte] were basically text that were loaded 
in the executors so they never should be in the driver at all!

Still could not replicate the 2nd problem with a simple code (the original was 
complex with data coming from S3, DynamoDB and other databases). However, when 
I debug the application I can see that in Executor.scala, during 
reportHeartBeat(), I noticed that the data that should not be sent to the 
driver is being added to "accumUpdates" which, as I understand, will be sent to 
the driver for reporting.

To be more precise, one of the taskRunner in the loop "for (taskRunner <- 
runningTasks.values().asScala)"  contains a GenericInternalRow with a lot of 
data that should not go to the driver. The path would be in my case
taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if 
not the same) that I see when I do a driver heap dump. 

I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is 
fixed I would have less of this undesirable data in the driver and that I could 
run my streaming app for a long period of time, but I think there will be 
always some performance lost.






> Memory leak  org.apache.spark.sql.execution.ui.SQLTaskMetrics
> -
>
> Key: SPARK-17381
> URL: https://issues.apache.org/jira/browse/SPARK-17381
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
> Environment: EMR 5.0.0 (submitted as yarn-client)
> Java Version  1.8.0_101 (Oracle Corporation)
> Scala Version version 2.11.8
> Problem also happens when I run locally with similar versions of java/scala. 
> OS: Ubuntu 16.04

[jira] [Updated] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics

2016-09-02 Thread Joao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joao updated SPARK-17381:
-
Description: 
I am running a Spark Streaming application from a Kinesis stream. After some 
hours running it gets out of memory. After a driver heap dump I found two 
problems:
1) huge amount of org.apache.spark.sql.execution.ui.SQLTaskMetrics (It seems 
this was a problem before: 
https://issues.apache.org/jira/browse/SPARK-11192);

To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak just 
needed to run the code below:

{code}
val dstream = ssc.union(kinesisStreams)
dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => {
  //load data
  val toyDF = streamInfo.map(_ =>
(1, "data","more data "
))
.toDF("Num", "Data", "MoreData" )
  toyDF.agg(sum("Num")).first().get(0)
}
)
{code}


2) huge amount of Array[Byte] (9Gb+)

After some analysis, I noticed that most of the Array[Byte] were being referenced by objects that were themselves referenced by SQLTaskMetrics. The strangest thing is that those Array[Byte] were basically text that was loaded in the executors, so they should never be in the driver at all!

Still could not replicate the 2nd problem with a simple code (the original was 
complex with data coming from S3, DynamoDB and other databases). However, when 
I debug the application I can see that in Executor.scala, during 
reportHeartBeat(),  the data that should not be sent to the driver is being 
added to "accumUpdates" which, as I understand, will be sent to the driver for 
reporting.

To be more precise, one of the taskRunners in the loop "for (taskRunner <- runningTasks.values().asScala)" contains a GenericInternalRow with a lot of
data that should not go to the driver. The path would be in my case: 
taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if 
not the same) to the data I see when I do a driver heap dump. 

I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is fixed I would have less of this undesirable data in the driver and I could run my streaming app for a long period of time, but I think there will always be some performance loss.





  was:
I am running a Spark Streaming application from a Kinesis stream. After some 
hours running it gets out of memory. After a driver heap dump I found two 
problems:
1) huge amount of org.apache.spark.sql.execution.ui.SQLTaskMetrics (It seems 
this was a problem before: 
https://issues.apache.org/jira/browse/SPARK-11192);

To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak just 
needed to run the code below:

{code}
val dstream = ssc.union(kinesisStreams)
dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => {
  //load data
  val toyDF = streamInfo.map(_ =>
(1, "data","more data "
))
.toDF("Num", "Data", "MoreData" )
  toyDF.agg(sum("Num")).first().get(0)
}
)
{code}


2) huge amount of Array[Byte] (9Gb+)

After some analysis, I noticed that most of the Array[Byte] where being 
referenced by objects that were bring referenced by SQLTaskMetrics. The 
strangest thing is that those Array[Byte] were basically text that were loaded 
in the executors so they never should be in the driver at all!

Still could not replicate the 2nd problem with a simple code (the original was 
complex with data coming from S3, DynamoDB and other databases). However, when 
I debug the application I can see that in Executor.scala, during 
reportHeartBeat(), I noticed that the data that should not be sent to the 
driver is being added to "accumUpdates" which, as I understand, will be sent to 
the driver for reporting.

To be more precise, one of the taskRunner in the loop "for (taskRunner <- 
runningTasks.values().asScala)"  contains a GenericInternalRow with a lot of 
data that should not go to the driver. The path would be in my case: 
taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if 
not the same) that I see when I do a driver heap dump. 

I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is 
fixed I would have less of this undesirable data in the driver and that I could 
run my streaming app for a long period of time, but I think there will be 
always some performance lost.






> Memory leak  org.apache.spark.sql.execution.ui.SQLTaskMetrics
> -
>
> Key: SPARK-17381
> URL: https://issues.apache.org/jira/browse/SPARK-17381
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
> Environment: EMR 5.0.0 (submitted as yarn-client)
> Java Version  1.8.0_101 (Oracle Corporation)
> Scala Version version 2.11.8
> Problem also happens when I run locally with similar versions of java/scala. 
> OS: Ubuntu 16.04

[jira] [Created] (SPARK-12878) Dataframe fails with nested User Defined Types

2016-01-18 Thread Joao (JIRA)
Joao created SPARK-12878:


 Summary: Dataframe fails with nested User Defined Types
 Key: SPARK-12878
 URL: https://issues.apache.org/jira/browse/SPARK-12878
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.0
Reporter: Joao
Priority: Blocker


Spark 1.6.0 crashes when using nested User Defined Types in a Dataframe. 
In version 1.5.2 the code below worked just fine:

{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[AUDT])
case class A(list:Seq[B])

class AUDT extends UserDefinedType[A] {
  override def sqlType: DataType = StructType(Seq(StructField("list", 
ArrayType(BUDT, containsNull = false), nullable = true)))
  override def userClass: Class[A] = classOf[A]
  override def serialize(obj: Any): Any = obj match {
case A(list) =>
      // build the row first, then set the field (update returns Unit)
      val row = new GenericMutableRow(1)
      row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
      row
  }

  override def deserialize(datum: Any): A = {
datum match {
  case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
}
  }
}

object AUDT extends AUDT

@SQLUserDefinedType(udt = classOf[BUDT])
case class B(text:Int)

class BUDT extends UserDefinedType[B] {
  override def sqlType: DataType = StructType(Seq(StructField("num", 
IntegerType, nullable = false)))
  override def userClass: Class[B] = classOf[B]
  override def serialize(obj: Any): Any = obj match {
case B(text) =>
      val row = new GenericMutableRow(1)
      row.setInt(0, text)
      row
  }

  override def deserialize(datum: Any): B = {
datum match {  case row: InternalRow => new B(row.getInt(0))  }
  }
}

object BUDT extends BUDT

object Test {
  def main(args:Array[String]) = {

    val col = Seq(new A(Seq(new B(1), new B(2))),
      new A(Seq(new B(3), new B(4))))

val sc = new SparkContext(new 
SparkConf().setMaster("local[1]").setAppName("TestSpark"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val df = sc.parallelize(1 to 2 zip col).toDF("id","b")
df.select("b").show()
df.collect().foreach(println)
  }
}
{code}

In the new version (1.6.0) I needed to include the following import:

import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

However, Spark crashes at runtime:

16/01/18 14:36:22 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to 
org.apache.spark.sql.catalyst.InternalRow
at 
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getStruct(rows.scala:51)
at 
org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getStruct(rows.scala:248)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
at 
org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at 
org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor

[jira] [Updated] (SPARK-12878) Dataframe fails with nested User Defined Types

2016-01-18 Thread Joao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joao updated SPARK-12878:
-
Description: 
Spark 1.6.0 crashes when using nested User Defined Types in a Dataframe. 
In version 1.5.2 the code below worked just fine:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[AUDT])
case class A(list:Seq[B])

class AUDT extends UserDefinedType[A] {
  override def sqlType: DataType = StructType(Seq(StructField("list", 
ArrayType(BUDT, containsNull = false), nullable = true)))
  override def userClass: Class[A] = classOf[A]
  override def serialize(obj: Any): Any = obj match {
case A(list) =>
  val row = new GenericMutableRow(1)
  row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
  row
  }

  override def deserialize(datum: Any): A = {
datum match {
  case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
}
  }
}

object AUDT extends AUDT

@SQLUserDefinedType(udt = classOf[BUDT])
case class B(text:Int)

class BUDT extends UserDefinedType[B] {
  override def sqlType: DataType = StructType(Seq(StructField("num", 
IntegerType, nullable = false)))
  override def userClass: Class[B] = classOf[B]
  override def serialize(obj: Any): Any = obj match {
case B(text) =>
  val row = new GenericMutableRow(1)
  row.setInt(0, text)
  row
  }

  override def deserialize(datum: Any): B = {
datum match {  case row: InternalRow => new B(row.getInt(0))  }
  }
}

object BUDT extends BUDT

object Test {
  def main(args:Array[String]) = {

    val col = Seq(new A(Seq(new B(1), new B(2))),
      new A(Seq(new B(3), new B(4))))

val sc = new SparkContext(new 
SparkConf().setMaster("local[1]").setAppName("TestSpark"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val df = sc.parallelize(1 to 2 zip col).toDF("id","b")
df.select("b").show()
df.collect().foreach(println)
  }
}

In the new version (1.6.0) I needed to include the following import:

import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

However, Spark crashes at runtime:

16/01/18 14:36:22 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to 
org.apache.spark.sql.catalyst.InternalRow
at 
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getStruct(rows.scala:51)
at 
org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getStruct(rows.scala:248)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
at 
org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at 
org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPool

[jira] [Created] (SPARK-10632) Cannot save DataFrame with User Defined Types

2015-09-16 Thread Joao (JIRA)
Joao created SPARK-10632:


 Summary: Cannot save DataFrame with User Defined Types
 Key: SPARK-10632
 URL: https://issues.apache.org/jira/browse/SPARK-10632
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.5.0
Reporter: Joao


Cannot save DataFrames that contain user-defined types.
At first I thought it was a problem with my UDT class, then I tried the Vector class from MLlib and the error was the same.

The code below should reproduce the error.
{noformat}
val df = sc.parallelize(Seq((1, Vectors.dense(1, 1, 1)),
  (2, Vectors.dense(2, 2, 2)))).toDF()
df.write.format("json").mode(SaveMode.Overwrite).save(path)
{noformat}
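
For clarity, the snippet above assumes an active SparkContext/SQLContext with the SQL implicits in scope; a self-contained version of the same repro would look like the sketch below (the output path is just a placeholder):

{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.{SQLContext, SaveMode}

object UdtSaveRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[1]").setAppName("UdtSaveRepro"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // A DataFrame with an MLlib Vector column (backed by VectorUDT).
    val df = sc.parallelize(Seq((1, Vectors.dense(1, 1, 1)),
      (2, Vectors.dense(2, 2, 2)))).toDF()

    // Saving as JSON is the step that fails with the scala.MatchError reported below.
    df.write.format("json").mode(SaveMode.Overwrite).save("/tmp/udt-save-repro")
  }
}
{code}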

The error log is below

{noformat}
15/09/16 09:58:27 ERROR DefaultWriterContainer: Aborting task.
scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class 
org.apache.spark.sql.catalyst.expressions.GenericMutableRow)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:126)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$.apply(JacksonGenerator.scala:133)
at 
org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.writeInternal(JSONRelation.scala:185)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:243)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 
'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6'
 closed. Now beginning upload
15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 
'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6'
 upload complete
15/09/16 09:58:28 ERROR DefaultWriterContainer: Task attempt 
attempt_201509160958__m_00_0 aborted.
15/09/16 09:58:28 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Task failed while writing rows.
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:251)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class 
org.apache.spark.sql.catalyst.expressions.GenericMutableRow)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$o

[jira] [Updated] (SPARK-10632) Cannot save DataFrame with User Defined Types

2015-09-16 Thread Joao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-10632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joao updated SPARK-10632:
-
Description: 
Cannot save DataFrames that contain user-defined types.
At first I thought it was a problem with my UDT class, then I tried the Vector class from MLlib and the error was the same.


The code below should reproduce the error.
{noformat}
val df = sc.parallelize(Seq((1, Vectors.dense(1, 1, 1)),
  (2, Vectors.dense(2, 2, 2)))).toDF()
df.write.format("json").mode(SaveMode.Overwrite).save(path)
{noformat}

The error log is below

{noformat}
15/09/16 09:58:27 ERROR DefaultWriterContainer: Aborting task.
scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class 
org.apache.spark.sql.catalyst.expressions.GenericMutableRow)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:126)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$.apply(JacksonGenerator.scala:133)
at 
org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.writeInternal(JSONRelation.scala:185)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:243)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 
'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6'
 closed. Now beginning upload
15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 
'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6'
 upload complete
15/09/16 09:58:28 ERROR DefaultWriterContainer: Task attempt 
attempt_201509160958__m_00_0 aborted.
15/09/16 09:58:28 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Task failed while writing rows.
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:251)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class 
org.apache.spark.sql.catalyst.expressions.GenericMutableRow)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103)
at 
org.apache.spark.sql.execution.datasources

[jira] [Updated] (SPARK-10632) Cannot save DataFrame with User Defined Types

2015-09-16 Thread Joao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-10632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joao updated SPARK-10632:
-
Description: 
Cannot save DataFrames that contain user-defined types.
I tried to save a DataFrame with instances of the Vector class from MLlib and got the error.

The code below should reproduce the error.
{noformat}
val df = sc.parallelize(Seq((1, Vectors.dense(1, 1, 1)),
  (2, Vectors.dense(2, 2, 2)))).toDF()
df.write.format("json").mode(SaveMode.Overwrite).save(path)
{noformat}

The error log is below

{noformat}
15/09/16 09:58:27 ERROR DefaultWriterContainer: Aborting task.
scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class 
org.apache.spark.sql.catalyst.expressions.GenericMutableRow)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:126)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$.apply(JacksonGenerator.scala:133)
at 
org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.writeInternal(JSONRelation.scala:185)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:243)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 
'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6'
 closed. Now beginning upload
15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 
'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6'
 upload complete
15/09/16 09:58:28 ERROR DefaultWriterContainer: Task attempt 
attempt_201509160958__m_00_0 aborted.
15/09/16 09:58:28 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Task failed while writing rows.
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:251)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class 
org.apache.spark.sql.catalyst.expressions.GenericMutableRow)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfu

[jira] [Created] (SPARK-10637) DataFrames: saving with nested User Data Types

2015-09-16 Thread Joao (JIRA)
Joao created SPARK-10637:


 Summary: DataFrames: saving with nested User Data Types
 Key: SPARK-10637
 URL: https://issues.apache.org/jira/browse/SPARK-10637
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.5.0
Reporter: Joao


Cannot save data frames using a nested UserDefinedType.
I wrote a simple example to show the error.

It causes the following error:

{noformat}
java.lang.IllegalArgumentException: Nested type should be repeated: required group array {
  required int32 num;
}
{noformat}

{code:java}
import org.apache.spark.sql.SaveMode
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[AUDT])
case class A(list:Seq[B])

class AUDT extends UserDefinedType[A] {
  override def sqlType: DataType = StructType(Seq(StructField("list", 
ArrayType(BUDT, containsNull = false), nullable = true)))
  override def userClass: Class[A] = classOf[A]
  override def serialize(obj: Any): Any = obj match {
case A(list) =>
  val row = new GenericMutableRow(1)
  row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
  row
  }

  override def deserialize(datum: Any): A = {
datum match {
  case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
}
  }
}

object AUDT extends AUDT

@SQLUserDefinedType(udt = classOf[BUDT])
case class B(num:Int)

class BUDT extends UserDefinedType[B] {
  override def sqlType: DataType = StructType(Seq(StructField("num", 
IntegerType, nullable = false)))
  override def userClass: Class[B] = classOf[B]
  override def serialize(obj: Any): Any = obj match {
case B(num) =>
  val row = new GenericMutableRow(1)
  row.setInt(0, num)
  row
  }

  override def deserialize(datum: Any): B = {
datum match {
  case row: InternalRow => new B(row.getInt(0))
}
  }
}

object BUDT extends BUDT

object TestNested {
  def main(args:Array[String]) = {
    val col = Seq(new A(Seq(new B(1), new B(2))),
      new A(Seq(new B(3), new B(4))))

val sc = new SparkContext(new 
SparkConf().setMaster("local[1]").setAppName("TestSpark"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val df = sc.parallelize(1 to 2 zip col).toDF()

df.show()

df.write.mode(SaveMode.Overwrite).save(...)


  }
}
{code}

The error log is shown below:

{noformat}
15/09/16 16:44:36 WARN : Your hostname, X resolves to a loopback/non-reachable 
address: fe80:0:0:0:c4c7:8c4b:4a24:f8a1%14, but we couldn't find any external 
IP address!
15/09/16 16:44:38 INFO deprecation: mapred.job.id is deprecated. Instead, use 
mapreduce.job.id
15/09/16 16:44:38 INFO deprecation: mapred.tip.id is deprecated. Instead, use 
mapreduce.task.id
15/09/16 16:44:38 INFO deprecation: mapred.task.id is deprecated. Instead, use 
mapreduce.task.attempt.id
15/09/16 16:44:38 INFO deprecation: mapred.task.is.map is deprecated. Instead, 
use mapreduce.task.ismap
15/09/16 16:44:38 INFO deprecation: mapred.task.partition is deprecated. 
Instead, use mapreduce.task.partition
15/09/16 16:44:38 INFO ParquetRelation: Using default output committer for 
Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
15/09/16 16:44:38 INFO DefaultWriterContainer: Using user defined output 
committer class org.apache.parquet.hadoop.ParquetOutputCommitter
15/09/16 16:44:38 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 
localhost:50986 in memory (size: 1402.0 B, free: 973.6 MB)
15/09/16 16:44:38 INFO ContextCleaner: Cleaned accumulator 1
15/09/16 16:44:39 INFO SparkContext: Starting job: save at TestNested.scala:73
15/09/16 16:44:39 INFO DAGScheduler: Got job 1 (save at TestNested.scala:73) 
with 1 output partitions
15/09/16 16:44:39 INFO DAGScheduler: Final stage: ResultStage 1(save at 
TestNested.scala:73)
15/09/16 16:44:39 INFO DAGScheduler: Parents of final stage: List()
15/09/16 16:44:39 INFO DAGScheduler: Missing parents: List()
15/09/16 16:44:39 INFO DAGScheduler: Submitting ResultStage 1 
(MapPartitionsRDD[1] at rddToDataFrameHolder at TestNested.scala:69), which has 
no missing parents
15/09/16 16:44:39 INFO MemoryStore: ensureFreeSpace(59832) called with 
curMem=0, maxMem=1020914565
15/09/16 16:44:39 INFO MemoryStore: Block broadcast_1 stored as values in 
memory (estimated size 58.4 KB, free 973.6 MB)
15/09/16 16:44:39 INFO MemoryStore: ensureFreeSpace(20794) called with 
curMem=59832, maxMem=1020914565
15/09/16 16:44:39 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in 
memory (estimated size 20.3 KB, free 973.5 MB)
15/09/16 16:44:39 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
localhost:50986 (size: 20.3 KB, free: 973.6 MB)
15/09/16 16:44:39 INFO SparkContext: Created broadcast 1 from broadcast at 

[jira] [Updated] (SPARK-10637) DataFrames: saving with nested User Data Types

2015-09-16 Thread Joao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-10637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joao updated SPARK-10637:
-
Description: 
Cannot save data frames using a nested UserDefinedType.
I wrote a simple example to show the error.

It causes the following error:

{noformat}
java.lang.IllegalArgumentException: Nested type should be repeated: required group array {
  required int32 num;
}
{noformat}

{code:java}
import org.apache.spark.sql.SaveMode
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[AUDT])
case class A(list:Seq[B])

class AUDT extends UserDefinedType[A] {
  override def sqlType: DataType = StructType(Seq(StructField("list", 
ArrayType(BUDT, containsNull = false), nullable = true)))
  override def userClass: Class[A] = classOf[A]
  override def serialize(obj: Any): Any = obj match {
case A(list) =>
  val row = new GenericMutableRow(1)
  row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
  row
  }

  override def deserialize(datum: Any): A = {
datum match {
  case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
}
  }
}

object AUDT extends AUDT

@SQLUserDefinedType(udt = classOf[BUDT])
case class B(num:Int)

class BUDT extends UserDefinedType[B] {
  override def sqlType: DataType = StructType(Seq(StructField("num", 
IntegerType, nullable = false)))
  override def userClass: Class[B] = classOf[B]
  override def serialize(obj: Any): Any = obj match {
case B(num) =>
  val row = new GenericMutableRow(1)
  row.setInt(0, num)
  row
  }

  override def deserialize(datum: Any): B = {
datum match {
  case row: InternalRow => new B(row.getInt(0))
}
  }
}

object BUDT extends BUDT

object TestNested {
  def main(args:Array[String]) = {
    val col = Seq(new A(Seq(new B(1), new B(2))),
      new A(Seq(new B(3), new B(4))))

val sc = new SparkContext(new 
SparkConf().setMaster("local[1]").setAppName("TestSpark"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val df = sc.parallelize(1 to 2 zip col).toDF()

df.show()

df.write.mode(SaveMode.Overwrite).save(...)


  }
}
{code}

The error log is shown below:

{noformat}
15/09/16 16:44:36 WARN : Your hostname, X resolves to a loopback/non-reachable 
address: fe80:0:0:0:c4c7:8c4b:4a24:f8a1%14, but we couldn't find any external 
IP address!
15/09/16 16:44:38 INFO deprecation: mapred.job.id is deprecated. Instead, use 
mapreduce.job.id
15/09/16 16:44:38 INFO deprecation: mapred.tip.id is deprecated. Instead, use 
mapreduce.task.id
15/09/16 16:44:38 INFO deprecation: mapred.task.id is deprecated. Instead, use 
mapreduce.task.attempt.id
15/09/16 16:44:38 INFO deprecation: mapred.task.is.map is deprecated. Instead, 
use mapreduce.task.ismap
15/09/16 16:44:38 INFO deprecation: mapred.task.partition is deprecated. 
Instead, use mapreduce.task.partition
15/09/16 16:44:38 INFO ParquetRelation: Using default output committer for 
Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
15/09/16 16:44:38 INFO DefaultWriterContainer: Using user defined output 
committer class org.apache.parquet.hadoop.ParquetOutputCommitter
15/09/16 16:44:38 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 
localhost:50986 in memory (size: 1402.0 B, free: 973.6 MB)
15/09/16 16:44:38 INFO ContextCleaner: Cleaned accumulator 1
15/09/16 16:44:39 INFO SparkContext: Starting job: save at TestNested.scala:73
15/09/16 16:44:39 INFO DAGScheduler: Got job 1 (save at TestNested.scala:73) 
with 1 output partitions
15/09/16 16:44:39 INFO DAGScheduler: Final stage: ResultStage 1(save at 
TestNested.scala:73)
15/09/16 16:44:39 INFO DAGScheduler: Parents of final stage: List()
15/09/16 16:44:39 INFO DAGScheduler: Missing parents: List()
15/09/16 16:44:39 INFO DAGScheduler: Submitting ResultStage 1 
(MapPartitionsRDD[1] at rddToDataFrameHolder at TestNested.scala:69), which has 
no missing parents
15/09/16 16:44:39 INFO MemoryStore: ensureFreeSpace(59832) called with 
curMem=0, maxMem=1020914565
15/09/16 16:44:39 INFO MemoryStore: Block broadcast_1 stored as values in 
memory (estimated size 58.4 KB, free 973.6 MB)
15/09/16 16:44:39 INFO MemoryStore: ensureFreeSpace(20794) called with 
curMem=59832, maxMem=1020914565
15/09/16 16:44:39 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in 
memory (estimated size 20.3 KB, free 973.5 MB)
15/09/16 16:44:39 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
localhost:50986 (size: 20.3 KB, free: 973.6 MB)
15/09/16 16:44:39 INFO SparkContext: Created broadcast 1 from broadcast at 
DAGScheduler.scala:861
15/09/16 16:44:39 INFO DAGScheduler: Submitting 1 missing tasks from 
ResultStage 1 (MapPartitionsRDD[1] at rddToDataFrameHolder at 
TestNested.sc

[jira] [Updated] (SPARK-10632) Cannot save DataFrame with User Defined Types

2015-09-16 Thread Joao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-10632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joao updated SPARK-10632:
-
Description: 
Cannot save DataFrames that contain user-defined types.
I tried to save a DataFrame with instances of the Vector class from MLlib and got the error.

The code below should reproduce the error.
{noformat}
val df = sc.parallelize(Seq((1, Vectors.dense(1, 1, 1)),
  (2, Vectors.dense(2, 2, 2)))).toDF()
df.write.format("json").mode(SaveMode.Overwrite).save(path)
{noformat}

The error log is below

{noformat}
15/09/16 09:58:27 ERROR DefaultWriterContainer: Aborting task.
scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class 
org.apache.spark.sql.catalyst.expressions.GenericMutableRow)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:126)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$.apply(JacksonGenerator.scala:133)
at 
org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.writeInternal(JSONRelation.scala:185)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:243)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 
'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6'
 closed. Now beginning upload
15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 
'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6'
 upload complete
15/09/16 09:58:28 ERROR DefaultWriterContainer: Task attempt 
attempt_201509160958__m_00_0 aborted.
15/09/16 09:58:28 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Task failed while writing rows.
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:251)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class 
org.apache.spark.sql.catalyst.expressions.GenericMutableRow)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194)
at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103)
at 
org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfu

[jira] [Commented] (SPARK-10632) Cannot save DataFrame with User Defined Types

2015-09-17 Thread Joao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14804314#comment-14804314
 ] 

Joao commented on SPARK-10632:
--

Thanks Joseph. I tried with Spark 1.5.0 on Windows 7, standalone mode. Do you think it may be a platform issue? I will be out of the office until Tuesday, but I will try it on Linux.

> Cannot save DataFrame with User Defined Types
> -
>
> Key: SPARK-10632
> URL: https://issues.apache.org/jira/browse/SPARK-10632
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.0
>Reporter: Joao
>
> Cannot save DataFrames that contain user-defined types.
> I tried to save a dataframe with instances of the Vector class from mlib and 
> got the error.
> The code below should reproduce the error.
> {noformat}
> val df = sc.parallelize(Seq((1, Vectors.dense(1, 1, 1)),
> (2, Vectors.dense(2, 2, 2)))).toDF()
> df.write.format("json").mode(SaveMode.Overwrite).save(path)
> {noformat}
> The error log is below
> {noformat}
> 15/09/16 09:58:27 ERROR DefaultWriterContainer: Aborting task.
> scala.MatchError: [1,null,null,[1.0,1.0,1.0]] (of class 
> org.apache.spark.sql.catalyst.expressions.GenericMutableRow)
>   at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:194)
>   at org.apache.spark.mllib.linalg.VectorUDT.serialize(Vectors.scala:179)
>   at 
> org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:103)
>   at 
> org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89)
>   at 
> org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:126)
>   at 
> org.apache.spark.sql.execution.datasources.json.JacksonGenerator$$anonfun$org$apache$spark$sql$execution$datasources$json$JacksonGenerator$$valWriter$2$1.apply(JacksonGenerator.scala:89)
>   at 
> org.apache.spark.sql.execution.datasources.json.JacksonGenerator$.apply(JacksonGenerator.scala:133)
>   at 
> org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.writeInternal(JSONRelation.scala:185)
>   at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:243)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> 15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 
> 'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6'
>  closed. Now beginning upload
> 15/09/16 09:58:27 INFO NativeS3FileSystem: OutputStream for key 
> 'adad/_temporary/0/_temporary/attempt_201509160958__m_00_0/part-r-0-2a262ed4-be5a-4190-92a1-a5326cc76ed6'
>  upload complete
> 15/09/16 09:58:28 ERROR DefaultWriterContainer: Task attempt 
> attempt_201509160958__m_00_0 aborted.
> 15/09/16 09:58:28 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> org.apache.spark.SparkException: Task failed while writing rows.
>   at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:251)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(Th