[jira] [Commented] (SPARK-13230) HashMap.merged not working properly with Spark

2016-03-05 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-13230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15181790#comment-15181790
 ] 

Łukasz Gieroń commented on SPARK-13230:
---

A good workaround is to use Kryo serializer. I've checked and the code works 
with Kryo.

I've created a Scala ticket for this issue and a pull request fixing it. With 
any luck, the fix will be included in Scala 2.11.9.
https://issues.scala-lang.org/browse/SI-9687

> HashMap.merged not working properly with Spark
> --
>
> Key: SPARK-13230
> URL: https://issues.apache.org/jira/browse/SPARK-13230
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: Ubuntu 14.04.3, Scala 2.11.7, Spark 1.6.0
>Reporter: Alin Treznai
>
> Using HashMap.merged with Spark fails with NullPointerException.
> {noformat}
> import org.apache.spark.{SparkConf, SparkContext}
> import scala.collection.immutable.HashMap
> object MergeTest {
>   def mergeFn:(HashMap[String, Long], HashMap[String, Long]) => 
> HashMap[String, Long] = {
> case (m1, m2) => m1.merged(m2){ case (x,y) => (x._1, x._2 + y._2) }
>   }
>   def main(args: Array[String]) = {
> val input = Seq(HashMap("A" -> 1L), HashMap("A" -> 2L, "B" -> 
> 3L),HashMap("A" -> 2L, "C" -> 4L))
> val conf = new SparkConf().setAppName("MergeTest").setMaster("local[*]")
> val sc = new SparkContext(conf)
> val result = sc.parallelize(input).reduce(mergeFn)
> println(s"Result=$result")
> sc.stop()
>   }
> }
> {noformat}
> Error message:
> org.apache.spark.SparkDriverExecutionException: Execution error
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1169)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
> at MergeTest$.main(MergeTest.scala:21)
> at MergeTest.main(MergeTest.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> Caused by: java.lang.NullPointerException
> at 
> MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12)
> at 
> MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12)
> at scala.collection.immutable.HashMap$$anon$2.apply(HashMap.scala:148)
> at 
> scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:200)
> at 
> scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:322)
> at 
> scala.collection.immutable.HashMap$HashTrieMap.merge0(HashMap.scala:463)
> at scala.collection.immutable.HashMap.merged(HashMap.scala:117)
> at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:12)
> at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:11)
> at 
> org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1020)
> at 
> org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1017)
> at 
> org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1165)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-

[jira] [Commented] (SPARK-13230) HashMap.merged not working properly with Spark

2016-03-04 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180234#comment-15180234
 ] 

Sean Owen commented on SPARK-13230:
---

Thanks, that's a great analysis. It sounds like we might need to close this as 
a Scala problem, and offer a workaround. For example, it's obviously possible 
to write a little function that accomplishes the same thing, and which I hope 
doesn't depend on serializing the same internal representation.

(PS JIRA does not use markdown. Use pairs of curly braces to {{format as code}}.)

> HashMap.merged not working properly with Spark
> --
>
> Key: SPARK-13230
> URL: https://issues.apache.org/jira/browse/SPARK-13230
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: Ubuntu 14.04.3, Scala 2.11.7, Spark 1.6.0
>Reporter: Alin Treznai
>
> Using HashMap.merged with Spark fails with NullPointerException.
> {noformat}
> import org.apache.spark.{SparkConf, SparkContext}
> import scala.collection.immutable.HashMap
> object MergeTest {
>   def mergeFn:(HashMap[String, Long], HashMap[String, Long]) => 
> HashMap[String, Long] = {
> case (m1, m2) => m1.merged(m2){ case (x,y) => (x._1, x._2 + y._2) }
>   }
>   def main(args: Array[String]) = {
> val input = Seq(HashMap("A" -> 1L), HashMap("A" -> 2L, "B" -> 
> 3L),HashMap("A" -> 2L, "C" -> 4L))
> val conf = new SparkConf().setAppName("MergeTest").setMaster("local[*]")
> val sc = new SparkContext(conf)
> val result = sc.parallelize(input).reduce(mergeFn)
> println(s"Result=$result")
> sc.stop()
>   }
> }
> {noformat}
> Error message:
> org.apache.spark.SparkDriverExecutionException: Execution error
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1169)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
> at MergeTest$.main(MergeTest.scala:21)
> at MergeTest.main(MergeTest.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> Caused by: java.lang.NullPointerException
> at 
> MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12)
> at 
> MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12)
> at scala.collection.immutable.HashMap$$anon$2.apply(HashMap.scala:148)
> at 
> scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:200)
> at 
> scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:322)
> at 
> scala.collection.immutable.HashMap$HashTrieMap.merge0(HashMap.scala:463)
> at scala.collection.immutable.HashMap.merged(HashMap.scala:117)
> at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:12)
> at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:11)
> at 
> org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1020)
> at 
> org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1017)
> at 
> org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1165)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(

[jira] [Commented] (SPARK-13230) HashMap.merged not working properly with Spark

2016-03-04 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-13230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180216#comment-15180216
 ] 

Łukasz Gieroń commented on SPARK-13230:
---

The issue here is a bug in the Scala library, in the deserialization of `HashMap1` 
objects. When they get deserialized, the internal `kv` field does not get 
deserialized (it is left `null`), which causes a `NullPointerException` in 
`merged`. I've fixed this in the Scala library, and it resolves the issue.
I'm going to open a bug against the Scala library and submit a pull request for it, and 
link that ticket here (if it's possible to link between Jiras).

> HashMap.merged not working properly with Spark
> --
>
> Key: SPARK-13230
> URL: https://issues.apache.org/jira/browse/SPARK-13230
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: Ubuntu 14.04.3, Scala 2.11.7, Spark 1.6.0
>Reporter: Alin Treznai
>
> Using HashMap.merged with Spark fails with NullPointerException.
> {noformat}
> import org.apache.spark.{SparkConf, SparkContext}
> import scala.collection.immutable.HashMap
> object MergeTest {
>   def mergeFn:(HashMap[String, Long], HashMap[String, Long]) => 
> HashMap[String, Long] = {
> case (m1, m2) => m1.merged(m2){ case (x,y) => (x._1, x._2 + y._2) }
>   }
>   def main(args: Array[String]) = {
> val input = Seq(HashMap("A" -> 1L), HashMap("A" -> 2L, "B" -> 
> 3L),HashMap("A" -> 2L, "C" -> 4L))
> val conf = new SparkConf().setAppName("MergeTest").setMaster("local[*]")
> val sc = new SparkContext(conf)
> val result = sc.parallelize(input).reduce(mergeFn)
> println(s"Result=$result")
> sc.stop()
>   }
> }
> {noformat}
> Error message:
> org.apache.spark.SparkDriverExecutionException: Execution error
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1169)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
> at MergeTest$.main(MergeTest.scala:21)
> at MergeTest.main(MergeTest.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> Caused by: java.lang.NullPointerException
> at 
> MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12)
> at 
> MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12)
> at scala.collection.immutable.HashMap$$anon$2.apply(HashMap.scala:148)
> at 
> scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:200)
> at 
> scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:322)
> at 
> scala.collection.immutable.HashMap$HashTrieMap.merge0(HashMap.scala:463)
> at scala.collection.immutable.HashMap.merged(HashMap.scala:117)
> at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:12)
> at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:11)
> at 
> org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1020)
> at 
> org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1017)
> at 
> org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1165)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1

[jira] [Commented] (SPARK-13230) HashMap.merged not working properly with Spark

2016-03-02 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15175467#comment-15175467
 ] 

Sean Owen commented on SPARK-13230:
---

Just start working on it; we typically don't give one person ownership over the 
solution by assigning, unless it's really clear they're driving what will be 
the final answer.

> HashMap.merged not working properly with Spark
> --
>
> Key: SPARK-13230
> URL: https://issues.apache.org/jira/browse/SPARK-13230
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: Ubuntu 14.04.3, Scala 2.11.7, Spark 1.6.0
>Reporter: Alin Treznai
>
> Using HashMap.merged with Spark fails with NullPointerException.
> {noformat}
> import org.apache.spark.{SparkConf, SparkContext}
> import scala.collection.immutable.HashMap
> object MergeTest {
>   def mergeFn:(HashMap[String, Long], HashMap[String, Long]) => 
> HashMap[String, Long] = {
> case (m1, m2) => m1.merged(m2){ case (x,y) => (x._1, x._2 + y._2) }
>   }
>   def main(args: Array[String]) = {
> val input = Seq(HashMap("A" -> 1L), HashMap("A" -> 2L, "B" -> 
> 3L),HashMap("A" -> 2L, "C" -> 4L))
> val conf = new SparkConf().setAppName("MergeTest").setMaster("local[*]")
> val sc = new SparkContext(conf)
> val result = sc.parallelize(input).reduce(mergeFn)
> println(s"Result=$result")
> sc.stop()
>   }
> }
> {noformat}
> Error message:
> org.apache.spark.SparkDriverExecutionException: Execution error
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1169)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
> at MergeTest$.main(MergeTest.scala:21)
> at MergeTest.main(MergeTest.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> Caused by: java.lang.NullPointerException
> at 
> MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12)
> at 
> MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12)
> at scala.collection.immutable.HashMap$$anon$2.apply(HashMap.scala:148)
> at 
> scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:200)
> at 
> scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:322)
> at 
> scala.collection.immutable.HashMap$HashTrieMap.merge0(HashMap.scala:463)
> at scala.collection.immutable.HashMap.merged(HashMap.scala:117)
> at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:12)
> at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:11)
> at 
> org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1020)
> at 
> org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1017)
> at 
> org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1165)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.

[jira] [Commented] (SPARK-13230) HashMap.merged not working properly with Spark

2016-03-01 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-13230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15174809#comment-15174809
 ] 

Łukasz Gieroń commented on SPARK-13230:
---

[~srowen] Can you please assign me to this ticket? I have a pretty strong 
suspicion as to what is going on here, but I would like to confirm it with the Scala 
library folks first before I speak here.

> HashMap.merged not working properly with Spark
> --
>
> Key: SPARK-13230
> URL: https://issues.apache.org/jira/browse/SPARK-13230
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: Ubuntu 14.04.3, Scala 2.11.7, Spark 1.6.0
>Reporter: Alin Treznai
>
> Using HashMap.merged with Spark fails with NullPointerException.
> {noformat}
> import org.apache.spark.{SparkConf, SparkContext}
> import scala.collection.immutable.HashMap
> object MergeTest {
>   def mergeFn:(HashMap[String, Long], HashMap[String, Long]) => 
> HashMap[String, Long] = {
> case (m1, m2) => m1.merged(m2){ case (x,y) => (x._1, x._2 + y._2) }
>   }
>   def main(args: Array[String]) = {
> val input = Seq(HashMap("A" -> 1L), HashMap("A" -> 2L, "B" -> 
> 3L),HashMap("A" -> 2L, "C" -> 4L))
> val conf = new SparkConf().setAppName("MergeTest").setMaster("local[*]")
> val sc = new SparkContext(conf)
> val result = sc.parallelize(input).reduce(mergeFn)
> println(s"Result=$result")
> sc.stop()
>   }
> }
> {noformat}
> Error message:
> org.apache.spark.SparkDriverExecutionException: Execution error
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1169)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
> at MergeTest$.main(MergeTest.scala:21)
> at MergeTest.main(MergeTest.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> Caused by: java.lang.NullPointerException
> at 
> MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12)
> at 
> MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12)
> at scala.collection.immutable.HashMap$$anon$2.apply(HashMap.scala:148)
> at 
> scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:200)
> at 
> scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:322)
> at 
> scala.collection.immutable.HashMap$HashTrieMap.merge0(HashMap.scala:463)
> at scala.collection.immutable.HashMap.merged(HashMap.scala:117)
> at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:12)
> at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:11)
> at 
> org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1020)
> at 
> org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1017)
> at 
> org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1165)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issue

[jira] [Commented] (SPARK-13230) HashMap.merged not working properly with Spark

2016-02-07 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15136654#comment-15136654
 ] 

Sean Owen commented on SPARK-13230:
---

That's a weird one. This is a little simpler as 

{code}
sc.parallelize(input).reduce((m1,m2) => m1.merged(m2) { case ((k,v1),(_,v2)) => 
(k, v1+v2) })
{code}

which yields on the driver

{code}
Caused by: scala.MatchError: (null,null) (of class scala.Tuple2)
  at $anonfun$1$$anonfun$apply$1.apply(:28)
  at $anonfun$1$$anonfun$apply$1.apply(:28)
  at scala.collection.immutable.HashMap$$anon$2$$anon$3.apply(HashMap.scala:150)
  at scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:200)
  at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:322)
  at scala.collection.immutable.HashMap$HashMap1.merge0(HashMap.scala:225)
  at scala.collection.immutable.HashMap.merged(HashMap.scala:117)
  at $anonfun$1.apply(:28)
  at $anonfun$1.apply(:28)
  at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:926)
  at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:923)
  at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:57)
  at 
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1185)
  at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1658)
  at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1620)
  at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1609)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

It occurs when reducing the partition and I can't figure out how it would be 
fed a null. collecting or taking the RDD is fine. I suspect something strange 
related to the merged method which is only on immutable.HashMap, but there's no 
good reason that would be a problem.

I would suggest trying a different snippet of Scala code to merge the maps for 
now, since that seems to work.

> HashMap.merged not working properly with Spark
> --
>
> Key: SPARK-13230
> URL: https://issues.apache.org/jira/browse/SPARK-13230
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: Ubuntu 14.04.3, Scala 2.11.7, Spark 1.6.0
>Reporter: Alin Treznai
>
> Using HashMap.merged with Spark fails with NullPointerException.
> {noformat}
> import org.apache.spark.{SparkConf, SparkContext}
> import scala.collection.immutable.HashMap
> object MergeTest {
>   def mergeFn:(HashMap[String, Long], HashMap[String, Long]) => 
> HashMap[String, Long] = {
> case (m1, m2) => m1.merged(m2){ case (x,y) => (x._1, x._2 + y._2) }
>   }
>   def main(args: Array[String]) = {
> val input = Seq(HashMap("A" -> 1L), HashMap("A" -> 2L, "B" -> 
> 3L),HashMap("A" -> 2L, "C" -> 4L))
> val conf = new SparkConf().setAppName("MergeTest").setMaster("local[*]")
> val sc = new SparkContext(conf)
> val result = sc.parallelize(input).reduce(mergeFn)
> println(s"Result=$result")
> sc.stop()
>   }
> }
> {noformat}
> Error message:
> org.apache.spark.SparkDriverExecutionException: Execution error
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1169)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
> at MergeTest$.main(MergeTest.scala:21)
> at MergeTest.main(MergeTest.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> Caused by: java.lang.NullPointerException
>