> > The solution is either to add a default case which does nothing, or
> > probably better to add a .filter such that you filter out anything
> > that's not a command before matching.
And you probably want to push down that filter into the cluster --
collecting all of the elements of an RDD only to discard or filter out
some of them isn't an efficient use of expensive (at least in terms of
time/performance) network resources. There may also be a good opportunity
to use the partial-function form of collect to push even more processing
into the cluster.

On Sun, Jun 8, 2014 at 10:00 AM, Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> When you use match, the match must be exhaustive. That is, a MatchError
> is thrown if the match fails.
>
> That's why you usually handle the default case using "case _ => ..."
>
> Here it looks like you're taking the text of all statuses -- which means
> not all of them will be commands, which means your match will not be
> exhaustive.
>
> The solution is either to add a default case which does nothing, or
> probably better to add a .filter such that you filter out anything that's
> not a command before matching.
>
> Just looking at it again, it could also be that you take x => x._2._1 ...
> What type is that? Should it not be a Seq if you're joining, in which case
> the match will also fail...
>
> Hope this helps.
>
>
> On Sun, Jun 8, 2014 at 6:45 PM, Jeremy Lee
> <unorthodox.engine...@gmail.com> wrote:
>
>> I shut down my first (working) cluster and brought up a fresh one... and
>> it's been a bit of a horror and I need to sleep now. Should I be worried
>> about these errors? Or did I just have the old log4j.config tuned so I
>> didn't see them?
>> >> I >> >> 14/06/08 16:32:52 ERROR scheduler.JobScheduler: Error running job >> streaming job 1402245172000 ms.2 >> scala.MatchError: 0101-01-10 (of class java.lang.String) >> at >> SimpleApp$$anonfun$6$$anonfun$apply$6.apply(SimpleApp.scala:218) >> at >> SimpleApp$$anonfun$6$$anonfun$apply$6.apply(SimpleApp.scala:217) >> at >> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) >> at >> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) >> at SimpleApp$$anonfun$6.apply(SimpleApp.scala:217) >> at SimpleApp$$anonfun$6.apply(SimpleApp.scala:214) >> at >> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527) >> at >> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527) >> at >> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41) >> at >> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) >> at >> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) >> at scala.util.Try$.apply(Try.scala:161) >> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) >> at >> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >> at java.lang.Thread.run(Thread.java:744) >> >> >> The error comes from this code, which seemed like a sensible way to match >> things: >> (The "case cmd_plus(w)" statement is generating the error,) >> >> val cmd_plus = """[+]([\w]+)""".r >> val cmd_minus = """[-]([\w]+)""".r >> // find command user tweets >> val commands = stream.map( >> status => ( status.getUser().getId(), status.getText() ) >> ).foreachRDD(rdd => { >> rdd.join(superusers).map( >> x => x._2._1 >> ).collect().foreach{ cmd => { >> 
218: cmd match { >> case cmd_plus(w) => { >> ... >> } case cmd_minus(w) => { ... } } }} }) >> >> It seems a bit excessive for scala to throw exceptions because a regex >> didn't match. Something feels wrong. >> > >
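To make Nick's point concrete, here is a minimal self-contained sketch (assumed names, not the original SimpleApp code): a bare `match` on a string that fits neither regex extractor -- such as the timestamp-like "0101-01-10" from the log above -- throws scala.MatchError, and the "do nothing" default case makes the match exhaustive.

```scala
object MatchSketch {
  // same style of regex extractors as in the original snippet
  val cmdPlus  = """[+](\w+)""".r
  val cmdMinus = """[-](\w+)""".r

  def handle(cmd: String): String = cmd match {
    case cmdPlus(w)  => s"add:$w"
    case cmdMinus(w) => s"remove:$w"
    case _           => "ignored" // the default case Nick suggests; without
                                  // it, "0101-01-10" raises scala.MatchError
  }

  def main(args: Array[String]): Unit = {
    println(handle("+follow"))    // add:follow
    println(handle("0101-01-10")) // ignored -- previously a MatchError
  }
}
```

Note that a regex extractor in a `case` pattern must match the *entire* string, so ordinary tweet text will never match either command pattern.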
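And a sketch of Sean's suggestion (again with assumed names): Scala's partial-function form of `collect` filters and maps in one pass, silently dropping anything the patterns don't cover. The same `collect(PartialFunction)` overload exists on Spark's RDD, so used there it would keep the non-commands from ever being shipped back to the driver; the plain-collections version below demonstrates the idiom.

```scala
object CollectSketch {
  val cmdPlus  = """[+](\w+)""".r
  val cmdMinus = """[-](\w+)""".r

  // On an RDD this would be rdd.collect { ... } run on the executors;
  // here we show the identical semantics on a local Seq.
  def commands(texts: Seq[String]): Seq[String] = texts.collect {
    case cmdPlus(w)  => s"add:$w"
    case cmdMinus(w) => s"remove:$w"
    // no default case needed: non-matching strings are simply dropped
  }

  def main(args: Array[String]): Unit = {
    println(commands(Seq("+follow", "hello world", "-mute")).mkString(","))
    // add:follow,remove:mute
  }
}
```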