[ https://issues.apache.org/jira/browse/SPARK-10735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14907011#comment-14907011 ]

Josh Rosen commented on SPARK-10735:
------------------------------------

I would _not_ recommend using UserDefinedType. UDT was intended to remain a 
private API for Spark's internal use (specifically, it was created for use by 
MLlib). There are multiple problems with the current UDT API that make it 
difficult to use in a stable way from non-Spark code (for example, it 
implicitly exposes some details of Catalyst's internal representation of data 
types, which are subject to change across releases).
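
To make the "leaked internals" point concrete, here is a rough sketch of what a 
1.5.x-style UDT ends up looking like. Point and PointUDT are hypothetical names, 
not from this issue; the thing to notice is that serialize/deserialize have to 
traffic in classes from Catalyst's internal packages:

{code}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}

// Hypothetical user class we want to carry in a DataFrame column.
@SQLUserDefinedType(udt = classOf[PointUDT])
class Point(val x: Double, val y: Double) extends Serializable

class PointUDT extends UserDefinedType[Point] {
  // The UDT author picks a SQL storage type for the object...
  override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)

  // ...and converts to/from Catalyst's *internal* representation of that type.
  // ArrayData / GenericArrayData live in an internal package and are not a
  // stable public API, which is exactly the problem described above.
  override def serialize(obj: Any): Any = obj match {
    case p: Point => new GenericArrayData(Array[Any](p.x, p.y))
  }

  override def deserialize(datum: Any): Point = datum match {
    case values: ArrayData => new Point(values.getDouble(0), values.getDouble(1))
  }

  override def userClass: Class[Point] = classOf[Point]
}
{code}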

I'm not sure that your use-case is something Spark SQL explicitly intended to 
support; rather, I think it happened to work by accident. The problem with 
allowing columns of arbitrary data types to be carried around as part of rows 
is that we don't know how to efficiently encode those columns into Tungsten's 
binary row format, nor how to perform efficient equality comparisons or hashing 
on those data types.
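
For concreteness, here is a hypothetical spark-shell sketch of the kind of 
conversion being discussed. It is not the reporter's exact code (the report 
quoted below uses the Java bean API); the Beacon name just mirrors that report, 
and the point is only that the declared schema and the arbitrary objects inside 
the rows don't line up, so the Catalyst converters hit a scala.MatchError at 
execution time:

{code}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// A plain class that Spark SQL has no DataType mapping for.
class Beacon(val id: String) extends Serializable

// The declared schema says "beacons" is an array of structs, but the runtime
// values are raw Beacon objects.
val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("beacons", ArrayType(StructType(Seq(StructField("id", StringType)))))))

val rdd = sc.parallelize(Seq(Row("a", Seq(new Beacon("b1")))))
val df = sqlContext.createDataFrame(rdd, schema)
// On 1.5.0 this should fail with a MatchError on the Beacon instance inside
// CatalystTypeConverters, as in the stack trace quoted below.
df.groupBy("name").count().show()
{code}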

There are two short-term fixes which might be able to re-enable this implicit 
functionality for Spark 1.5.x:

- Modify CatalystTypeConverters to be permissive and allow arbitrary objects 
only when Tungsten mode is disabled.
- Create a new internal DataType to represent arbitrary Java objects that Spark 
SQL doesn't understand, and update the schema inference code to map unknown 
data types to it. This would allow Spark SQL to automatically fall back to the 
non-Tungsten code paths when processing rows which contain arbitrary objects 
(a rough sketch of this idea follows below).
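
Purely as an illustration of the second option, here is a Spark-free sketch of 
the shape of that fallback. None of these names exist in Spark, and this is not 
a proposed patch:

{code}
// Schema inference maps types it understands to concrete SQL types and
// everything else to a catch-all "unknown object" type; a planner could then
// keep queries over such schemas off the Tungsten code path.
sealed trait SqlType
case object IntType extends SqlType
case object StringType extends SqlType
final case class UnknownObjectType(javaClass: Class[_]) extends SqlType

def inferType(value: Any): SqlType = value match {
  case _: Int    => IntType
  case _: String => StringType
  case other     => UnknownObjectType(other.getClass)  // fall back instead of MatchError
}

// Any column of unknown object type disables the Tungsten fast path.
def supportsTungsten(schema: Seq[SqlType]): Boolean =
  !schema.exists(_.isInstanceOf[UnknownObjectType])
{code}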

How important is this feature / use-case to you? I'd like to triage it and 
figure out how high a priority a fix for 1.5.2 should be.

Also, note that in the longer term we are thinking about some new APIs which 
may handle this use-case in a more principled / explicitly-supported way.

> CatalystTypeConverters MatchError converting RDD with custom object to dataframe
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-10735
>                 URL: https://issues.apache.org/jira/browse/SPARK-10735
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.0
>            Reporter: Thomas Graves
>            Priority: Critical
>
> In Spark 1.5.0 we are now seeing an exception when converting an RDD with a 
> custom object to a DataFrame.  Note this works with Spark 1.4.1.
> We have an RDD<BasicData>, where the BasicData class has an ArrayList<Beacon> 
> field and Beacon is a user-defined class. Converting the RDD<BasicData> to a 
> DataFrame now causes the issue:
> {code}
> 15/09/21 18:53:16 ERROR executor.Executor: Managed memory leak detected; size = 2097152 bytes, TID = 408
> 15/09/21 18:53:16 ERROR executor.Executor: Exception in task 0.0 in stage 4.0 (TID 408)
> scala.MatchError: foo.Beacon@5c289b39 (of class foo.Beacon)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:245)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:164)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:148)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
>         at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(SQLContext.scala:494)
>         at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(SQLContext.scala:494)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>         at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext.scala:494)
>         at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext.scala:492)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:372)
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>         at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {code}


