Re: Java heap space and spark.akka.frameSize
Can anybody help me? Thanks. Chieh-Yen On Wed, Apr 16, 2014 at 5:18 PM, Chieh-Yen r01944...@csie.ntu.edu.tw wrote: Dear all, I developed an application whose communication messages are sometimes larger than 10 MB. For smaller datasets it works fine, but it fails for larger datasets. Please see the error message below. I surveyed the situation online and lots of people said the problem can be solved by modifying the properties spark.akka.frameSize and spark.reducer.maxMbInFlight. It may look like:

  val conf = new SparkConf()
    .setMaster(master)
    .setAppName("SparkLR")
    .setSparkHome("/home/user/spark-0.9.0-incubating-bin-hadoop2")
    .setJars(List(jarPath))
    .set("spark.akka.frameSize", "100")
    .set("spark.reducer.maxMbInFlight", "100")
  val sc = new SparkContext(conf)

However, the task still fails with the same error message. The communication messages are the weight vectors of each sub-problem, which may be larger than 10 MB for higher-dimensional datasets. Can anybody help me? Thanks a lot. [error] (run-main) org.apache.spark.SparkException: Job aborted: Exception while deserializing and fetching task: java.lang.OutOfMemoryError: Java heap space org.apache.spark.SparkException: Job aborted: Exception while deserializing and fetching task: java.lang.OutOfMemoryError: Java heap space at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [trace] Stack trace suppressed: run last compile:run for the full output. Chieh-Yen
Re: Java heap space and spark.akka.frameSize
Hi Chieh, You can increase the heap size by exporting the Java options (see below; this will increase the heap size to 10 GB): export _JAVA_OPTIONS=-Xmx10g On Mon, Apr 21, 2014 at 11:43 AM, Chieh-Yen r01944...@csie.ntu.edu.tw wrote: Can anybody help me? Thanks. Chieh-Yen [snip] -- Thanks Best Regards
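For reference, a minimal sketch combining the settings discussed in this thread. The frame-size values come from the original message (note that SparkConf values must be passed as strings); the spark.executor.memory line is an assumed alternative to the _JAVA_OPTIONS export, not something prescribed above.

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setMaster(master)
    .setAppName("SparkLR")
    .setJars(List(jarPath))
    .set("spark.akka.frameSize", "100")           // largest Akka message, in MB
    .set("spark.reducer.maxMbInFlight", "100")    // shuffle fetch buffer, in MB
    .set("spark.executor.memory", "10g")          // assumed: raise the executor heap directly
  val sc = new SparkContext(conf)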
Re: running tests selectively
You should add the hub command line wrapper of git for github to that wiki page: https://github.com/github/hub -- doesn't look like I have edit access to the wiki, or I've forgotten a password, or something Once you've got hub installed and aliased, you've got some nice additional options, such as checking out a PR with just `git checkout https://github.com/apache/spark/pull/457`, for example. On Sun, Apr 20, 2014 at 9:01 PM, Patrick Wendell pwend...@gmail.com wrote: I put some notes in this doc: https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools On Sun, Apr 20, 2014 at 8:58 PM, Arun Ramakrishnan sinchronized.a...@gmail.com wrote: I would like to run some of the tests selectively. I am in branch-1.0 Tried the following two commands. But, it seems to run everything. ./sbt/sbt testOnly org.apache.spark.rdd.RDDSuite ./sbt/sbt test-only org.apache.spark.rdd.RDDSuite Also, how do I run tests of only one of the subprojects. I tried $cd core; sbt test But, things fail to compile properly. thanks Arun
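As a footnote on the original question, a hedged sketch of sbt invocations that usually behave as intended. Quoting matters: without the quotes, sbt treats each shell word as a separate command, and test-only with no arguments ends up running every suite.

  ./sbt/sbt "test-only org.apache.spark.rdd.RDDSuite"   # run a single suite
  ./sbt/sbt "project core" test                         # run only the core subproject's tests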
Re: Task splitting among workers
1.) What happens if the data is in S3 and cached in memory, instead of HDFS? 2.) How is the number of reducers determined in both cases? Even if I specify set mapred.reduce.tasks=50, only 2 reducers are allocated instead of 50, although the query/tasks do complete. Regards, Arpit On Mon, Apr 21, 2014 at 9:33 AM, Patrick Wendell pwend...@gmail.com wrote: For a HadoopRDD, the Spark scheduler first calculates the number of tasks based on input splits. Usually people use this with HDFS data, so in that case it's based on HDFS blocks. If the HDFS datanodes are co-located with the Spark cluster then it will try to run the tasks on the datanode that contains their input to achieve higher throughput. Otherwise, all of the nodes are considered equally fit to run any task, and Spark just load balances across them. On Sat, Apr 19, 2014 at 9:25 PM, David Thomas dt5434...@gmail.com wrote: During a Spark stage, how are tasks split among the workers? Specifically for a HadoopRDD, who determines which worker gets which task?
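In the plain Spark API (as opposed to Hive/Shark settings such as mapred.reduce.tasks), both the input split count and the reduce-side parallelism can be set explicitly; a minimal sketch, with paths and numbers that are only illustrative:

  val lines = sc.textFile("s3n://my-bucket/input/*.txt", 50)   // hint: at least 50 input partitions (works for S3 or HDFS)
  val counts = lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _, 50)   // 50 reduce tasks for this shuffle

When no explicit count is given, shuffle parallelism falls back to the parent RDD's partition count or spark.default.parallelism, depending on the version.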
Re: Are there any plans to develop Graphx Streaming?
On Sun, Apr 20, 2014 at 6:27 PM, Qi Song songqi1...@gmail.com wrote: I wonder if there is any documentation about how to choose a partition method, based on the graph's structure or other properties? The best option is to try all the partition strategies (as well as the default, which is to leave edges in their original partitions) and see which one works best for your particular graph. Here are some ideas about what might work best: - The edge list sometimes comes in a meaningful order by default. For example, a web graph might have edges clustered by domain, and other graphs might have the edge list pre-sorted (i.e., clustered by source vertex ID). In these cases, leaving the edges in the original partitions might provide the best performance. - EdgePartition2D should theoretically be the best strategy in general, but if the number of machines is not a perfect square, it might cause work imbalance (see the Scaladoc for PartitionStrategy.EdgePartition2D). In this case, RandomVertexCut might be better, since it optimizes work balance but completely ignores communication. - For an algorithm where messages are sent backwards along each edge (i.e., to the source of the edge), EdgePartition1D will minimize the communication needed to update each vertex with the aggregated result of mapReduceTriplets. If we had a DestinationEdgePartition1D, a similar statement would be true for algorithms like PageRank where messages are sent forwards along each edge. Ankur http://www.ankurdave.com/
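A minimal sketch of trying the strategies Ankur lists, assuming an existing Graph[VD, ED] named graph:

  import org.apache.spark.graphx._

  val g2d  = graph.partitionBy(PartitionStrategy.EdgePartition2D)
  val g1d  = graph.partitionBy(PartitionStrategy.EdgePartition1D)
  val gRvc = graph.partitionBy(PartitionStrategy.RandomVertexCut)
  // run the same mapReduceTriplets-based job on each (and on the unpartitioned graph) and compare timings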
Re: [GraphX] Cast error when comparing a vertex attribute after its type has changed
On Fri, Apr 11, 2014 at 4:42 AM, Pierre-Alexandre Fonta pierre.alexandre.fonta+sp...@gmail.com wrote: Testing in mapTriplets whether a vertex attribute, which was defined as Integer in the first VertexRDD but later changed to Double by mapVertices, is greater than a number throws java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double. This is a bug! Thanks for reporting it. I just filed SPARK-1552 (https://issues.apache.org/jira/browse/SPARK-1552) to track the fix. Ankur http://www.ankurdave.com/
Re: Do I need to learn Scala for spark ?
Hi, I think you can do just fine with your Java knowledge. There is a Java API that you can use [1]. I am also new to Spark and I have got along with just my Java knowledge. And Scala is easy to learn if you are good with Java. [1] http://spark.apache.org/docs/latest/java-programming-guide.html Regards, Pulasthi On Mon, Apr 21, 2014 at 7:10 PM, arpan57 arpanraj...@gmail.com wrote: Hi guys, I read that Spark is much faster than Hadoop, and that inspires me to learn it. I have hands-on experience with Hadoop (MR-1) and am pretty good with Java programming. Do I need to learn Scala in order to learn Spark? Can I go ahead and write my jobs in Java and run them on Spark? How much does learning Spark depend on Scala? Thanks in advance. Regards, Arpan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Do-I-need-to-learn-Scala-for-spark-tp4528.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Pulasthi Supun Undergraduate Dpt of Computer Science Engineering University of Moratuwa Blog: http://pulasthisupun.blogspot.com/ GitHub profile: https://github.com/pulasthi
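To make that concrete, a minimal, hedged sketch of a job written against the Java API from [1]; the master string, input path and the line-length logic are all illustrative, not from the thread.

  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.api.java.function.Function;
  import org.apache.spark.api.java.function.Function2;

  public class LineLengths {
    public static void main(String[] args) {
      // "local[2]" runs locally with two threads; replace with your cluster's master URL
      JavaSparkContext sc = new JavaSparkContext("local[2]", "LineLengths");
      JavaRDD<String> lines = sc.textFile("hdfs:///tmp/input.txt");   // assumed path
      JavaRDD<Integer> lengths = lines.map(new Function<String, Integer>() {
        public Integer call(String s) { return s.length(); }
      });
      Integer total = lengths.reduce(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) { return a + b; }
      });
      System.out.println("Total characters: " + total);
    }
  }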
custom kryoserializer class under mesos
Hello, Is it possible to use a custom class as my spark's KryoSerializer running under Mesos? I've tried adding my jar containing the class to my spark context (via SparkConf.addJars), but I always get: java.lang.ClassNotFoundException: flambo.kryo.FlamboKryoSerializer at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.SerializerManager.get(SerializerManager.scala:56) at org.apache.spark.serializer.SerializerManager.setDefault(SerializerManager.scala:38) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:146) at org.apache.spark.executor.Executor.init(Executor.scala:110) at org.apache.spark.executor.MesosExecutorBackend.registered(MesosExecutorBackend.scala:58) Exception in thread Thread-0 Do I need to include this jar containing my serializer class in my make-distribution executor tgz or something? Thanks
Re: SPARK_YARN_APP_JAR, SPARK_CLASSPATH and ADD_JARS in a spark-shell on YARN
Hi Christophe, Adding the jars to both SPARK_CLASSPATH and ADD_JARS is required. The former makes them available to the spark-shell driver process, and the latter tells Spark to make them available to the executor processes running on the cluster. -Sandy On Wed, Apr 16, 2014 at 9:27 AM, Christophe Préaud christophe.pre...@kelkoo.com wrote: Hi, I am running Spark 0.9.1 on a YARN cluster, and I am wondering what the correct way is to add external jars when running a spark-shell on a YARN cluster. Packaging all these dependencies in an assembly whose path is then set in SPARK_YARN_APP_JAR (as written in the doc: http://spark.apache.org/docs/latest/running-on-yarn.html) does not work in my case: it pushes the jar to HDFS under .sparkStaging/application_XXX, but the spark-shell is still unable to find it (unless ADD_JARS and/or SPARK_CLASSPATH is defined). Defining all the dependencies (either in an assembly, or separately) in ADD_JARS or SPARK_CLASSPATH works (even if SPARK_YARN_APP_JAR is set to /dev/null), but defining some dependencies in ADD_JARS and the rest in SPARK_CLASSPATH does not! Hence I'm still wondering what the differences between ADD_JARS and SPARK_CLASSPATH are, and what the purpose of SPARK_YARN_APP_JAR is. Thanks for any insights! Christophe. Kelkoo SAS
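A hedged sketch of what that looks like for a YARN spark-shell session; the jar paths are illustrative, and SPARK_YARN_APP_JAR=/dev/null is taken from Christophe's observation above rather than from the docs.

  export SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-*-hadoop2*.jar   # assumed assembly location
  export SPARK_YARN_APP_JAR=/dev/null
  export SPARK_CLASSPATH=/path/to/mydeps.jar   # visible to the local spark-shell driver JVM
  export ADD_JARS=/path/to/mydeps.jar          # shipped to the executors on the cluster
  MASTER=yarn-client ./bin/spark-shell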
Re: Do developers have to be aware of Spark's fault tolerance mechanism?
Hi Sung, On Mon, Apr 21, 2014 at 10:52 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: The goal is to keep an intermediate value per row in memory, which would allow faster subsequent computations. I.e., computeSomething would depend on the previous value from the previous computation. I think the fundamental problem here is that there is no in memory state of the sort you mention when you're talking about map/reduce-style workloads. There are three kinds of data that you can use to communicate between sub-tasks: - RDD input / output, i.e. the arguments and return values of the closures you pass to transformations - Broadcast variables - Accumulators In general, distributed algorithms should strive to be stateless, exactly because of issues like reliability and having to re-run computations (and communication/coordination in general being expensive). The last two in the list above are not generally targeted at the kind of state-keeping that you seem to be talking about. So if you make the result of computeSomething() the output of your map task, then you'll have access to it in the operations downstream from that map task. But you can't store it in a variable in memory and access it later, because that's not how the system works. In any case, I'm really not familiar with ML algorithms, but maybe you should take a look at MLLib. -- Marcelo
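To make Marcelo's last point concrete, here is a minimal sketch of carrying the per-row value forward as ordinary RDD output instead of mutable in-memory state. computeSomething is the name from the discussion; the input RDD data and the downstream refine step are hypothetical.

  import org.apache.spark.storage.StorageLevel

  val withPrev = data.map(row => (row, computeSomething(row)))        // emit the value as part of the output
  withPrev.persist(StorageLevel.MEMORY_AND_DISK)                      // reuse it in later stages without recomputing
  val next = withPrev.map { case (row, prev) => refine(row, prev) }   // downstream step sees the previous value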
spark-0.9.1 compiled with Hadoop 2.3.0 doesn't work with S3?
Hi, all I’m writing a Spark application to load S3 data to HDFS, the HDFS version is 2.3.0, so I have to compile Spark with Hadoop 2.3.0 after I execute val allfiles = sc.textFile(s3n://abc/*.txt”) val output = allfiles.saveAsTextFile(hdfs://x.x.x.x:9000/dataset”) Spark throws exception: (actually related to Hadoop?) java.lang.NoClassDefFoundError: org/jets3t/service/ServiceException at org.apache.hadoop.fs.s3.S3FileSystem.createDefaultStore(S3FileSystem.java:100) at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:90) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:90) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2350) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2332) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:369) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.SparkContext.runJob(SparkContext.scala:891) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:741) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:692) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:574) at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:900) at $iwC$$iwC$$iwC$$iwC.init(console:14) at $iwC$$iwC$$iwC.init(console:19) at $iwC$$iwC.init(console:21) at $iwC.init(console:23) at init(console:25) at .init(console:29) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:793) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:838) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:750) at 
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:598) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:605) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:608) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:931) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:881) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:881) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:881) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:973) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) Caused by: java.lang.ClassNotFoundException: org.jets3t.service.ServiceException at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 63 more Anyone else met the similar problem? Best, -- Nan Zhu
checkpointing without streaming?
I'm trying to understand when I would want to checkpoint an RDD rather than just persist to disk. Every reference I can find to checkpoint relates to Spark Streaming. But the method is defined in the core Spark library, not Streaming. Does it exist solely for streaming, or are there circumstances unrelated to streaming in which I might want to checkpoint... and if so, like what? Thanks, Diana
Re: checkpointing without streaming?
Checkpoint clears dependencies. You might need checkpoint to cut a long lineage in iterative algorithms. -Xiangrui On Mon, Apr 21, 2014 at 11:34 AM, Diana Carroll dcarr...@cloudera.com wrote: I'm trying to understand when I would want to checkpoint an RDD rather than just persist to disk. Every reference I can find to checkpoint related to Spark Streaming. But the method is defined in the core Spark library, not Streaming. Does it exist solely for streaming, or are there circumstances unrelated to streaming in which I might want to checkpoint...and if so, like what? Thanks, Diana
Re: Spark is slow
Hi Joe, On Mon, Apr 21, 2014 at 11:23 AM, Joe L selme...@yahoo.com wrote: And, I haven't gotten any answers to my questions. One thing that might explain that is that, at least for me, all (and I mean *all*) of your messages are ending up in my GMail spam folder, complaining that GMail can't verify that it really comes from yahoo.com. No idea why that's happening or how to fix it. -- Marcelo
Re: Do developers have to be aware of Spark's fault tolerance mechanism?
I would probably agree that it's typically not a good idea to add state to distributed systems. Additionally, from a purist's perspective, this would be a bit of a hack on the paradigm. However, from a practical point of view, I think that it's a reasonable trade-off between efficiency and complexity. It's not too difficult to keep a small set of mutable state in between iterations, and I think a large number of iterative algorithms could benefit from this. For the time being, we're thinking of something like this: RDD[Array[Double]] appended with an extra column that initializes to some default value. If the extra column in an iteration has the default value, it means either something failed or it's the very first iteration, so we compute things inefficiently. Otherwise, it has an intermediate computational value, so we can do the efficient computation. On Mon, Apr 21, 2014 at 11:15 AM, Marcelo Vanzin van...@cloudera.com wrote: Hi Sung, [snip] -- Marcelo
Re: Spark is slow
Yahoo made some changes that drive mailing list posts into spam folders: http://www.virusbtn.com/blog/2014/04_15.xml On Mon, Apr 21, 2014 at 2:50 PM, Marcelo Vanzin van...@cloudera.com wrote: Hi Joe, On Mon, Apr 21, 2014 at 11:23 AM, Joe L selme...@yahoo.com wrote: And, I haven't gotten any answers to my questions. One thing that might explain that is that, at least for me, all (and I mean *all*) of your messages are ending up in my GMail spam folder, complaining that GMail can't verify that it really comes from yahoo.com. No idea why that's happening or how to fix it. -- Marcelo
Problem connecting to HDFS in Spark shell
I'm trying to get my feet wet with Spark. I've done some simple stuff in the shell in standalone mode, and now I'm trying to connect to HDFS resources, but I'm running into a problem. I synced to git's master branch (c399baa - SPARK-1456 Remove view bounds on Ordered in favor of a context bound on Ordering. (3 days ago) Michael Armbrust and built like so: SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly This created various jars in various places, including these (I think): ./examples/target/scala-2.10/spark-examples-assembly-1.0.0-SNAPSHOT.jar ./tools/target/scala-2.10/spark-tools-assembly-1.0.0-SNAPSHOT.jar ./assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop2.2.0.jar In `conf/spark-env.sh`, I added this (actually before I did the assembly): export HADOOP_CONF_DIR=/etc/hadoop/conf Now I fire up the shell (bin/spark-shell) and try to grab data from HFDS, and get the following exception: scala var hdf = sc.hadoopFile(hdfs:///user/kwilliams/dat/part-m-0) hdf: org.apache.spark.rdd.RDD[(Nothing, Nothing)] = HadoopRDD[0] at hadoopFile at console:12 scala hdf.count() java.lang.RuntimeException: java.lang.InstantiationException at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131) at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:155) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:168) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:209) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:207) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1064) at org.apache.spark.rdd.RDD.count(RDD.scala:806) at $iwC$$iwC$$iwC$$iwC.init(console:15) at $iwC$$iwC$$iwC.init(console:20) at $iwC$$iwC.init(console:22) at $iwC.init(console:24) at init(console:26) at .init(console:30) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:777) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1045) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982) at 
org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) Caused by: java.lang.InstantiationException at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:129) ... 41 more Is this recognizable to anyone as a build problem, or a config problem, or anything? Failing that, any way to get more information about where in the process it's failing? Thanks. -- Ken Williams, Senior Research Scientist WindLogics http://windlogics.com
Re: checkpointing without streaming?
When might that be necessary or useful? Presumably I can persist and replicate my RDD to avoid re-computation, if that's my goal. What advantage does checkpointing provide over disk persistence with replication? On Mon, Apr 21, 2014 at 2:42 PM, Xiangrui Meng men...@gmail.com wrote: Checkpoint clears dependencies. You might need checkpoint to cut a long lineage in iterative algorithms. -Xiangrui On Mon, Apr 21, 2014 at 11:34 AM, Diana Carroll dcarr...@cloudera.com wrote: I'm trying to understand when I would want to checkpoint an RDD rather than just persist to disk. Every reference I can find to checkpoint related to Spark Streaming. But the method is defined in the core Spark library, not Streaming. Does it exist solely for streaming, or are there circumstances unrelated to streaming in which I might want to checkpoint...and if so, like what? Thanks, Diana
Re: Spark is slow
I'm seeing the same thing as Marcelo, Joe. All your mail is going to my Spam folder. :( With regards to your questions, I would suggest in general adding some more technical detail to them. It will be difficult for people to give you suggestions if all they are told is Spark is slow. How does your Spark setup differ from your MR/Hive setup? What operations are you doing? What do you see in the Spark UI? What have you tried doing to isolate or identify the reason for the slowness? Etc. Nick On Mon, Apr 21, 2014 at 2:50 PM, Marcelo Vanzin van...@cloudera.com wrote: Hi Joe, On Mon, Apr 21, 2014 at 11:23 AM, Joe L selme...@yahoo.com wrote: And, I haven't gotten any answers to my questions. One thing that might explain that is that, at least for me, all (and I mean *all*) of your messages are ending up in my GMail spam folder, complaining that GMail can't verify that it really comes from yahoo.com. No idea why that's happening or how to fix it. -- Marcelo
Re: Spark is slow
Why don't you start by explaining what kind of operation you're running on Spark that isn't faster than Hadoop MapReduce? Maybe we could start there. And yes, this mailing list is very busy since many people are getting into Spark; it's hard to answer everyone. On 21 Apr 2014 20:23, Joe L selme...@yahoo.com wrote: It is claimed that Spark is 10x or 100x faster than MapReduce and Hive, but since I started using it I haven't seen any faster performance. It is taking 2 minutes to run map and join tasks over just 2GB of data, whereas Hive was taking just a few seconds to join 2 tables over the same data. And, I haven't gotten any answers to my questions. I don't understand the purpose of this group and there is not enough documentation on Spark and its usage. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-slow-tp4539.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark Streaming source from Amazon Kinesis
I'm looking to start experimenting with Spark Streaming, and I'd like to use Amazon Kinesis https://aws.amazon.com/kinesis/ as my data source. Looking at the list of supported Spark Streaming sourceshttp://spark.apache.org/docs/latest/streaming-programming-guide.html#linking, I don't see any mention of Kinesis. Is it possible to use Spark Streaming with Amazon Kinesis? If not, are there plans to add such support in the future? Nick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-source-from-Amazon-Kinesis-tp4550.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: BFS implemented
Would be good if you can contribute this as an example. BFS is a common enough algorithm. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, Apr 19, 2014 at 4:16 AM, Ghufran Malik gooksma...@gmail.com wrote: Ahh nvm I found the solution :) triplet.srcAttr != Double.PositiveInfinity && triplet.dstAttr == Double.PositiveInfinity as my new if condition. -- Forwarded message -- From: Ghufran Malik gooksma...@gmail.com Date: 18 April 2014 23:15 Subject: BFS implemented To: user@spark.apache.org Hi I have successfully implemented the Breadth First Search algorithm using the Pregel operator in GraphX as follows:

  val graph = GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt")
  val root: VertexId = 1
  val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity)
  val bfs = initialGraph.pregel(Double.PositiveInfinity, 20)(
    (id, attr, msg) => math.min(attr, msg),
    triplet => {
      if (triplet.srcAttr != Double.PositiveInfinity) {
        Iterator((triplet.dstId, triplet.srcAttr + 1))
      } else {
        Iterator.empty
      }
    },
    (a, b) => math.min(a, b)
  )
  println(bfs.vertices.collect.mkString("\n"))

where test_graph.txt is:

  1 2
  2 1
  2 3
  2 4
  3 2
  3 3
  4 2
  4 3

and the result output after I run my algorithm is: (4,2.0) (2,1.0) (3,2.0) (1,0.0), which is the correct result. I was hoping someone could improve upon my implementation by suggesting a way in which I do not need the max iteration number (20). If I remove it my job will continue on for some time until eventually I receive the error: 7) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at ... and it carries on and on.
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) 14/04/18 23:11:14 ERROR TaskSetManager: Task 81094.0:0 failed 1 times; aborting job 14/04/18 23:11:14 INFO DAGScheduler: Failed to run reduce at VertexRDD.scala:91 14/04/18 23:11:14 INFO TaskSchedulerImpl: Remove TaskSet 81094.0 from pool org.apache.spark.SparkException: Job aborted: Task 81094.0:0 failed 1 times (most recent failure: Exception failure: java.lang.StackOverflowError) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Re: Spark Streaming source from Amazon Kinesis
There was a patch posted a few weeks ago (https://github.com/apache/spark/pull/223), but it needs a few changes in packaging because it uses a license that isn’t fully compatible with Apache. I’d like to get this merged when the changes are made though — it would be a good input source to support. Matei On Apr 21, 2014, at 1:00 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I'm looking to start experimenting with Spark Streaming, and I'd like to use Amazon Kinesis as my data source. Looking at the list of supported Spark Streaming sources, I don't see any mention of Kinesis. Is it possible to use Spark Streaming with Amazon Kinesis? If not, are there plans to add such support in the future? Nick View this message in context: Spark Streaming source from Amazon Kinesis Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Strange behaviour of different SSCs with same Kafka topic
Are you by any chance starting two StreamingContexts in the same JVM? That could explain a lot of the weird mixing of data that you are seeing. It's not a supported usage scenario to start multiple StreamingContexts simultaneously in the same JVM. TD On Thu, Apr 17, 2014 at 10:58 PM, gaganbm gagan.mis...@gmail.com wrote: It happens with a normal data rate, i.e., let's say 20 records per second. Apart from that, I am also getting some more strange behavior. Let me explain. I establish two SSCs. Start them one after another. In the SSCs I get the streams from Kafka sources and do some manipulations, like adding some Record_Name, for example, to each of the incoming records. Now this Record_Name is different for both the SSCs, and I get this field from some other class, not relevant to the streams. Now, the expected behavior should be that all records in SSC1 get added with the field RECORD_NAME_1 and all records in SSC2 get added with the field RECORD_NAME_2. The SSCs have nothing to do with each other, as I believe. However, strangely enough, I find many records in SSC1 get added with RECORD_NAME_2 and vice versa. Is it some kind of serialization issue? That the class which provides this RECORD_NAME gets serialized and is reconstructed and then something weird happens inside? I am unable to figure it out. So, apart from skewed frequency and volume of records in both the streams, I am getting this inter-mingling of data among the streams. Can you help me with how to use some external data to manipulate the RDD records? Thanks and regards Gagan B Mishra *Programmer* *560034, Bangalore* *India* On Tue, Apr 15, 2014 at 4:09 AM, Tathagata Das [via Apache Spark User List] [hidden email] wrote: Does this happen at a low event rate for that topic as well, or only for a high volume rate? TD On Wed, Apr 9, 2014 at 11:24 PM, gaganbm [hidden email] wrote: I am really at my wits' end here. I have different streaming contexts, let's say 2, both listening to the same Kafka topics. I establish the KafkaStream by setting different consumer groups for each of them. Ideally, I should be seeing the Kafka events in both the streams. But what I am getting is really unpredictable. Only one stream gets a lot of events and the other one gets almost nothing, or very little compared to the other. Also the frequency is very skewed. I get a lot of events in one stream continuously, and after some duration I get a few events in the other one. I don't know where I am going wrong. I can see consumer fetcher threads for both the streams that listen to the Kafka topics. I can give further details if needed. Any help will be great. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Strange-behaviour-of-different-SSCs-with-same-Kafka-topic-tp4050.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
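Following up on TD's point, a minimal sketch of the supported pattern: a single StreamingContext with two Kafka receivers in different consumer groups. The ZooKeeper address, topic and group names are placeholder assumptions, and the RECORD_NAME tagging only mirrors the scenario described above.

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  val sparkConf = new SparkConf().setAppName("two-kafka-streams")
  val ssc = new StreamingContext(sparkConf, Seconds(2))
  val stream1 = KafkaUtils.createStream(ssc, "zk-host:2181", "group-1", Map("my-topic" -> 1))
  val stream2 = KafkaUtils.createStream(ssc, "zk-host:2181", "group-2", Map("my-topic" -> 1))
  stream1.map { case (_, msg) => ("RECORD_NAME_1", msg) }.print()   // tag records from the first stream
  stream2.map { case (_, msg) => ("RECORD_NAME_2", msg) }.print()   // tag records from the second stream
  ssc.start()
  ssc.awaitTermination()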
Re: question about the SocketReceiver
As long as the socket server sends data through the same connection, the existing code is going to work. The socket.getInputStream returns a input stream which will continuously allow you to pull data sent over the connection. The bytesToObject function continuously reads data from the input stream and coverts them to objects. This will continue until the input stream is closed (i.e., the connection is closed). TD On Sun, Apr 20, 2014 at 1:36 AM, YouPeng Yang yypvsxf19870...@gmail.comwrote: Hi I am studing the structure of the Spark Streaming(my spark version is 0.9.0). I have a question about the SocketReceiver.In the onStart function: --- protected def onStart() { logInfo(Connecting to + host + : + port) val socket = new Socket(host, port) logInfo(Connected to + host + : + port) blockGenerator.start() val iterator = bytesToObjects(socket.getInputStream()) while(iterator.hasNext) { val obj = iterator.next blockGenerator += obj } } - Here the Socket client is created and read data iteratively. My question is the onStart function is only called once by the super class NetworkReceiver, and correspondingly read data one time. When the socket server send data again, how does the SocketReceiver read the input data,I can find any src hint about the process. In my opinion, the Socket instance should read the data cyclically as following: - InputStream is = socket.getInputStream() while(theEndflag){ if(is.avariable = 0){ val iterator = bytesToObjects(is) while(iterator.hasNext) { val obj = iterator.next blockGenerator += obj } } } //theEndflag is the end flag of the loop,shoud be set to false when needed. --- I know it may not be the right thought,however i am really curious about the Socket read process because it hard to understand. Any suggestions will be appreciated.
RE: Problem connecting to HDFS in Spark shell
I figured it out - I should be using textFile(...), not hadoopFile(...). And my HDFS URL should include the host: hdfs://host/user/kwilliams/corTable2/part-m-0 I haven't figured out how to let the hostname default to the host mentioned in our /etc/hadoop/conf/hdfs-site.xml like the Hadoop command-line tools do, but that's not so important. -Ken -Original Message- From: Williams, Ken [mailto:ken.willi...@windlogics.com] Sent: Monday, April 21, 2014 2:04 PM To: Spark list Subject: Problem connecting to HDFS in Spark shell [snip]
Re: Problem connecting to HDFS in Spark shell
Hi Ken, On Mon, Apr 21, 2014 at 1:39 PM, Williams, Ken ken.willi...@windlogics.com wrote: I haven't figured out how to let the hostname default to the host mentioned in our /etc/hadoop/conf/hdfs-site.xml like the Hadoop command-line tools do, but that's not so important. Try adding /etc/hadoop/conf to SPARK_CLASSPATH. -- Marcelo
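A quick sketch of that suggestion, using the conf path from the thread; with the Hadoop configuration directory on the classpath, the shell should pick up the cluster's default filesystem so host-less hdfs:/// URLs resolve.

  export SPARK_CLASSPATH=/etc/hadoop/conf:$SPARK_CLASSPATH
  ./bin/spark-shell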
Re: checkpointing without streaming?
Diana, that is a good question. When you persist an RDD, the system still remembers the whole lineage of parent RDDs that created that RDD. If one of the executor fails, and the persist data is lost (both local disk and memory data will get lost), then the lineage is used to recreate the RDD. The longer the lineage, the more recomputation the system has to do in case of failure, and hence higher recovery time. So its not a good idea to have a very long lineage, as it leads to all sorts of problems, like the one Xiangrui pointed to. Checkpointing an RDD actually saves the RDD data to HDFS and removes pointers to the parent RDDs (as the data can be regenerated just by reading from the HDFS file). So that RDDs data does not need to be recomputed when worker fails, just re-read. In fact, the data is also retained across driver restarts as it is in HDFS. RDD.checkpoint was introduced with streaming because streaming is obvious use case where the lineage will grow infinitely long (for stateful computations where each result depends on all the previously received data). However, this checkpointing is useful for any long running RDD computation, and I know that people have used RDD.checkpoint() independent of streaming. TD On Mon, Apr 21, 2014 at 1:10 PM, Xiangrui Meng men...@gmail.com wrote: Persist doesn't cut lineage. You might run into StackOverflow problem with a long lineage. See https://spark-project.atlassian.net/browse/SPARK-1006 for example. On Mon, Apr 21, 2014 at 12:11 PM, Diana Carroll dcarr...@cloudera.com wrote: When might that be necessary or useful? Presumably I can persist and replicate my RDD to avoid re-computation, if that's my goal. What advantage does checkpointing provide over disk persistence with replication? On Mon, Apr 21, 2014 at 2:42 PM, Xiangrui Meng men...@gmail.com wrote: Checkpoint clears dependencies. You might need checkpoint to cut a long lineage in iterative algorithms. -Xiangrui On Mon, Apr 21, 2014 at 11:34 AM, Diana Carroll dcarr...@cloudera.com wrote: I'm trying to understand when I would want to checkpoint an RDD rather than just persist to disk. Every reference I can find to checkpoint related to Spark Streaming. But the method is defined in the core Spark library, not Streaming. Does it exist solely for streaming, or are there circumstances unrelated to streaming in which I might want to checkpoint...and if so, like what? Thanks, Diana
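To make the above concrete, a minimal sketch of using RDD.checkpoint() outside streaming to keep an iterative job's lineage short. The checkpoint directory, iteration counts and the update step are all assumed for illustration.

  sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // should be a reliable store such as HDFS
  var state = sc.parallelize(1 to 1000000).map(i => (i, 0.0))
  for (iter <- 1 to 100) {
    state = state.map { case (k, v) => (k, v + 1.0) }    // each iteration grows the lineage
    if (iter % 10 == 0) {
      state.checkpoint()                                 // mark for checkpointing, truncating the lineage
      state.count()                                      // force a job so the checkpoint is actually written
    }
  }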
Re: Spark Streaming source from Amazon Kinesis
it is possible Nick. Please take a look here: https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923 the source code is here as a pull request: https://github.com/apache/spark/pull/223 let me know if you have any questions. On Mon, Apr 21, 2014 at 1:00 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I'm looking to start experimenting with Spark Streaming, and I'd like to use Amazon Kinesis https://aws.amazon.com/kinesis/ as my data source. Looking at the list of supported Spark Streaming sourceshttp://spark.apache.org/docs/latest/streaming-programming-guide.html#linking, I don't see any mention of Kinesis. Is it possible to use Spark Streaming with Amazon Kinesis? If not, are there plans to add such support in the future? Nick -- View this message in context: Spark Streaming source from Amazon Kinesishttp://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-source-from-Amazon-Kinesis-tp4550.html Sent from the Apache Spark User List mailing list archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at Nabble.com.
Re: Spark Streaming source from Amazon Kinesis
sorry Matei. Will definitely start working on making the changes soon :) On Mon, Apr 21, 2014 at 1:10 PM, Matei Zaharia matei.zaha...@gmail.comwrote: There was a patch posted a few weeks ago ( https://github.com/apache/spark/pull/223), but it needs a few changes in packaging because it uses a license that isn’t fully compatible with Apache. I’d like to get this merged when the changes are made though — it would be a good input source to support. Matei On Apr 21, 2014, at 1:00 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I'm looking to start experimenting with Spark Streaming, and I'd like to use Amazon Kinesis https://aws.amazon.com/kinesis/ as my data source. Looking at the list of supported Spark Streaming sourceshttp://spark.apache.org/docs/latest/streaming-programming-guide.html#linking, I don't see any mention of Kinesis. Is it possible to use Spark Streaming with Amazon Kinesis? If not, are there plans to add such support in the future? Nick -- View this message in context: Spark Streaming source from Amazon Kinesishttp://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-source-from-Amazon-Kinesis-tp4550.html Sent from the Apache Spark User List mailing list archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at Nabble.com.
RE: Problem connecting to HDFS in Spark shell
-Original Message- From: Marcelo Vanzin [mailto:van...@cloudera.com] Hi Ken, On Mon, Apr 21, 2014 at 1:39 PM, Williams, Ken ken.willi...@windlogics.com wrote: I haven't figured out how to let the hostname default to the host mentioned in our /etc/hadoop/conf/hdfs-site.xml like the Hadoop command-line tools do, but that's not so important. Try adding /etc/hadoop/conf to SPARK_CLASSPATH. It looks like I already had my config set up properly, but I didn't understand the URL syntax - the following works: sc.textFile("hdfs:///user/kwilliams/dat/part-m-0") In other words, just omit the hostname between the 2nd and 3rd slash of the URL. -Ken
ERROR TaskSchedulerImpl: Lost an executor
Hi, I am trying to set up my own standalone Spark, and I started the master node and worker nodes. Then I ran ./bin/spark-shell, and I get this message: 14/04/21 16:31:51 ERROR TaskSchedulerImpl: Lost an executor 1 (already removed): remote Akka client disassociated 14/04/21 16:31:51 ERROR TaskSchedulerImpl: Lost an executor 2 (already removed): remote Akka client disassociated 14/04/21 16:31:51 ERROR TaskSchedulerImpl: Lost an executor 3 (already removed): remote Akka client disassociated 14/04/21 16:31:51 ERROR TaskSchedulerImpl: Lost an executor 5 (already removed): remote Akka client disassociated 14/04/21 16:31:52 ERROR TaskSchedulerImpl: Lost an executor 6 (already removed): remote Akka client disassociated 14/04/21 16:31:52 ERROR TaskSchedulerImpl: Lost an executor 8 (already removed): remote Akka client disassociated 14/04/21 16:31:52 ERROR TaskSchedulerImpl: Lost an executor 9 (already removed): remote Akka client disassociated 14/04/21 16:31:53 ERROR TaskSchedulerImpl: Lost an executor 7 (already removed): remote Akka client disassociated 14/04/21 16:31:54 ERROR TaskSchedulerImpl: Lost an executor 10 (already removed): remote Akka client disassociated 14/04/21 16:31:54 ERROR TaskSchedulerImpl: Lost an executor 12 (already removed): remote Akka client disassociated 14/04/21 16:31:54 ERROR TaskSchedulerImpl: Lost an executor 11 (already removed): remote Akka client disassociated 14/04/21 16:31:54 ERROR AppClient$ClientActor: Master removed our application: FAILED; stopping client 14/04/21 16:31:54 ERROR TaskSchedulerImpl: Lost an executor 13 (already removed): remote Akka client disassociated 14/04/21 16:31:55 ERROR TaskSchedulerImpl: Lost an executor 4 (already removed): remote Akka client disassociated 14/04/21 16:31:55 ERROR TaskSchedulerImpl: Lost an executor 0 (already removed): remote Akka client disassociated 14/04/21 16:31:56 ERROR TaskSchedulerImpl: Lost an executor 14 (already removed): remote Akka client disassociated I am pretty new to Spark, or even programming.. so I am not sure what is wrong. any idea what could be wrong? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-TaskSchedulerImpl-Lost-an-executor-tp4566.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Hung inserts?
I tried removing the CLUSTERED directive and get the same results :( I also removed SORTED, same deal. I'm going to try removing partitioning altogether for now.

On Mon, Apr 21, 2014 at 4:58 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: Clustering is not supported. Can you remove that and give it a go? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi

On Mon, Apr 21, 2014 at 3:20 AM, Brad Heller brad.hel...@gmail.com wrote: Hey list, I've got some CSV data I'm importing from S3. I can create the external table well enough, and I can also do a CREATE TABLE ... AS SELECT ... from it to pull the data internal to Spark. Here's the HQL for my external table: https://gist.github.com/bradhe/11126024 Now I'd like to add partitioning and clustering to my permanent table, so I create a new table and try to do an INSERT ... SELECT. Here's the HQL for my internal, partitioned table and the INSERT ... SELECT: https://gist.github.com/bradhe/11126047 Oddly, the query is scheduled... but it never makes any progress! http://i.imgur.com/vXvgpzD.png Is this a bug? Am I doing something dumb? Thanks, Brad Heller
Re: Hung inserts?
So after a little more investigation, it turns out this issue happens specifically when I interact with the shark server. If I log in to the master and start a shark session (./bin/shark), everything works as expected. I'm starting the shark server with the following upstart script; am I doing something wrong? https://gist.github.com/bradhe/11159123

On Mon, Apr 21, 2014 at 3:31 PM, Brad Heller brad.hel...@gmail.com wrote: I tried removing the CLUSTERED directive and get the same results :( I also removed SORTED, same deal. I'm going to try removing partitioning altogether for now.
Re: [ann] Spark-NYC Meetup
Sounds great, François.

On 21 Apr 2014 22:31, François Le Lay f...@spotify.com wrote: Hi everyone, This is a quick email to announce the creation of a Spark-NYC Meetup. We have two upcoming events: one at PlaceIQ, another at Spotify, where Reynold Xin (Databricks) and Christopher Johnson (Spotify) have talks scheduled. More info: http://www.meetup.com/Spark-NYC/ -- François Le Lay Data Infra Chapter Lead Spotify NYC Twitter: @lelayf
Re: spark-0.9.1 compiled with Hadoop 2.3.0 doesn't work with S3?
I ran into the same issue. The problem seems to be with the jets3t library that Spark uses in project/SparkBuild.scala. Change this: "net.java.dev.jets3t" % "jets3t" % "0.7.1" to "net.java.dev.jets3t" % "jets3t" % "0.9.0". 0.7.1 is not the right version of jets3t for Hadoop 2.3.0.

On Mon, Apr 21, 2014 at 11:30 AM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi all, I'm writing a Spark application to load S3 data to HDFS. The HDFS version is 2.3.0, so I have to compile Spark with Hadoop 2.3.0. After I execute val allfiles = sc.textFile("s3n://abc/*.txt") and val output = allfiles.saveAsTextFile("hdfs://x.x.x.x:9000/dataset"), Spark throws an exception (actually related to Hadoop?): java.lang.NoClassDefFoundError: org/jets3t/service/ServiceException at org.apache.hadoop.fs.s3.S3FileSystem.createDefaultStore(S3FileSystem.java:100) at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:90) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:90) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2350) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2332) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:369) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.SparkContext.runJob(SparkContext.scala:891) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:741) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:692) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:574) at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:900) at $iwC$$iwC$$iwC$$iwC.init(console:14) at $iwC$$iwC$$iwC.init(console:19) at $iwC$$iwC.init(console:21) at $iwC.init(console:23) at init(console:25) at .init(console:29) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609) at
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:793) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:838) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:750) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:598) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:605) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:608) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:931) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:881) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:881) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:881) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:973) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) Caused by: java.lang.ClassNotFoundException: org.jets3t.service.ServiceException at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at
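For reference, the suggested change in project/SparkBuild.scala amounts to bumping the jets3t coordinates; a minimal sbt fragment (the surrounding dependency list is abbreviated here, and the exact neighbouring entries differ by Spark version):

    // project/SparkBuild.scala (excerpt, illustrative): use jets3t 0.9.0 when building against Hadoop 2.3.0
    libraryDependencies ++= Seq(
      "net.java.dev.jets3t" % "jets3t" % "0.9.0"  // was "0.7.1"
    )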
Re: spark-0.9.1 compiled with Hadoop 2.3.0 doesn't work with S3?
Yes, I fixed it in the same way, but didn't get a chance to get back to this. I also made a PR: https://github.com/apache/spark/pull/468 Best, -- Nan Zhu

On Monday, April 21, 2014 at 8:19 PM, Parviz Deyhim wrote: I ran into the same issue. The problem seems to be with the jets3t library that Spark uses in project/SparkBuild.scala. Change "net.java.dev.jets3t" % "jets3t" % "0.7.1" to "net.java.dev.jets3t" % "jets3t" % "0.9.0". 0.7.1 is not the right version of jets3t for Hadoop 2.3.0.
Adding to an RDD
Feels like a silly question, but what if I wanted to apply a map to each element in an RDD, and instead of replacing the element, I wanted to add new columns with the manipulated value? I.e. res0: Array[String] = Array(1 2, 1 3, 1 4, 2 1, 3 1, 4 1) Becomes res0: Array[String] = Array(1 2 2 4, 1 3 1 6, 1 4 1 8, 2 1 4 2, 3 1 6 2, 4 1 8 2) Cheers - Ian
Re: Spark is slow
g1 = pairs1.groupByKey().count()
pairs1 = pairs1.groupByKey(g1).cache()
g2 = triples.groupByKey().count()
pairs2 = pairs2.groupByKey(g2)
pairs = pairs2.join(pairs1)

Hi, I want to implement a hash-partitioned join as shown above, but somehow it is taking very long to run. As I understand it, the join above should be performed locally, right, since each RDD has already been partitioned? After partitioning, matching keys should reside on the same node, so isn't the join supposed to be fast when we partition by key? Thank you.
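For reference, the usual way to get a shuffle-free join is to partition both RDDs with the same Partitioner (same number of partitions) and cache them before joining; if the two sides end up with different partition counts, as with the two groupByKey calls above, the join cannot reuse their partitioning and still has to move data. A minimal sketch in Scala, assuming pairs1 and pairs2 are existing RDDs of key/value pairs and the partition count is an arbitrary illustrative choice:

    import org.apache.spark.HashPartitioner

    // Give both sides the same partitioner so matching keys land in the same partition.
    val part = new HashPartitioner(64)          // 64 partitions is an arbitrary choice
    val p1 = pairs1.partitionBy(part).cache()   // materialize the partitioned data
    val p2 = pairs2.partitionBy(part).cache()
    // Because p1 and p2 share a partitioner, the join can be computed without
    // reshuffling either side.
    val joined = p2.join(p1)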
two calls of saveAsTextFile() have different results on the same RDD
I just call saveAsTextFile() twice. 'doc_topic_dist' is of type RDD[(Long, Array[Int])]; each element is a pair of (doc, topic_arr). For the same doc, the two output files contain different topic_arr values.
...
doc_topic_dist.coalesce(1, true).saveAsTextFile(save_path)
doc_topic_dist.coalesce(1, true).saveAsTextFile(save_path + "2")
...
how to solve this problem?
14/04/22 10:43:45 WARN scheduler.TaskSetManager: Loss was due to java.util.NoSuchElementException java.util.NoSuchElementException: End of stream at org.apache.spark.util.NextIterator.next(NextIterator.scala:83) at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29) at org.apache.spark.graphx.impl.RoutingTable$$anonfun$1.apply(RoutingTable.scala:52) at org.apache.spark.graphx.impl.RoutingTable$$anonfun$1.apply(RoutingTable.scala:51) at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:450) at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:450) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744)
Re: two calls of saveAsTextFile() have different results on the same RDD
It works fine when I call doc_topic_dist.cache() first.
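That behaviour is consistent with how actions work: each saveAsTextFile() is a separate action, and without caching the lineage behind doc_topic_dist is recomputed from scratch for each call, so any non-determinism in that computation (random topic sampling, for example) can yield different topic_arr values each time. A minimal sketch of the cached variant described above (save_path is assumed to be an existing string):

    // Materialize one computation of the RDD and reuse it for both saves.
    val cached = doc_topic_dist.cache()
    cached.coalesce(1, true).saveAsTextFile(save_path)
    cached.coalesce(1, true).saveAsTextFile(save_path + "2")

Note that cache() is best-effort: if partitions are evicted under memory pressure they are recomputed, so saving once and re-reading the saved output (or checkpointing) is the stricter guarantee.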
Re: Adding to an RDD
As long as the function that you are mapping over the RDD is pure (it preserves referential transparency, so that mapping the same function over the same initial RDD elements always yields the same result elements), there is no problem in doing what you suggest. In fact, it's common practice.

On Mon, Apr 21, 2014 at 6:33 PM, Ian Ferreira ianferre...@hotmail.com wrote: Feels like a silly question, but what if I wanted to apply a map to each element in an RDD, and instead of replacing the element, I wanted to add new columns with the manipulated value? I.e. res0: Array[String] = Array(1 2, 1 3, 1 4, 2 1, 3 1, 4 1) Becomes res0: Array[String] = Array(1 2 2 4, 1 3 1 6, 1 4 1 8, 2 1 4 2, 3 1 6 2, 4 1 8 2) Cheers - Ian
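A concrete sketch of that pattern, using the arrays from Ian's example and assuming, purely for illustration, that the manipulation is doubling each value and that sc is an existing SparkContext:

    val rdd = sc.parallelize(Seq("1 2", "1 3", "1 4", "2 1", "3 1", "4 1"))
    // Keep each original element and append the manipulated values as extra columns.
    val widened = rdd.map { line =>
      val doubled = line.split(" ").map(_.toInt * 2).mkString(" ")  // the assumed manipulation
      line + " " + doubled                                          // original columns + new columns
    }
    widened.collect()  // Array("1 2 2 4", "1 3 2 6", "1 4 2 8", "2 1 4 2", "3 1 6 2", "4 1 8 2")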
Re: Strange behaviour of different SSCs with same Kafka topic
Yes. I am running this in local mode and the SSCs run in the same JVM. So, if I deploy this on a cluster, would such behavior be gone? Also, is there any way I can start the SSCs on a local machine but in different JVMs? I couldn't find anything about this in the documentation. The inter-mingling of data seems to be gone after I made some of those external classes Scala objects keeping static maps and the like. Is that a good idea as far as performance is concerned? Thanks Gagan B Mishra

On Tue, Apr 22, 2014 at 1:59 AM, Tathagata Das wrote: Are you by any chance starting two StreamingContexts in the same JVM? That could explain a lot of the weird mixing of data that you are seeing. It's not a supported usage scenario to start multiple StreamingContexts simultaneously in the same JVM. TD

On Thu, Apr 17, 2014 at 10:58 PM, gaganbm wrote: It happens at a normal data rate as well, i.e., let's say 20 records per second. Apart from that, I am also getting some more strange behavior. Let me explain. I establish two SSCs and start them one after another. In the SSCs I get the streams from Kafka sources and do some manipulations, like adding some Record_Name, for example, to each of the incoming records. Now this Record_Name is different for the two SSCs, and I get this field from some other class, not relevant to the streams. The expected behavior should be that all records in SSC1 get the field RECORD_NAME_1 added and all records in SSC2 get the field RECORD_NAME_2 added. The two SSCs have nothing to do with each other, as I believe. However, strangely enough, I find many records in SSC1 get added with RECORD_NAME_2 and vice versa. Is it some kind of serialization issue? That the class which provides this RECORD_NAME gets serialized, is reconstructed, and then something weird happens inside? I am unable to figure it out. So, apart from the skewed frequency and volume of records in both the streams, I am getting this inter-mingling of data among the streams. Can you help me with how to use some external data to manipulate the RDD records? Thanks and regards Gagan B Mishra Programmer 560034, Bangalore India

On Tue, Apr 15, 2014 at 4:09 AM, Tathagata Das wrote: Does this happen at a low event rate for that topic as well, or only for a high volume rate? TD

On Wed, Apr 9, 2014 at 11:24 PM, gaganbm wrote: I am really at my wits' end here. I have different streaming contexts, let's say 2, both listening to the same Kafka topics. I establish the KafkaStream by setting a different consumer group for each of them. Ideally, I should be seeing the Kafka events in both the streams, but what I am getting is really unpredictable. Only one stream gets a lot of events and the other one gets almost nothing, or very little compared to the first. Also the frequency is very skewed: I get a lot of events in one stream continuously, and after some duration I get a few events in the other one. I don't know where I am going wrong. I can see consumer fetcher threads for both the streams that listen to the Kafka topics. I can give further details if needed. Any help will be great. Thanks
Need clarification of joining streams
I wanted some clarification on the behavior of joining streams. As I understand it, the join works per batch. I am reading data from two Kafka streams and then joining them based on some keys. But what happens if one stream hasn't produced any data in that batch duration and the other has some? Or let's say one stream is getting data at a higher rate and the other one is not so frequent. How does it behave in such a case? Thanks
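For reference, a minimal sketch of how the per-batch semantics play out (stream1 and stream2 are assumed to be existing pair DStreams built from the two Kafka sources):

    // join() is evaluated once per batch interval: the RDD produced by stream1 in a
    // batch is inner-joined with the RDD produced by stream2 in that same batch.
    // A key present on only one side in that batch yields no output for that batch,
    // and records arriving in different batches are never matched against each other.
    val joined = stream1.join(stream2)   // DStream[(K, (V1, V2))]
    joined.print()
    // One common way to tolerate differing rates is to widen one side first, e.g.
    // stream1.window(Seconds(60)).join(stream2), so each batch of stream2 is matched
    // against the last 60 seconds of stream1 (the window length is an illustrative choice).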
Re: Spark recovery from bad nodes
Please check my comment on the shark-users thread: https://groups.google.com/forum/#!searchin/shark-users/Failure$20recovery$20in$20Shark$20when$20cluster$20/shark-users/vUUGLZANxr8/MMCtKhqjhLMJ

On Tue, Apr 22, 2014 at 8:06 AM, rama0120 lakshminaarayana...@gmail.com wrote: Hi, I couldn't find any details regarding this recovery mechanism - could someone please shed some light on this?