[jira] [Commented] (SPARK-27485) Certain query plans fail to run when autoBroadcastJoinThreshold is set to -1

2019-04-19 Thread Muthu Jayakumar (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16821869#comment-16821869
 ] 

Muthu Jayakumar commented on SPARK-27485:
-

Let me try to build a SQL expression that reproduces this. What I currently 
have is a query with a large number of joins.
I am planning to put together some sizable test data and a SQL query to go with it.
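
In the meantime, a rough sketch of the shape of reproducer I have in mind (the 
`spark` session handle, table sizes, keys, and data are placeholders, not the 
actual failing query):
{code}
// Disable broadcast joins so both sides of every join need a shuffle
// exchange; that is the planning path where EnsureRequirements.reorder
// throws None.get.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

import spark.implicits._
val t1 = (1 to 1000000).map(i => (i, i % 10, s"a$i")).toDF("k1", "k2", "v1")
val t2 = (1 to 1000000).map(i => (i, i % 10, s"b$i")).toDF("k1", "k2", "v2")
val t3 = (1 to 1000000).map(i => (i % 10, i, s"c$i")).toDF("k2", "k1", "v3")

t1.join(t2, Seq("k1", "k2"))
  .join(t3, Seq("k2", "k1")) // join keys listed in a different order on purpose
  .count()
{code}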


[jira] [Created] (SPARK-27485) Certain query plans fail to run when autoBroadcastJoinThreshold is set to -1

2019-04-16 Thread Muthu Jayakumar (JIRA)
Muthu Jayakumar created SPARK-27485:
---

 Summary: Certain query plans fail to run when 
autoBroadcastJoinThreshold is set to -1
 Key: SPARK-27485
 URL: https://issues.apache.org/jira/browse/SPARK-27485
 Project: Spark
  Issue Type: Bug
  Components: Optimizer, SQL
Affects Versions: 2.4.0
Reporter: Muthu Jayakumar


Certain queries fail with
{noformat}
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:349)
at scala.None$.get(Option.scala:347)
at 
org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$reorder$1(EnsureRequirements.scala:238)
at 
org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$reorder$1$adapted(EnsureRequirements.scala:233)
at scala.collection.immutable.List.foreach(List.scala:388)
at 
org.apache.spark.sql.execution.exchange.EnsureRequirements.reorder(EnsureRequirements.scala:233)
at 
org.apache.spark.sql.execution.exchange.EnsureRequirements.reorderJoinKeys(EnsureRequirements.scala:262)
at 
org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$reorderJoinPredicates(EnsureRequirements.scala:289)
at 
org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:304)
at 
org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:296)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$4(TreeNode.scala:282)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:282)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
at 
org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:296)
at 
org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:38)
at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$prepareForExecution$1(QueryExecution.scala:87)
at 
scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:122)
at 
scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:118)
at scala.collection.immutable.List.foldLeft(List.scala:85)
{noformat}

I don't have an exact query that reproduces this. But I can try to frame one if 
this problem hasn't been reported in the past.
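
For reference, a minimal sketch of the setting named in the summary (the config 
key spark.sql.autoBroadcastJoinThreshold is standard Spark SQL; the `spark` 
session handle is assumed):
{code}
// Setting the threshold to -1 turns broadcast joins off entirely, so
// equi-joins plan as shuffle-based joins and hit the affected code path.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
{code}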




[jira] [Comment Edited] (SPARK-18058) AnalysisException may be thrown when union two DFs whose struct fields have different nullability

2016-10-21 Thread Muthu Jayakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15596978#comment-15596978
 ] 

Muthu Jayakumar edited comment on SPARK-18058 at 10/22/16 2:40 AM:
---

I wanted to share the test I used when I hit this error on Spark 2.0.0.

Schema from the parquet file:
{code}
scala> d1.printSchema()
root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- freq: array (nullable = true)
 |    |    |-- element: long (containsNull = true)

scala> d2.printSchema() // Data created using a dataframe and/or processed before writing to the parquet file.
root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = false)
 |    |-- freq: array (nullable = true)
 |    |    |-- element: long (containsNull = false)

scala> d1.union(d2).printSchema()
Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved 
operator 'Union;
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)

{code}

But the same union-like operation works when I use the following schemas:
{code}
scala> df1.printSchema()
root
 |-- col1: integer (nullable = false)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = false)
 |-- col4: string (nullable = false)

scala> df2.printSchema()
root
 |-- col1: integer (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = true)
 |-- col4: string (nullable = true)

scala> df1.union(df2).printSchema() //this one worked.
root
 |-- col1: integer (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = true)
 |-- col4: string (nullable = true)
{code}
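
For completeness, a minimal sketch (assuming Spark 2.0.x; the /tmp path and 
case class names are illustrative) of how such a pair of schemas can arise, 
matching the printSchema output above:
{code}
// Spark infers containsNull = false for in-memory Seq[Double]/Seq[Long],
// while a parquet round-trip comes back with containsNull = true.
case class Histogram(values: Seq[Double], freq: Seq[Long])
case class Task(task_id: String, task_name: String, some_histogram: Histogram)

import spark.implicits._
val d2 = Seq(Task("t1", "n1", Histogram(Seq(1.0), Seq(1L)))).toDF()
d2.write.mode("overwrite").parquet("/tmp/task_hist")
val d1 = spark.read.parquet("/tmp/task_hist")

d1.union(d2) // AnalysisException on 2.0.0: unresolved operator 'Union
{code}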



[jira] [Commented] (SPARK-18058) AnalysisException may be thrown when union two DFs whose struct fields have different nullability

2016-10-21 Thread Muthu Jayakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15596978#comment-15596978
 ] 

Muthu Jayakumar commented on SPARK-18058:
-

I wanted to share the test I used when I hit this error on Spark 2.0.0.

Schema from the parquet file:
{code}
d1.printSchema()
root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- freq: array (nullable = true)
 |    |    |-- element: long (containsNull = true)

d2.printSchema() // Data created using a dataframe and/or processed before writing to the parquet file.
root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = false)
 |    |-- freq: array (nullable = true)
 |    |    |-- element: long (containsNull = false)

d1.union(d2).printSchema()
Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved 
operator 'Union;
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)

{code}

> AnalysisException may be thrown when union two DFs whose struct fields have 
> different nullability
> -
>
> Key: SPARK-18058
> URL: https://issues.apache.org/jira/browse/SPARK-18058
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.2, 2.0.1
>Reporter: Cheng Lian
>
> The following Spark shell snippet reproduces this issue:
> {code}
> spark.range(10).createOrReplaceTempView("t1")
> spark.range(10).map(i => i: 
> java.lang.Long).toDF("id").createOrReplaceTempView("t2")
> sql("SELECT struct(id) FROM t1 UNION ALL SELECT struct(id) FROM t2")
> {code}
> {noformat}
> org.apache.spark.sql.AnalysisException: Union can only be performed on tables 
> with the compatible column types. StructType(StructField(id,LongType,true)) 
> <> StructType(StructField(id,LongType,false)) at the first column of the 
> second table;
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$11$$anonfun$apply$12.apply(CheckAnalysis.scala:291)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$11$$anonfun$apply$12.apply(CheckAnalysis.scala:289)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$11.apply(CheckAnalysis.scala:289)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$11.apply(CheckAnalysis.scala:278)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:278)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:132)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
>   at 
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset

[jira] [Commented] (SPARK-12783) Dataset map serialization error

2016-01-24 Thread Muthu Jayakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15114340#comment-15114340
 ] 

Muthu Jayakumar commented on SPARK-12783:
-

I moved it to another file altogether (as attached).
I have another file with the main thread, as shown below:
{code}
object SparkJira extends App{
  val sc = //get sc.

  private val sqlContext: SQLContext = sc._2.sqlContext

  import sqlContext.implicits._
  val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), 
TestCaseClass("2015-05-01", "data2"))).toDF()

  df1.as[TestCaseClass].map(_.toStr).show() //works fine
  df1.as[TestCaseClass].map(_.toMyMap).show() //error
}

{code} 
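
One workaround sketch I might try, building on the snippet above (assuming 
Spark 1.6's Encoders API; untested against this exact setup): fall back to a 
kryo-based encoder for MyMap instead of the reflection-derived one.
{code}
import org.apache.spark.sql.{Encoder, Encoders}

// The reflection-derived encoder's expression tree drags in non-serializable
// scala-reflect symbols; a kryo binary encoder avoids that machinery.
implicit val myMapEncoder: Encoder[MyMap] = Encoders.kryo[MyMap]

df1.as[TestCaseClass].map(_.toMyMap).show() // values now shown as kryo binary
{code}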


[jira] [Comment Edited] (SPARK-12783) Dataset map serialization error

2016-01-24 Thread Muthu Jayakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15114340#comment-15114340
 ] 

Muthu Jayakumar edited comment on SPARK-12783 at 1/24/16 3:12 PM:
--

I moved it to another file altogether (as attached).
I have another file with the main thread, as shown below:
{code}
object SparkJira extends App{
  val sc = //get sc.

  private val sqlContext: SQLContext = sc._2.sqlContext

  import sqlContext.implicits._
  val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), 
TestCaseClass("2015-05-01", "data2"))).toDF()

  df1.as[TestCaseClass].map(_.toStr).show() //works fine
  df1.as[TestCaseClass].map(_.toMyMap).show() //error
}
{code} 

I am using the 1.6 release version for testing. Would you want me to try some 
other version?




[jira] [Updated] (SPARK-12783) Dataset map serialization error

2016-01-24 Thread Muthu Jayakumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Muthu Jayakumar updated SPARK-12783:

Attachment: MyMap.scala


[jira] [Commented] (SPARK-12783) Dataset map serialization error

2016-01-15 Thread Muthu Jayakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102482#comment-15102482
 ] 

Muthu Jayakumar commented on SPARK-12783:
-

Hello Kevin,

Here is what I am seeing...

From the shell:
{code}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60)
Type in expressions to have them evaluated.
Type :help for more information.

scala> case class MyMap(map: Map[String, String])
defined class MyMap

scala> :paste
// Entering paste mode (ctrl-D to finish)

case class TestCaseClass(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(Map(a->b))
  }

  def toStr: String = {
    a
  }
}

// Exiting paste mode, now interpreting.

defined class TestCaseClass

scala> TestCaseClass("a", "nn")
res4: TestCaseClass = TestCaseClass(a,nn)

scala>   import sqlContext.implicits._
import sqlContext.implicits._

scala> val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", 
"data1"), TestCaseClass("2015-05-01", "data2"))).toDF()
org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner 
class `TestCaseClass` without access to the scope that this class was defined 
in. Try moving this class out of its parent class.;
  at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$2.applyOrElse(ExpressionEncoder.scala:264)
  at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$2.applyOrElse(ExpressionEncoder.scala:260)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
  at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:242)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:233)
  at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolve(ExpressionEncoder.scala:260)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:78)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:89)
  at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:507)
  ... 52 elided
{code}

I do remember seeing the above error stack when the case class was defined 
inside the scope of an object (for example, if defined inside MyApp as in the 
example below, since it then becomes an inner class).
From code, I added an explicit import and eventually changed to use fully 
qualified class names, like below...

{code}
import scala.collection.{Map => ImMap}

case class MyMap(map: ImMap[String, String])

case class TestCaseClass(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(ImMap(a->b))
  }

  def toStr: String = {
    a
  }
}

object MyApp extends App { 
 //Get handle to contexts...
 import sqlContext.implicits._
  val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), 
TestCaseClass("2015-05-01", "data2"))).toDF()
  df1.as[TestCaseClass].map(_.toStr).show() //works fine
  df1.as[TestCaseClass].map(_.toMyMap).show() //error
}

{code}

and

{code}
case class MyMap(map: scala.collection.Map[String, String])

case class TestCaseClass(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(scala.collection.Map(a->b))
  }

  def toStr: String = {
    a
  }
}

object MyApp extends App { 
 //Get handle to contexts...
 import sqlContext.implicits._
  val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), 
TestCaseClass("2015-05-01", "data2"))).toDF()
  df1.as[TestCaseClass].map(_.toStr).show() //works fine
  df1.as[TestCaseClass].map(_.toMyMap).show() //error
}

{code}

Please advise on what I may be missing. I misread the earlier comment and tried 
to use the immutable map incorrectly :(.
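
A quick probe I could run, sketched under the assumption that the case classes 
above are defined at the top level: derive the encoder directly, so a 
Map-related failure would show up at definition time rather than deep inside 
job serialization.
{code}
import org.apache.spark.sql.Encoders

// Encoders.product derives the same reflection-based encoder that
// as[TestCaseClass]/map would use behind the scenes.
val myMapEncoder = Encoders.product[MyMap]
println(myMapEncoder.schema)
{code}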


[jira] [Comment Edited] (SPARK-12783) Dataset map serialization error

2016-01-15 Thread Muthu Jayakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102351#comment-15102351
 ] 

Muthu Jayakumar edited comment on SPARK-12783 at 1/15/16 9:09 PM:
--

I tried the following, but got a similar error...

{code}
case class MyMap(map: scala.collection.immutable.Map[String, String])

case class TestCaseClass(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(Map(a->b))
  }

  def toStr: String = {
    a
  }
}

//main thread...
val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), 
TestCaseClass("2015-05-01", "data2"))).toDF() 
  df1.as[TestCaseClass].map(_.toStr).show() //works fine
  df1.as[TestCaseClass].map(_.toMyMap).show() //error
  df1.as[TestCaseClass].map(each=> each.a -> each.b).show() //works fine
{code}

{quote}
Serialization stack:
- object not serializable (class: 
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: 
package lang)
- field (class: scala.reflect.internal.Types$ThisType, name: sym, type: 
class scala.reflect.internal.Symbols$Symbol)
- object (class scala.reflect.internal.Types$UniqueThisType, 
java.lang.type)
- field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: 
class scala.reflect.internal.Types$Type)
- object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String)
- field (class: scala.reflect.internal.Types$TypeRef, name: normalized, 
type: class scala.reflect.internal.Types$Type)
- object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String)
- field (class: 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, 
type: class scala.reflect.api.Types$TypeApi)
- object (class 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, )
- field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, 
name: function, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.MapObjects, 
mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- 
field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType))
- field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: 
targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Invoke, 
invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
 field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
 [Ljava.lang.Object;)))
- writeObject data (class: 
scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, 
scala.collection.immutable.List$SerializationProxy@2660f093)
- writeReplace data (class: 
scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, 
List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
 field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
 [Ljava.lang.Object;)), 
invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
 field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
 [Ljava.lang.Object;
- field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, 
name: arguments, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, 
staticinvoke(class 
org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface 
scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
 field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
 
[Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
 field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
 [Ljava.lang.Object;)),true))
- writeObject data (class: 
scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, 
scala.collection.immutable.List$SerializationProxy@72af5ac7)
- writeReplace data (class: 
scala.collection.immutable.List$SerializationProxy)
- object

[jira] [Comment Edited] (SPARK-12783) Dataset map serialization error

2016-01-15 Thread Muthu Jayakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102351#comment-15102351
 ] 

Muthu Jayakumar edited comment on SPARK-12783 at 1/15/16 7:34 PM:
--

I tried the following, but got a similar error...

{code}
case class MyMap(map: scala.collection.immutable.Map[String, String])

case class TestCaseClass(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(Map(a->b))
  }

  def toStr: String = {
    a
  }
}

//main thread...
val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), 
TestCaseClass("2015-05-01", "data2"))).toDF() 
//.withColumn("swh_date_to_common_request_id_map", f1(col("_1"), 
col("_2"))).drop("_1").drop("_2")
  df1.as[TestCaseClass].map(_.toStr).show() //works fine
  df1.as[TestCaseClass].map(_.toMyMap).show() //error
  df1.as[TestCaseClass].map(each=> each.a -> each.b).show() //works fine
{code}

{quote}
Serialization stack:
- object not serializable (class: 
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: 
package lang)
- field (class: scala.reflect.internal.Types$ThisType, name: sym, type: 
class scala.reflect.internal.Symbols$Symbol)
- object (class scala.reflect.internal.Types$UniqueThisType, 
java.lang.type)
- field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: 
class scala.reflect.internal.Types$Type)
- object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String)
- field (class: scala.reflect.internal.Types$TypeRef, name: normalized, 
type: class scala.reflect.internal.Types$Type)
- object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String)
- field (class: 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, 
type: class scala.reflect.api.Types$TypeApi)
- object (class 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, )
- field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, 
name: function, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.MapObjects, 
mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- 
field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType))
- field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: 
targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Invoke, 
invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
 field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
 [Ljava.lang.Object;)))
- writeObject data (class: 
scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, 
scala.collection.immutable.List$SerializationProxy@2660f093)
- writeReplace data (class: 
scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, 
List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
 field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
 [Ljava.lang.Object;)), 
invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
 field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
 [Ljava.lang.Object;
- field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, 
name: arguments, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, 
staticinvoke(class 
org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface 
scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
 field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
 
[Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
 field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
 [Ljava.lang.Object;)),true))
- writeObject data (class: 
scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, 
scala.collection.immutable.List$SerializationProxy@72af5ac7)
 

[jira] [Commented] (SPARK-12783) Dataset map serialization error

2016-01-15 Thread Muthu Jayakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102351#comment-15102351
 ] 

Muthu Jayakumar commented on SPARK-12783:
-

I tried the following, but got a similar error...

{code}
case class MyMap(map: scala.collection.immutable.Map[String, String])

case class TestCaseClass(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(Map(a->b))
  }

  def toStr: String = {
    a
  }
}

//main thread...
val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), 
TestCaseClass("2015-05-01", "data2"))).toDF() 
//.withColumn("swh_date_to_common_request_id_map", f1(col("_1"), 
col("_2"))).drop("_1").drop("_2")
  df1.as[TestCaseClass].map(_.toStr).show() //works fine
  df1.as[TestCaseClass].map(_.toMyMap).show() //error
  df1.as[TestCaseClass].map(each=> each.a -> each.b).show() //works fine
{code}


[jira] [Created] (SPARK-12787) Dataset to support custom encoder

2016-01-12 Thread Muthu Jayakumar (JIRA)
Muthu Jayakumar created SPARK-12787:
---

 Summary: Dataset to support custom encoder
 Key: SPARK-12787
 URL: https://issues.apache.org/jira/browse/SPARK-12787
 Project: Spark
  Issue Type: New Feature
Affects Versions: 1.6.0
Reporter: Muthu Jayakumar


The current Dataset API requires the case class used for loading to match the 
attribute names and types precisely.
It would be nicer if a partial function could be provided as a parameter to 
transform the DataFrame-like schema into a Dataset.
Something like...
test_dataframe.as[TestCaseClass](partial_function)
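
A rough sketch of what such an API could look like (hypothetical only; asWith 
and DatasetAdapters are not existing Spark names, just an illustration of the 
proposal):
{code}
import org.apache.spark.sql.{DataFrame, Dataset, Encoder}

object DatasetAdapters {
  implicit class RichDataFrame(df: DataFrame) {
    // Adapt the DataFrame's columns to the target case class before .as[T],
    // instead of requiring an exact attribute name/type match up front.
    def asWith[T: Encoder](adapt: DataFrame => DataFrame): Dataset[T] =
      adapt(df).as[T]
  }
}

// Usage idea, mapping mismatched column names onto TestCaseClass(a, b):
//   import DatasetAdapters._
//   test_dataframe.asWith[TestCaseClass](
//     _.withColumnRenamed("col_a", "a").withColumnRenamed("col_b", "b"))
{code}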






[jira] [Created] (SPARK-12783) Dataset map

2016-01-12 Thread Muthu Jayakumar (JIRA)
Muthu Jayakumar created SPARK-12783:
---

 Summary: Dataset map
 Key: SPARK-12783
 URL: https://issues.apache.org/jira/browse/SPARK-12783
 Project: Spark
  Issue Type: Bug
Affects Versions: 1.6.0
Reporter: Muthu Jayakumar


When Dataset API is used to map to another case class, an error is thrown.

{code}
case class MyMap(map: Map[String, String])
case class TestCaseClass(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(Map(a->b))
  }

  def toStr: String = {
    a
  }
}
//Main method section below
import sqlContext.implicits._
val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), 
TestCaseClass("2015-05-01", "data2"))).toDF()
df1.as[TestCaseClass].map(_.toStr).show() //works fine
df1.as[TestCaseClass].map(_.toMyMap).show() //fails
{code}

Error message:
{quote}
Caused by: java.io.NotSerializableException: 
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1
Serialization stack:
- object not serializable (class: 
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: 
package lang)
- field (class: scala.reflect.internal.Types$ThisType, name: sym, type: 
class scala.reflect.internal.Symbols$Symbol)
- object (class scala.reflect.internal.Types$UniqueThisType, 
java.lang.type)
- field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: 
class scala.reflect.internal.Types$Type)
- object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String)
- field (class: scala.reflect.internal.Types$TypeRef, name: normalized, 
type: class scala.reflect.internal.Types$Type)
- object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String)
- field (class: 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, 
type: class scala.reflect.api.Types$TypeApi)
- object (class 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, )
- field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, 
name: function, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.MapObjects, 
mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- 
field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType))
- field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: 
targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Invoke, 
invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
 field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
 [Ljava.lang.Object;)))
- writeObject data (class: 
scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, 
scala.collection.immutable.List$SerializationProxy@4c7e3aab)
- writeReplace data (class: 
scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, 
List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
 field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
 [Ljava.lang.Object;)), 
invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
 field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
 [Ljava.lang.Object;
- field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, 
name: arguments, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, 
staticinvoke(class 
org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface 
scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
 field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
 
[Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
 field (class: "scala.collection.immutable.Map", name: "map"),- root class: 
"collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
 [Ljava.lang.Object;)),true))
- writeObject data (class: 
scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, 
scala.collection.immutable.List$SerializationP