[ https://issues.apache.org/jira/browse/SPARK-10735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14907011#comment-14907011 ]
Josh Rosen commented on SPARK-10735:
------------------------------------

I would _not_ recommend using UserDefinedType. UDT was intended to remain a private API for Spark's internal use (specifically, it was created for use by MLlib). There are multiple problems with the current UDT API which make it difficult to use in a stable way from non-Spark code (for example, it implicitly exposes some details of Catalyst's internal representation of data types, which are subject to change across releases).

I'm not sure that your use case is something that Spark SQL explicitly intended to support; rather, I think it happened to work by accident. The problem with allowing columns of arbitrary data types to be carried around as part of rows is that we don't know how to efficiently encode those columns into Tungsten's binary row format, and we don't know how to perform efficient equality comparison or hashing on those data types.

There are two short-term fixes which might re-enable this implicit functionality for Spark 1.5.x:
- Modify CatalystTypeConverters to be permissive and allow arbitrary objects only when Tungsten mode is disabled.
- Create a new internal DataType to represent arbitrary Java objects that Spark SQL doesn't understand, and update the schema inference code to handle unknown data types using this type. This would allow Spark SQL to automatically fall back to the non-Tungsten code paths when processing rows which contain arbitrary objects.

How important is this feature / use case to you? I'd like to triage it to see how high a priority it is to fix for 1.5.2. Also, note that in the longer term we are thinking about some new APIs which may handle this use case in a more principled, explicitly supported way.
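To make the first paragraph's concern concrete, here is a rough, hedged sketch of what registering a UDT for the reporter's Beacon class might look like on Spark 1.5 (the fields {{id}} and {{rssi}} are invented for illustration, and this is exactly the pattern being discouraged). Note how serialize/deserialize have to traffic in Catalyst-internal types such as InternalRow and UTF8String, i.e. the internal representation that can change between releases.

{code}
// Sketch only (assumed field names); requires Spark 1.5.x on the classpath.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@SQLUserDefinedType(udt = classOf[BeaconUDT])
class Beacon(val id: String, val rssi: Int) extends Serializable

class BeaconUDT extends UserDefinedType[Beacon] {
  // The UDT must describe its storage in terms of Catalyst data types...
  override def sqlType: DataType =
    StructType(Seq(StructField("id", StringType), StructField("rssi", IntegerType)))

  // ...and convert to/from Catalyst's *internal* representation (InternalRow,
  // UTF8String), which is the coupling to internals being warned about above.
  override def serialize(obj: Any): Any = obj match {
    case b: Beacon => InternalRow(UTF8String.fromString(b.id), b.rssi)
  }

  override def deserialize(datum: Any): Beacon = datum match {
    case row: InternalRow => new Beacon(row.getUTF8String(0).toString, row.getInt(1))
  }

  override def userClass: Class[Beacon] = classOf[Beacon]
}
{code}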
> CatalystTypeConverters MatchError converting RDD with custom object to
> dataframe
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-10735
>                 URL: https://issues.apache.org/jira/browse/SPARK-10735
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.0
>            Reporter: Thomas Graves
>            Priority: Critical
>
> In Spark 1.5.0 we are now seeing an exception when converting an RDD with a custom object to a DataFrame. Note that this works with Spark 1.4.1.
> We have an RDD<BasicData>, where the BasicData class has a field ArrayList<Beacon> and Beacon is a user-defined class. Converting the RDD<BasicData> to a DataFrame now causes the issue:
> {code}
> 15/09/21 18:53:16 ERROR executor.Executor: Managed memory leak detected; size = 2097152 bytes, TID = 408
> 15/09/21 18:53:16 ERROR executor.Executor: Exception in task 0.0 in stage 4.0 (TID 408)
> scala.MatchError: foo.Beacon@5c289b39 (of class foo.Beacon)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:245)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:164)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:148)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
>         at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(SQLContext.scala:494)
>         at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(SQLContext.scala:494)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>         at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext.scala:494)
>         at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext.scala:492)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:372)
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>         at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
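The report does not include the reporter's code, but a minimal Scala sketch consistent with the stack trace above might look like the following (column and field names are invented; a Scala Seq stands in for the reported ArrayList<Beacon>). The declared schema says the beacons column holds an array of structs while the rows carry raw Beacon objects, so on 1.5.0 the first aggregation that forces row conversion reportedly hits the MatchError inside CatalystTypeConverters$StructConverter, whereas the more permissive converters in 1.4.1 let it through by accident.

{code}
// Hedged reproduction sketch for SPARK-10735 (assumed names; not the reporter's code).
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._

// Stand-in for the reporter's user-defined class (no UDT registered).
class Beacon(val id: String, val rssi: Int) extends Serializable

object Spark10735Repro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SPARK-10735").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    // The schema declares the array elements as structs...
    val schema = StructType(Seq(
      StructField("user", StringType),
      StructField("beacons", ArrayType(StructType(Seq(
        StructField("id", StringType),
        StructField("rssi", IntegerType)))))))

    // ...but the rows carry raw Beacon objects in that position.
    val rows = sc.parallelize(Seq(
      Row("u1", Seq(new Beacon("b1", -60), new Beacon("b2", -71)))))

    val df = sqlContext.createDataFrame(rows, schema)

    // Row-to-Catalyst conversion happens lazily in the executors; on 1.5.0 this
    // aggregation fails with scala.MatchError (of class Beacon) inside
    // CatalystTypeConverters$StructConverter, matching the trace above.
    df.groupBy("user").count().show()

    sc.stop()
  }
}
{code}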