[jira] [Commented] (SPARK-27485) Certain query plans fail to run when autoBroadcastJoinThreshold is set to -1
[ https://issues.apache.org/jira/browse/SPARK-27485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16821869#comment-16821869 ]

Muthu Jayakumar commented on SPARK-27485:
-----------------------------------------

Let me try to build a SQL expression for this. What I currently have is a query with a large join. I will try to put together some sizable test data and a SQL query that reproduce the problem.

> Certain query plans fail to run when autoBroadcastJoinThreshold is set to -1
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-27485
>                 URL: https://issues.apache.org/jira/browse/SPARK-27485
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 2.4.0
>            Reporter: Muthu Jayakumar
>            Priority: Minor
[jira] [Created] (SPARK-27485) Certain query plans fail to run when autoBroadcastJoinThreshold is set to -1
Muthu Jayakumar created SPARK-27485:
---------------------------------------

             Summary: Certain query plans fail to run when autoBroadcastJoinThreshold is set to -1
                 Key: SPARK-27485
                 URL: https://issues.apache.org/jira/browse/SPARK-27485
             Project: Spark
          Issue Type: Bug
          Components: Optimizer, SQL
    Affects Versions: 2.4.0
            Reporter: Muthu Jayakumar

Certain queries fail with
{noformat}
java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:349)
	at scala.None$.get(Option.scala:347)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$reorder$1(EnsureRequirements.scala:238)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$reorder$1$adapted(EnsureRequirements.scala:233)
	at scala.collection.immutable.List.foreach(List.scala:388)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.reorder(EnsureRequirements.scala:233)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.reorderJoinKeys(EnsureRequirements.scala:262)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$reorderJoinPredicates(EnsureRequirements.scala:289)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:304)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:296)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$4(TreeNode.scala:282)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:282)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:296)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:38)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$prepareForExecution$1(QueryExecution.scala:87)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:122)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:118)
	at scala.collection.immutable.List.foldLeft(List.scala:85)
{noformat}
I don't have an exact query reproducer for this. But I can try to frame one if this problem hasn't been reported in the past?

--
This message was sent by Atlassian
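Since the failure is a bare `None.get`, the underlying pattern is easy to show outside Spark. The sketch below is illustration only, with hypothetical names; it mirrors the unsafe call-`.get`-on-an-`Option` pattern that surfaces in the stack trace above, not Spark's actual planner code.

```scala
// Illustration of the Option pattern behind "java.util.NoSuchElementException:
// None.get". All names here are hypothetical, not Spark's.
object OptionGetDemo {
  // Unsafe: assumes every join key is present in the expected ordering.
  // If the lookup misses, .get on None throws NoSuchElementException.
  def keyPositionUnsafe(keyOrder: Map[String, Int], key: String): Int =
    keyOrder.get(key).get

  // Safe alternative: surface the absence so the caller can fall back.
  def keyPosition(keyOrder: Map[String, Int], key: String): Option[Int] =
    keyOrder.get(key)
}
```

The safe variant lets the caller decide what a missing key means (e.g. skip the reordering optimization) instead of failing the whole query plan.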
[jira] [Comment Edited] (SPARK-18058) AnalysisException may be thrown when union two DFs whose struct fields have different nullability
[ https://issues.apache.org/jira/browse/SPARK-18058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15596978#comment-15596978 ]

Muthu Jayakumar edited comment on SPARK-18058 at 10/22/16 2:40 AM:
-------------------------------------------------------------------

Wanted to share the test I used while getting this error from Spark 2.0.0.

Schema from parquet file:
{code}
scala> d1.printSchema()
root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- freq: array (nullable = true)
 |    |    |-- element: long (containsNull = true)

scala> d2.printSchema() //Data created using dataframe and/or processed before writing to parquet file.
root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = false)
 |    |-- freq: array (nullable = true)
 |    |    |-- element: long (containsNull = false)

scala> d1.union(d2).printSchema()
Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Union;
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
	at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
	at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
	at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)
{code}

But the same union-like operation works when I use the following schema:
{code}
scala> df1.printSchema()
root
 |-- col1: integer (nullable = false)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = false)
 |-- col4: string (nullable = false)

scala> df2.printSchema()
root
 |-- col1: integer (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = true)
 |-- col4: string (nullable = true)

scala> df1.union(df2).printSchema() //this one worked.
root
 |-- col1: integer (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = true)
 |-- col4: string (nullable = true)
{code}
[jira] [Commented] (SPARK-18058) AnalysisException may be thrown when union two DFs whose struct fields have different nullability
[ https://issues.apache.org/jira/browse/SPARK-18058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15596978#comment-15596978 ]

Muthu Jayakumar commented on SPARK-18058:
-----------------------------------------

Wanted to share the test I used while getting this error from Spark 2.0.0.

Schema from parquet file:
{code}
d1.printSchema()
root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- freq: array (nullable = true)
 |    |    |-- element: long (containsNull = true)

d2.printSchema() //Data created using dataframe and/or processed before writing to parquet file.
root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = false)
 |    |-- freq: array (nullable = true)
 |    |    |-- element: long (containsNull = false)

d1.union(d2).printSchema()
Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Union;
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
	at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
	at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
	at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)
{code}

> AnalysisException may be thrown when union two DFs whose struct fields have
> different nullability
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-18058
>                 URL: https://issues.apache.org/jira/browse/SPARK-18058
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.2, 2.0.1
>            Reporter: Cheng Lian
>
> The following Spark shell snippet reproduces this issue:
> {code}
> spark.range(10).createOrReplaceTempView("t1")
> spark.range(10).map(i => i: java.lang.Long).toDF("id").createOrReplaceTempView("t2")
> sql("SELECT struct(id) FROM t1 UNION ALL SELECT struct(id) FROM t2")
> {code}
> {noformat}
> org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. StructType(StructField(id,LongType,true)) <> StructType(StructField(id,LongType,false)) at the first column of the second table;
> 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
> 	at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
> 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$11$$anonfun$apply$12.apply(CheckAnalysis.scala:291)
> 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$11$$anonfun$apply$12.apply(CheckAnalysis.scala:289)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$11.apply(CheckAnalysis.scala:289)
> 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$11.apply(CheckAnalysis.scala:278)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:278)
> 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:132)
> 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
> 	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
> 	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
> 	at org.apache.spark.sql.Dataset$.ofRows(Dataset
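The root cause above can be modeled without Spark: a struct field's type carries its nullability, so strict type equality rejects the union even though every non-null value on one side is valid on the other. A minimal sketch using simplified stand-ins (not Spark's `StructType`/`StructField`) of strict comparison versus nullability-relaxing unification:

```scala
// Simplified stand-in for a struct field; not Spark's StructField.
case class Field(name: String, dataType: String, nullable: Boolean)

object UnionCompat {
  // Strict equality, as in the failing check: a nullability difference alone
  // is enough to reject the union.
  def strictlyCompatible(a: Seq[Field], b: Seq[Field]): Boolean = a == b

  // Relaxed unification: same names and types may differ in nullability;
  // the merged field is nullable if either side is.
  def merge(a: Seq[Field], b: Seq[Field]): Option[Seq[Field]] =
    if (a.length == b.length &&
        a.zip(b).forall { case (x, y) => x.name == y.name && x.dataType == y.dataType })
      Some(a.zip(b).map { case (x, y) => x.copy(nullable = x.nullable || y.nullable) })
    else None
}
```

The relaxed rule matches the schema the reporter observed for the working case: the unioned result reports `nullable = true` wherever either input did.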
[jira] [Commented] (SPARK-12783) Dataset map serialization error
[ https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15114340#comment-15114340 ]

Muthu Jayakumar commented on SPARK-12783:
-----------------------------------------

I moved it to another file altogether (as attached). I have another file that has the main thread, as shown below.

{code}
object SparkJira extends App {
  val sc = //get sc.
  private val sqlContext: SQLContext = sc._2.sqlContext
  import sqlContext.implicits._

  val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF()
  df1.as[TestCaseClass].map(_.toStr).show()   //works fine
  df1.as[TestCaseClass].map(_.toMyMap).show() //error
}
{code}

> Dataset map serialization error
> -------------------------------
>
>                 Key: SPARK-12783
>                 URL: https://issues.apache.org/jira/browse/SPARK-12783
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Muthu Jayakumar
>            Assignee: Wenchen Fan
>            Priority: Critical
>         Attachments: MyMap.scala
>
> When Dataset API is used to map to another case class, an error is thrown.
> {code}
> case class MyMap(map: Map[String, String])
> case class TestCaseClass(a: String, b: String){
>   def toMyMap: MyMap = {
>     MyMap(Map(a->b))
>   }
>   def toStr: String = {
>     a
>   }
> }
>
> //Main method section below
> import sqlContext.implicits._
> val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF()
> df1.as[TestCaseClass].map(_.toStr).show()   //works fine
> df1.as[TestCaseClass].map(_.toMyMap).show() //fails
> {code}
> Error message:
> {quote}
> Caused by: java.io.NotSerializableException: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1
> Serialization stack:
> 	- object not serializable (class: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: package lang)
> 	- field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
> 	- object (class scala.reflect.internal.Types$UniqueThisType, java.lang.type)
> 	- field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
> 	- object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String)
> 	- field (class: scala.reflect.internal.Types$TypeRef, name: normalized, type: class scala.reflect.internal.Types$Type)
> 	- object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String)
> 	- field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, type: class scala.reflect.api.Types$TypeApi)
> 	- object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, )
> 	- field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
> 	- object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType))
> 	- field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
> 	- object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)))
> 	- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
> 	- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@4c7e3aab)
> 	- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
> 	- object (class scala.collection.immutable.$colon$colon, List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)), invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;
> 	- field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, type: interface scala.coll
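The serialization stack above boils down to a function object (the encoder's `MapObjects` expression) capturing a scala-reflect symbol that is not `java.io.Serializable`. The same mechanics can be shown with plain Java serialization; the class and helper names below are illustrative stand-ins, not Spark's:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object CaptureDemo {
  // Stand-in for the non-serializable scala-reflect object in the stack trace.
  class ReflectionHandle

  // This closure captures the non-serializable handle, so Java serialization
  // of the closure fails with NotSerializableException.
  def capturing(h: ReflectionHandle): String => String = s => s + h.hashCode

  // This closure captures only a String, which is serializable.
  def clean(prefix: String): String => String = s => prefix + s

  def javaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream)
      out.writeObject(obj); out.close(); true
    } catch { case _: NotSerializableException => false }
}
```

In both cases the closure itself is serializable; what decides success is whether everything it transitively references is, which is why the fix on the Spark side was to stop capturing reflection objects in the generated expressions.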
[jira] [Comment Edited] (SPARK-12783) Dataset map serialization error
[ https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15114340#comment-15114340 ]

Muthu Jayakumar edited comment on SPARK-12783 at 1/24/16 3:12 PM:
------------------------------------------------------------------

I moved it to another file altogether (as attached). I have another file that has the main thread, as shown below.

{code}
object SparkJira extends App {
  val sc = //get sc.
  private val sqlContext: SQLContext = sc._2.sqlContext
  import sqlContext.implicits._

  val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF()
  df1.as[TestCaseClass].map(_.toStr).show()   //works fine
  df1.as[TestCaseClass].map(_.toMyMap).show() //error
}
{code}

I am using the 1.6 release version for testing. Would you want me to try some other version?

> Dataset map serialization error
> -------------------------------
>
>                 Key: SPARK-12783
>                 URL: https://issues.apache.org/jira/browse/SPARK-12783
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Muthu Jayakumar
>            Assignee: Wenchen Fan
>            Priority: Critical
>         Attachments: MyMap.scala
[jira] [Updated] (SPARK-12783) Dataset map serialization error
[ https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Muthu Jayakumar updated SPARK-12783:
------------------------------------
    Attachment: MyMap.scala

> Dataset map serialization error
> -------------------------------
>
>                 Key: SPARK-12783
>                 URL: https://issues.apache.org/jira/browse/SPARK-12783
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Muthu Jayakumar
>            Assignee: Wenchen Fan
>            Priority: Critical
>         Attachments: MyMap.scala
[jira] [Commented] (SPARK-12783) Dataset map serialization error
[ https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102482#comment-15102482 ]

Muthu Jayakumar commented on SPARK-12783:
-

Hello Kevin,

Here is what I am seeing from the shell:
{code}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60)
Type in expressions to have them evaluated.
Type :help for more information.

scala> case class MyMap(map: Map[String, String])
defined class MyMap

scala> :paste
// Entering paste mode (ctrl-D to finish)

case class TestCaseClass(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(Map(a->b))
  }
  def toStr: String = {
    a
  }
}

// Exiting paste mode, now interpreting.

defined class TestCaseClass

scala> TestCaseClass("a", "nn")
res4: TestCaseClass = TestCaseClass(a,nn)

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF()
org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `TestCaseClass` without access to the scope that this class was defined in. Try moving this class out of its parent class.;
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$2.applyOrElse(ExpressionEncoder.scala:264)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$2.applyOrElse(ExpressionEncoder.scala:260)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:242)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:233)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolve(ExpressionEncoder.scala:260)
  at org.apache.spark.sql.Dataset.(Dataset.scala:78)
  at org.apache.spark.sql.Dataset.(Dataset.scala:89)
  at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:507)
  ... 52 elided
{code}
I do remember seeing the above error stack when the case class was defined inside the scope of an object (for example, if defined inside MyApp as in the example below, since it then becomes an inner class).

From code, I added an explicit import and eventually changed to use fully qualified class names, like below...
{code}
import scala.collection.{Map => ImMap}

case class MyMap(map: ImMap[String, String])

case class TestCaseClass(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(ImMap(a->b))
  }
  def toStr: String = {
    a
  }
}

object MyApp extends App {
  //Get handle to contexts...
  import sqlContext.implicits._
  val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF()
  df1.as[TestCaseClass].map(_.toStr).show() //works fine
  df1.as[TestCaseClass].map(_.toMyMap).show() //error
}
{code}
and
{code}
case class MyMap(map: scala.collection.Map[String, String])

case class TestCaseClass(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(scala.collection.Map(a->b))
  }
  def toStr: String = {
    a
  }
}

object MyApp extends App {
  //Get handle to contexts...
  import sqlContext.implicits._
  val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF()
  df1.as[TestCaseClass].map(_.toStr).show() //works fine
  df1.as[TestCaseClass].map(_.toMyMap).show() //error
}
{code}
Please advise on what I may be missing. I misread the earlier comment and tried to use an immutable map incorrectly :(.

> Dataset map serialization error
> ---
>
> Key: SPARK-12783
> URL: https://issues.apache.org/jira/browse/SPARK-12783
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.0
> Reporter: Muthu Jayakumar
> Assignee: Wenchen Fan
> Priority: Critical
>
> When Dataset API is used to map to another case class, an error is thrown.
> {code}
> case class MyMap(map: Map[String, String])
> case class TestCaseClass(a: String, b: String){
>   def toMyMap: MyMap = {
>     MyMap(Map(a->b))
>   }
>   def toStr: String = {
>     a
>   }
> }
> //Main method section below
> import sqlContext.implicits._
> val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF()
> df1.as[TestCaseClass].map(_.toStr).show() //works fine
> df1.as[TestCaseClass].map(_.toMyMap).show() //fails
> {code}
> Error message:
> {quote}
> Caus
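For reference, the conversion logic itself is fine outside Spark. The following is a Spark-free sketch (my own, not from the report) showing that the top-level case classes and the `toMyMap`/`toStr` transformations run correctly on a plain `Seq`; the errors above only appear once Spark's `ExpressionEncoder` has to serialize the generated expression tree for `df1.as[TestCaseClass].map(...)`.

```scala
// Spark-free sketch: the same case classes, defined at the top level.
case class MyMap(map: Map[String, String])

case class TestCaseClass(a: String, b: String) {
  def toMyMap: MyMap = MyMap(Map(a -> b))
  def toStr: String = a
}

val rows = Seq(TestCaseClass("2015-05-01", "data1"),
               TestCaseClass("2015-05-01", "data2"))

// Both transformations succeed on an ordinary collection; the //error
// in the shell session is specific to Spark's encoder serialization.
println(rows.map(_.toStr))   // List(2015-05-01, 2015-05-01)
println(rows.map(_.toMyMap))
```

This isolates the bug report's claim: the failure is not in the user code but in how the 1.6.0 encoder machinery handles the `Map`-typed field.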
[jira] [Comment Edited] (SPARK-12783) Dataset map serialization error
[ https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102351#comment-15102351 ]

Muthu Jayakumar edited comment on SPARK-12783 at 1/15/16 9:09 PM:
--

I tried the following, but got similar error...
{code}
case class MyMap(map: scala.collection.immutable.Map[String, String])

case class TestCaseClass(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(Map(a->b))
  }
  def toStr: String = {
    a
  }
}

//main thread...
val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF()
df1.as[TestCaseClass].map(_.toStr).show() //works fine
df1.as[TestCaseClass].map(_.toMyMap).show() //error
df1.as[TestCaseClass].map(each=> each.a -> each.b).show() //works fine
{code}
{quote}
Serialization stack:
- object not serializable (class: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: package lang)
- field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
- object (class scala.reflect.internal.Types$UniqueThisType, java.lang.type)
- field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
- object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String)
- field (class: scala.reflect.internal.Types$TypeRef, name: normalized, type: class scala.reflect.internal.Types$Type)
- object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String)
- field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, type: class scala.reflect.api.Types$TypeApi)
- object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, )
- field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType))
- field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)))
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@2660f093)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)), invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;
- field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true))
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@72af5ac7)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object
[jira] [Commented] (SPARK-12783) Dataset map serialization error
[ https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102351#comment-15102351 ]

Muthu Jayakumar commented on SPARK-12783:
-

I tried the following, but got similar error...
{code}
case class MyMap(map: scala.collection.immutable.Map[String, String])

case class TestCaseClass(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(Map(a->b))
  }
  def toStr: String = {
    a
  }
}

//main thread...
val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF() //.withColumn("swh_date_to_common_request_id_map", f1(col("_1"), col("_2"))).drop("_1").drop("_2")
df1.as[TestCaseClass].map(_.toStr).show() //works fine
df1.as[TestCaseClass].map(_.toMyMap).show() //error
df1.as[TestCaseClass].map(each=> each.a -> each.b).show() //works fine
{code}

> Dataset map serialization error
> ---
>
> Key: SPARK-12783
> URL: https://issues.apache.org/jira/browse/SPARK-12783
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.0
> Reporter: Muthu Jayakumar
> Assignee: Wenchen Fan
> Priority: Critical
>
> When Dataset API is used to map to another case class, an error is thrown.
> {code}
> case class MyMap(map: Map[String, String])
> case class TestCaseClass(a: String, b: String){
>   def toMyMap: MyMap = {
>     MyMap(Map(a->b))
>   }
>   def toStr: String = {
>     a
>   }
> }
> //Main method section below
> import sqlContext.implicits._
> val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF()
> df1.as[TestCaseClass].map(_.toStr).show() //works fine
> df1.as[TestCaseClass].map(_.toMyMap).show() //fails
> {code}
> Error message:
> {quote}
> Caused by: java.io.NotSerializableException: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1
> Serialization stack:
> - object not serializable (class: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: package lang)
> - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
> - object (class scala.reflect.internal.Types$UniqueThisType, java.lang.type)
> - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
> - object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String)
> - field (class: scala.reflect.internal.Types$TypeRef, name: normalized, type: class scala.reflect.internal.Types$Type)
> - object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String)
> - field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, type: class scala.reflect.api.Types$TypeApi)
> - object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, )
> - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
> - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType))
> - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
> - object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)))
> - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@4c7e3aab)
> - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.$colon$colon, List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)), invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljav
[jira] [Created] (SPARK-12787) Dataset to support custom encoder
Muthu Jayakumar created SPARK-12787:
---

Summary: Dataset to support custom encoder
Key: SPARK-12787
URL: https://issues.apache.org/jira/browse/SPARK-12787
Project: Spark
Issue Type: New Feature
Affects Versions: 1.6.0
Reporter: Muthu Jayakumar

The current Dataset API allows data to be loaded using a case class whose attribute names and types must match up precisely. It would be nicer if a partial function could be provided as a parameter to transform the DataFrame-like schema into a Dataset. Something like...

test_dataframe.as[TestCaseClass](partial_function)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
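To make the request concrete, here is a hypothetical illustration: no `as[T](partialFunction)` overload exists in Spark's API, and the `rename` function and `col_a`/`col_b` column names below are invented for the sketch. It only shows, in plain Scala, the kind of `PartialFunction` the reporter is proposing: one that adapts mismatched schema field names onto the case class's attribute names before the conversion.

```scala
// The target case class whose attribute names do NOT match the source schema.
case class TestCaseClass(a: String, b: String)

// A DataFrame-like row keyed by column name (hypothetical input).
val row: Map[String, Any] = Map("col_a" -> "2015-05-01", "col_b" -> "data1")

// The proposed partial function: rename mismatched columns onto the
// case class's attribute names; anything it is not defined for is dropped.
val rename: PartialFunction[(String, Any), (String, Any)] = {
  case ("col_a", v) => ("a", v)
  case ("col_b", v) => ("b", v)
}

val adapted = row.collect(rename)
val tc = TestCaseClass(adapted("a").toString, adapted("b").toString)
println(tc)  // TestCaseClass(2015-05-01,data1)
```

Under this proposal, the equivalent of `adapted` would be produced inside `as[T]`, sparing users from having to rename columns on the DataFrame first.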
[jira] [Created] (SPARK-12783) Dataset map
Muthu Jayakumar created SPARK-12783:
---

Summary: Dataset map
Key: SPARK-12783
URL: https://issues.apache.org/jira/browse/SPARK-12783
Project: Spark
Issue Type: Bug
Affects Versions: 1.6.0
Reporter: Muthu Jayakumar

When Dataset API is used to map to another case class, an error is thrown.
{code}
case class MyMap(map: Map[String, String])

case class TestCaseClass(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(Map(a->b))
  }
  def toStr: String = {
    a
  }
}

//Main method section below
import sqlContext.implicits._
val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF()
df1.as[TestCaseClass].map(_.toStr).show() //works fine
df1.as[TestCaseClass].map(_.toMyMap).show() //fails
{code}
Error message:
{quote}
Caused by: java.io.NotSerializableException: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1
Serialization stack:
- object not serializable (class: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: package lang)
- field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
- object (class scala.reflect.internal.Types$UniqueThisType, java.lang.type)
- field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
- object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String)
- field (class: scala.reflect.internal.Types$TypeRef, name: normalized, type: class scala.reflect.internal.Types$Type)
- object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String)
- field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, type: class scala.reflect.api.Types$TypeApi)
- object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, )
- field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType))
- field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)))
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@4c7e3aab)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)), invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;
- field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true))
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationP