[jira] [Commented] (SPARK-13230) HashMap.merged not working properly with Spark
[ https://issues.apache.org/jira/browse/SPARK-13230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15181790#comment-15181790 ] Łukasz Gieroń commented on SPARK-13230: --- A good workaround is to use Kryo serializer. I've checked and the code works with Kryo. I've created a Scala ticket for this issue and a pull request fixing it. With any luck, the fix will be included in Scala 2.11.9. https://issues.scala-lang.org/browse/SI-9687 > HashMap.merged not working properly with Spark > -- > > Key: SPARK-13230 > URL: https://issues.apache.org/jira/browse/SPARK-13230 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 > Environment: Ubuntu 14.04.3, Scala 2.11.7, Spark 1.6.0 >Reporter: Alin Treznai > > Using HashMap.merged with Spark fails with NullPointerException. > {noformat} > import org.apache.spark.{SparkConf, SparkContext} > import scala.collection.immutable.HashMap > object MergeTest { > def mergeFn:(HashMap[String, Long], HashMap[String, Long]) => > HashMap[String, Long] = { > case (m1, m2) => m1.merged(m2){ case (x,y) => (x._1, x._2 + y._2) } > } > def main(args: Array[String]) = { > val input = Seq(HashMap("A" -> 1L), HashMap("A" -> 2L, "B" -> > 3L),HashMap("A" -> 2L, "C" -> 4L)) > val conf = new SparkConf().setAppName("MergeTest").setMaster("local[*]") > val sc = new SparkContext(conf) > val result = sc.parallelize(input).reduce(mergeFn) > println(s"Result=$result") > sc.stop() > } > } > {noformat} > Error message: > org.apache.spark.SparkDriverExecutionException: Execution error > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1169) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) > at 
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952) > at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) > at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007) > at MergeTest$.main(MergeTest.scala:21) > at MergeTest.main(MergeTest.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > Caused by: java.lang.NullPointerException > at > MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12) > at > MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12) > at scala.collection.immutable.HashMap$$anon$2.apply(HashMap.scala:148) > at > scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:200) > at > scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:322) > at > scala.collection.immutable.HashMap$HashTrieMap.merge0(HashMap.scala:463) > at scala.collection.immutable.HashMap.merged(HashMap.scala:117) > at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:12) > at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:11) > at > org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1020) > at > org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1017) > at > org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56) > at > 
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1165) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) -- This message was sent by Atlassian JIRA (v6.3.4#6332) -
[jira] [Commented] (SPARK-13230) HashMap.merged not working properly with Spark
[ https://issues.apache.org/jira/browse/SPARK-13230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180234#comment-15180234 ] Sean Owen commented on SPARK-13230: --- Thanks, that's a great analysis. It sounds like we might need to close this as a Scala problem, and offer a workaround. For example, it's obviously possible to write a little function that accomplishes the same thing, and which I hope doesn't depend on serializing the same internal representation. (PS JIRA does not use markdown. Use pairs of curly braces to {{format as code}}.) > HashMap.merged not working properly with Spark > -- > > Key: SPARK-13230 > URL: https://issues.apache.org/jira/browse/SPARK-13230 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 > Environment: Ubuntu 14.04.3, Scala 2.11.7, Spark 1.6.0 >Reporter: Alin Treznai > > Using HashMap.merged with Spark fails with NullPointerException. > {noformat} > import org.apache.spark.{SparkConf, SparkContext} > import scala.collection.immutable.HashMap > object MergeTest { > def mergeFn:(HashMap[String, Long], HashMap[String, Long]) => > HashMap[String, Long] = { > case (m1, m2) => m1.merged(m2){ case (x,y) => (x._1, x._2 + y._2) } > } > def main(args: Array[String]) = { > val input = Seq(HashMap("A" -> 1L), HashMap("A" -> 2L, "B" -> > 3L),HashMap("A" -> 2L, "C" -> 4L)) > val conf = new SparkConf().setAppName("MergeTest").setMaster("local[*]") > val sc = new SparkContext(conf) > val result = sc.parallelize(input).reduce(mergeFn) > println(s"Result=$result") > sc.stop() > } > } > {noformat} > Error message: > org.apache.spark.SparkDriverExecutionException: Execution error > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1169) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) > at > 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952) > at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) > at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007) > at MergeTest$.main(MergeTest.scala:21) > at MergeTest.main(MergeTest.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > Caused by: java.lang.NullPointerException > at > MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12) > at > MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12) > at scala.collection.immutable.HashMap$$anon$2.apply(HashMap.scala:148) > at > scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:200) > at > scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:322) > at > scala.collection.immutable.HashMap$HashTrieMap.merge0(HashMap.scala:463) > at scala.collection.immutable.HashMap.merged(HashMap.scala:117) > at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:12) > at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:11) > at > org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1020) > at > org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1017) > at > 
org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1165) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) > at org.apache.spark.util.EventLoop$$anon$1.run(
[jira] [Commented] (SPARK-13230) HashMap.merged not working properly with Spark
[ https://issues.apache.org/jira/browse/SPARK-13230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180216#comment-15180216 ] Łukasz Gieroń commented on SPARK-13230: --- The issue here is a bug in the Scala library, in the deserialization of `HashMap1` objects. When they get deserialized, the internal `kv` field does not get deserialized (it is left `null`), which causes a `NullPointerException` in `merged`. I've fixed this in the Scala library, and that fixes the issue. I'm going to file a bug against the Scala library, submit a pull request for it, and link that ticket here (if it's possible to link between Jiras). > HashMap.merged not working properly with Spark > -- > > Key: SPARK-13230 > URL: https://issues.apache.org/jira/browse/SPARK-13230 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 > Environment: Ubuntu 14.04.3, Scala 2.11.7, Spark 1.6.0 >Reporter: Alin Treznai > > Using HashMap.merged with Spark fails with NullPointerException. 
> {noformat} > import org.apache.spark.{SparkConf, SparkContext} > import scala.collection.immutable.HashMap > object MergeTest { > def mergeFn:(HashMap[String, Long], HashMap[String, Long]) => > HashMap[String, Long] = { > case (m1, m2) => m1.merged(m2){ case (x,y) => (x._1, x._2 + y._2) } > } > def main(args: Array[String]) = { > val input = Seq(HashMap("A" -> 1L), HashMap("A" -> 2L, "B" -> > 3L),HashMap("A" -> 2L, "C" -> 4L)) > val conf = new SparkConf().setAppName("MergeTest").setMaster("local[*]") > val sc = new SparkContext(conf) > val result = sc.parallelize(input).reduce(mergeFn) > println(s"Result=$result") > sc.stop() > } > } > {noformat} > Error message: > org.apache.spark.SparkDriverExecutionException: Execution error > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1169) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952) > at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) > at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007) > at MergeTest$.main(MergeTest.scala:21) > at MergeTest.main(MergeTest.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > Caused by: java.lang.NullPointerException > at > MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12) > at > MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12) > at scala.collection.immutable.HashMap$$anon$2.apply(HashMap.scala:148) > at > scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:200) > at > scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:322) > at > scala.collection.immutable.HashMap$HashTrieMap.merge0(HashMap.scala:463) > at scala.collection.immutable.HashMap.merged(HashMap.scala:117) > at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:12) > at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:11) > at > org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1020) > at > org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1017) > at > org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1165) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1
[jira] [Commented] (SPARK-13230) HashMap.merged not working properly with Spark
[ https://issues.apache.org/jira/browse/SPARK-13230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15175467#comment-15175467 ] Sean Owen commented on SPARK-13230: --- Just start working on it, we typically don't give one person ownership over the solution by assigning, unless it's really clear they're driving what will be the final answer. > HashMap.merged not working properly with Spark > -- > > Key: SPARK-13230 > URL: https://issues.apache.org/jira/browse/SPARK-13230 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 > Environment: Ubuntu 14.04.3, Scala 2.11.7, Spark 1.6.0 >Reporter: Alin Treznai > > Using HashMap.merged with Spark fails with NullPointerException. > {noformat} > import org.apache.spark.{SparkConf, SparkContext} > import scala.collection.immutable.HashMap > object MergeTest { > def mergeFn:(HashMap[String, Long], HashMap[String, Long]) => > HashMap[String, Long] = { > case (m1, m2) => m1.merged(m2){ case (x,y) => (x._1, x._2 + y._2) } > } > def main(args: Array[String]) = { > val input = Seq(HashMap("A" -> 1L), HashMap("A" -> 2L, "B" -> > 3L),HashMap("A" -> 2L, "C" -> 4L)) > val conf = new SparkConf().setAppName("MergeTest").setMaster("local[*]") > val sc = new SparkContext(conf) > val result = sc.parallelize(input).reduce(mergeFn) > println(s"Result=$result") > sc.stop() > } > } > {noformat} > Error message: > org.apache.spark.SparkDriverExecutionException: Execution error > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1169) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952) > at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) > at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007) > at MergeTest$.main(MergeTest.scala:21) > at MergeTest.main(MergeTest.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > Caused by: java.lang.NullPointerException > at > MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12) > at > MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12) > at scala.collection.immutable.HashMap$$anon$2.apply(HashMap.scala:148) > at > scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:200) > at > scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:322) > at > scala.collection.immutable.HashMap$HashTrieMap.merge0(HashMap.scala:463) > at scala.collection.immutable.HashMap.merged(HashMap.scala:117) > at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:12) > at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:11) > at > org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1020) > at > org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1017) > at > org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1165) > at > 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.
[jira] [Commented] (SPARK-13230) HashMap.merged not working properly with Spark
[ https://issues.apache.org/jira/browse/SPARK-13230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15174809#comment-15174809 ] Łukasz Gieroń commented on SPARK-13230: --- [~srowen] Can you please assign me to this ticket? I have a pretty strong suspicion as to what is going on here, but would like to confirm it with scala library folks first before I speak here. > HashMap.merged not working properly with Spark > -- > > Key: SPARK-13230 > URL: https://issues.apache.org/jira/browse/SPARK-13230 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 > Environment: Ubuntu 14.04.3, Scala 2.11.7, Spark 1.6.0 >Reporter: Alin Treznai > > Using HashMap.merged with Spark fails with NullPointerException. > {noformat} > import org.apache.spark.{SparkConf, SparkContext} > import scala.collection.immutable.HashMap > object MergeTest { > def mergeFn:(HashMap[String, Long], HashMap[String, Long]) => > HashMap[String, Long] = { > case (m1, m2) => m1.merged(m2){ case (x,y) => (x._1, x._2 + y._2) } > } > def main(args: Array[String]) = { > val input = Seq(HashMap("A" -> 1L), HashMap("A" -> 2L, "B" -> > 3L),HashMap("A" -> 2L, "C" -> 4L)) > val conf = new SparkConf().setAppName("MergeTest").setMaster("local[*]") > val sc = new SparkContext(conf) > val result = sc.parallelize(input).reduce(mergeFn) > println(s"Result=$result") > sc.stop() > } > } > {noformat} > Error message: > org.apache.spark.SparkDriverExecutionException: Execution error > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1169) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952) > at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) > at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007) > at MergeTest$.main(MergeTest.scala:21) > at MergeTest.main(MergeTest.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > Caused by: java.lang.NullPointerException > at > MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12) > at > MergeTest$$anonfun$mergeFn$1$$anonfun$apply$1.apply(MergeTest.scala:12) > at scala.collection.immutable.HashMap$$anon$2.apply(HashMap.scala:148) > at > scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:200) > at > scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:322) > at > scala.collection.immutable.HashMap$HashTrieMap.merge0(HashMap.scala:463) > at scala.collection.immutable.HashMap.merged(HashMap.scala:117) > at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:12) > at MergeTest$$anonfun$mergeFn$1.apply(MergeTest.scala:11) > at > org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1020) > at > org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1017) > at > org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1165) > at > 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issue
[jira] [Commented] (SPARK-13230) HashMap.merged not working properly with Spark
[ https://issues.apache.org/jira/browse/SPARK-13230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15136654#comment-15136654 ] Sean Owen commented on SPARK-13230: --- That's a weird one. This is a little simpler as {code} sc.parallelize(input).reduce((m1,m2) => m1.merged(m2) { case ((k,v1),(_,v2)) => (k, v1+v2) }) {code} which yields on the driver {code} Caused by: scala.MatchError: (null,null) (of class scala.Tuple2) at $anonfun$1$$anonfun$apply$1.apply(:28) at $anonfun$1$$anonfun$apply$1.apply(:28) at scala.collection.immutable.HashMap$$anon$2$$anon$3.apply(HashMap.scala:150) at scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:200) at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:322) at scala.collection.immutable.HashMap$HashMap1.merge0(HashMap.scala:225) at scala.collection.immutable.HashMap.merged(HashMap.scala:117) at $anonfun$1.apply(:28) at $anonfun$1.apply(:28) at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:926) at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:923) at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:57) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1658) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1620) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1609) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) {code} It occurs when reducing the partition and I can't figure out how it would be fed a null. collecting or taking the RDD is fine. I suspect something strange related to the merged method which is only on immutable.HashMap, but there's no good reason that would be a problem. 
I would suggest trying a different snippet of Scala code to merge the maps for now, since that seems to work. > HashMap.merged not working properly with Spark > -- > > Key: SPARK-13230 > URL: https://issues.apache.org/jira/browse/SPARK-13230 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 > Environment: Ubuntu 14.04.3, Scala 2.11.7, Spark 1.6.0 >Reporter: Alin Treznai > > Using HashMap.merged with Spark fails with NullPointerException. > {noformat} > import org.apache.spark.{SparkConf, SparkContext} > import scala.collection.immutable.HashMap > object MergeTest { > def mergeFn:(HashMap[String, Long], HashMap[String, Long]) => > HashMap[String, Long] = { > case (m1, m2) => m1.merged(m2){ case (x,y) => (x._1, x._2 + y._2) } > } > def main(args: Array[String]) = { > val input = Seq(HashMap("A" -> 1L), HashMap("A" -> 2L, "B" -> > 3L),HashMap("A" -> 2L, "C" -> 4L)) > val conf = new SparkConf().setAppName("MergeTest").setMaster("local[*]") > val sc = new SparkContext(conf) > val result = sc.parallelize(input).reduce(mergeFn) > println(s"Result=$result") > sc.stop() > } > } > {noformat} > Error message: > org.apache.spark.SparkDriverExecutionException: Execution error > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1169) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952) > at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025) > at > 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) > at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007) > at MergeTest$.main(MergeTest.scala:21) > at MergeTest.main(MergeTest.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > Caused by: java.lang.NullPointerException >