Re: Spark speed performance
Thank you very much — lots of very small json files was exactly the performance problem. Using coalesce makes my Spark program on a single node only twice as slow (even including Spark startup) as the single-node Python program, which is acceptable.

Jan

__

Because of the overhead between the JVM and Python, a single task will be slower than your local Python script, but it's very easy to scale out to many CPUs. Even on one CPU, it's not common for PySpark to be 100 times slower. You have many small files, and each file will be processed by a task, which has about 100ms of overhead (to be scheduled and executed), but a small file can be processed by your single-threaded Python script in less than 1ms.

You could pack your json files into larger ones, or you could try to merge the small tasks into larger ones with coalesce(N), such as:

    distData = sc.textFile(sys.argv[2]).coalesce(10)  # which will have 10 partitions (tasks)

Davies

On Sat, Oct 18, 2014 at 12:07 PM, jan.zi...@centrum.cz wrote:

Hi, I have a program that I run on a single computer (in Python) and have also implemented in Spark. This program basically only reads .json, takes one field from it, and saves it back. Using Spark, my program runs approximately 100 times slower on 1 master and 1 slave. So I would like to ask where the problem might be.

My Spark program looks like:

    sc = SparkContext(appName="Json data preprocessor")
    distData = sc.textFile(sys.argv[2])
    json_extractor = JsonExtractor(sys.argv[1])
    cleanedData = distData.flatMap(json_extractor.extract_json)
    cleanedData.saveAsTextFile(sys.argv[3])

JsonExtractor only selects the data from the field given by sys.argv[1].

My data are basically many small json files, with one json per line.

I have tried both reading and writing the data from/to Amazon S3 and from/to local disk on all the machines.

I would like to ask if there is something that I am missing or if Spark is supposed to be this slow in comparison with the local, non-parallelized, single-node program.

Thank you in advance for any suggestions or hints.
why does the driver's connection to the master fail?
In my program, the application's connection to the master fails for several iterations. The driver's log is as follows:

    WARN AppClient$ClientActor: Connection to akka.tcp://sparkMaster@master1:7077 failed; waiting for master to reconnect...

Why does this warning happen and how can I avoid it?

Best,
randylu
Re: why does the driver's connection to the master fail?
In addition, the driver receives several DisassociatedEvent messages.
Re: What's wrong with my spark filter? I get org.apache.spark.SparkException: Task not serializable
Check for any variables you've declared in your class. Even if you're not calling them from the function, they are passed to the worker nodes as part of the context. Consequently, if you have something without a default serializer (like an imported class) it will also get passed. To fix this you can either move that variable out of the class (make it global) or you can implement Kryo serialization (see the Spark tuning guide for this).

On Oct 17, 2014 6:37 AM, shahab shahab.mok...@gmail.com wrote:

Hi, I am probably missing a very simple principle, but something is wrong with my filter; I get an org.apache.spark.SparkException: Task not serializable exception.

Here is my filter function:

    object OBJ {
      def f1(): Boolean = {
        var i = 1
        for (j <- 1 to 10) i = i + 1
        true
      }
    }

    rdd.filter(row => OBJ.f1())

And when I run, I get the following exception:

    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
        ...
    Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        ...

best,
/Shahab
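To illustrate the first suggestion (moving state out of the enclosing class so closures stop dragging it in), here is a minimal Scala sketch. The class and field names are hypothetical — they only stand in for whatever non-serializable member the driver class holds; this is not Shahab's actual code.

    import org.apache.spark.rdd.RDD

    // Top-level object: it holds no state, so closures that reference it
    // do not need to serialize anything besides the function itself.
    object Predicates {
      def keepRow(row: String): Boolean = row.nonEmpty
    }

    class Driver(rdd: RDD[String]) {
      // A field like this is what typically triggers NotSerializableException:
      // any lambda defined inside this class captures `this`, and with it `conf`.
      // val conf = new org.apache.spark.SparkConf()

      def run(): Long =
        rdd.filter(Predicates.keepRow).count()  // references only the stateless object
    }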
scala.MatchError: class java.sql.Timestamp
I am working with Spark 1.1.0 and I believe Timestamp is a supported data type for Spark SQL. However I keep getting this MatchError for java.sql.Timestamp when I try to use reflection to register a Java Bean with a Timestamp field. Anything wrong with my code below?

    public static class Event implements Serializable {
        private String name;
        private Timestamp time;

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Timestamp getTime() { return time; }
        public void setTime(Timestamp time) { this.time = time; }
    }

    @Test
    public void testTimeStamp() {
        JavaSparkContext sc = new JavaSparkContext("local", "timestamp");
        String[] data = {"1,2014-01-01", "2,2014-02-01"};
        JavaRDD<String> input = sc.parallelize(Arrays.asList(data));
        JavaRDD<Event> events = input.map(new Function<String, Event>() {
            public Event call(String arg0) throws Exception {
                String[] c = arg0.split(",");
                Event e = new Event();
                e.setName(c[0]);
                DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
                e.setTime(new Timestamp(fmt.parse(c[1]).getTime()));
                return e;
            }
        });

        JavaSQLContext sqlCtx = new JavaSQLContext(sc);
        JavaSchemaRDD schemaEvent = sqlCtx.applySchema(events, Event.class);
        schemaEvent.registerTempTable("event");

        sc.stop();
    }
RE: scala.MatchError: class java.sql.Timestamp
Can you provide the exception stack?

Thanks,
Daoyuan
RE: scala.MatchError: class java.sql.Timestamp
scala.MatchError: class java.sql.Timestamp (of class java.lang.Class)
    at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$getSchema$1.apply(JavaSQLContext.scala:189)
    at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$getSchema$1.apply(JavaSQLContext.scala:188)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.api.java.JavaSQLContext.getSchema(JavaSQLContext.scala:188)
    at org.apache.spark.sql.api.java.JavaSQLContext.applySchema(JavaSQLContext.scala:90)
    at com.ford.dtc.ff.SessionStats.testTimeStamp(SessionStats.java:111)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Error while running Streaming examples - no snappyjava in java.library.path
I built the latest Spark project and I'm running into these errors when attempting to run the streaming examples locally on the Mac. How do I fix these errors?

    java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path
        at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1886)
        at java.lang.Runtime.loadLibrary0(Runtime.java:849)
        at java.lang.System.loadLibrary(System.java:1088)
        at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:170)
        at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:145)
        at org.xerial.snappy.Snappy.<clinit>(Snappy.java:47)
        at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:81)
        at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125)
        at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1083)
        at org.apache.spark.storage.BlockManager$$anonfun$7.apply(BlockManager.scala:579)
        at org.apache.spark.storage.BlockManager$$anonfun$7.apply(BlockManager.scala:579)
        at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:126)
        at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
        at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:732)
        at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:731)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:789)
        at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:731)
        at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:727)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:727)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

Thanks!
Using SVMWithSGD model to predict
Hi, I'm new to Spark and just trying to make sense of the SVMWithSGD example. I ran my dataset through it and built a model. When I call predict() on the testing data (after clearThreshold()) I was expecting to get answers in the range of 0 to 1. But they aren't; all predictions seem to be negative numbers between -0 and -2. I guess my question is: what do these predictions mean? How are they of use? The outcome I need is a probability rather than a binary decision.

Here's my Java code:

    SparkConf conf = new SparkConf().setAppName(name).set("spark.cores.max", "1");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<LabeledPoint> points = sc.textFile(path).map(new ParsePoint()).cache();
    JavaRDD<LabeledPoint> training = points.sample(false, 0.8, 0L).cache();
    JavaRDD<LabeledPoint> testing = points.subtract(training);
    SVMModel model = SVMWithSGD.train(training.rdd(), 100);
    model.clearThreshold();
    for (LabeledPoint point : testing.toArray()) {
        Double score = model.predict(point.features());
        System.out.println("score = " + score);  // <- all these are negative numbers, seemingly between 0 and -2
    }
Re: Using SVMWithSGD model to predict
The problem is that you called clearThreshold(). The result becomes the SVM margin, not a 0/1 class prediction. There is no probability output. There was a very similar question last week. Is there an example out there suggesting clearThreshold()? I also wonder if it is good to overload the meaning of the output indirectly this way.
Re: Using SVMWithSGD model to predict
Thanks. The example I used is here: https://spark.apache.org/docs/latest/mllib-linear-methods.html (see SVMClassifier).

So there's no way to get a probability-based output? What about from linear regression, or logistic regression?
Re: Using SVMWithSGD model to predict
Ah right. It is important to use clearThreshold() in that example in order to generate margins, because the AUC metric needs the classifications to be ranked by some relative strength, rather than just 0/1. These outputs are not probabilities, and that is not what SVMs give you in general. There are techniques for estimating probabilities from SVM output, but these aren't present here. If you just want 0/1, you do not want to call clearThreshold().

Linear regression is not a classifier, so probabilities don't enter into it. Logistic regression, however, does give you a probability if you compute the logistic function of the input directly.
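As an illustration of that last point, here is a minimal Scala sketch of turning an MLlib logistic regression model's raw margin into a probability by applying the logistic (sigmoid) function. The RDD names are hypothetical, and it assumes the model's weights and intercept are accessible as in Spark 1.1's GeneralizedLinearModel.

    import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD

    // training and testing are RDD[LabeledPoint] prepared elsewhere (hypothetical)
    def probabilities(training: RDD[LabeledPoint], testing: RDD[LabeledPoint]): RDD[Double] = {
      val model = LogisticRegressionWithSGD.train(training, 100)

      testing.map { point =>
        // margin = w . x + b, computed from the model's weights and intercept
        val margin = model.weights.toArray.zip(point.features.toArray)
                          .map { case (w, x) => w * x }.sum + model.intercept
        1.0 / (1.0 + math.exp(-margin))  // logistic function => probability of class 1
      }
    }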
Re: What executes on worker and what executes on driver side
Any response for this?

1. How do I know which statements will be executed on the worker side, out of the Spark script, in a stage? e.g. if I have val x = 1 (or any other code) in my driver code, will the same statements be executed on the worker side in a stage?
2. How can I do a map-side join in Spark:
   a. without broadcast (i.e. by reading a file once in each executor)
   b. with broadcast, but by broadcasting the complete RDD to each executor

Regards
- Saurabh Wadhawan

On 19-Oct-2014, at 1:54 am, Saurabh Wadhawan saurabh.wadha...@guavus.com wrote:

Hi, I have the following questions:

1. When I write a Spark script, how do I know what part runs on the driver side and what runs on the worker side? So let's say I write code to read a plain text file. Will it run on the driver side only, on the worker side only, or on both sides?

2. If I want each worker to load a file, let's say for a join, and the file is pretty huge (say in GBs) so that I don't want to broadcast it, what's the best way to do it? Another way to say the same thing would be: how do I load a data structure for fast lookup (and not an RDD) on each worker node in the executor?

Regards
- Saurabh
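For the broadcast variant in point 2b, a common pattern is to collect the smaller dataset on the driver and broadcast it as a plain map, then do the join inside a map over the larger RDD (you cannot broadcast an RDD itself, only its collected contents). A minimal Scala sketch with hypothetical RDD names:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    // small: the lookup-side RDD (must fit in memory once collected)
    // large: the RDD being enriched
    def mapSideJoin(sc: SparkContext,
                    small: RDD[(String, String)],
                    large: RDD[(String, Int)]): RDD[(String, (Int, Option[String]))] = {
      // Materialize the small side as a local map and ship one copy per executor.
      val lookup = sc.broadcast(small.collectAsMap())

      large.map { case (key, value) =>
        (key, (value, lookup.value.get(key)))  // join happens locally, no shuffle
      }
    }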
Re: Submissions open for Spark Summit East 2015
BTW several people asked about registration and student passes. Registration will open in a few weeks, and like in previous Spark Summits, I expect there to be a special pass for students.

Matei

On Oct 18, 2014, at 9:52 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

After successful events in the past two years, the Spark Summit conference has expanded for 2015, offering both an event in New York on March 18-19 and one in San Francisco on June 15-17. The conference is a great chance to meet people from throughout the Spark community and see the latest news, tips and use cases. Submissions are now open for Spark Summit East 2015, to be held in New York on March 18-19. If you’d like to give a talk on use cases, neat applications, or ongoing Spark development, submit your talk online today at http://prevalentdesignevents.com/sparksummit2015/east/speaker/. Submissions will be open until December 6th, 2014. If you missed this year’s Spark Summit, you can still find videos from all talks online at http://spark-summit.org/2014.

Hope to see you there,
Matei
Re: Oryx + Spark mllib
Hi Deb,

Do check out https://github.com/OryxProject/oryx. It does integrate with Spark. Sean has put quite a bit of neat detail on the page about the architecture. It has all the things you are thinking about :)

Thanks,
Jayant

On Sat, Oct 18, 2014 at 8:49 AM, Debasish Das debasish.da...@gmail.com wrote:

Hi, Is someone working on a project to integrate the Oryx model-serving layer with Spark? Models will be built using either streaming data or batch data in HDFS and cross-validated with MLlib APIs, but the model-serving layer will expose API endpoints like Oryx, and the models may be read from HDFS/Impala/Spark SQL.

One of the requirements is that the API layer should be scalable and elastic... as requests grow we should be able to add more nodes... using Play and the Akka clustering module...

If there is an ongoing project on GitHub please point to it... Is there a plan to add a model serving and experimentation layer to MLlib?

Thanks.
Deb
Is Spark the right tool?
I am very new to Spark. I am working on a project that involves reading stock transactions off a number of TCP connections and:

1. periodically (once every few hours) uploading the transaction records to HBase
2. maintaining the records that are not yet written into HBase and acting as an HTTP query server for those records. An example query would be to return all transactions between 1-2pm for Google stock for the current trading day.

I am thinking of using Kafka to receive all the transaction records, with Spark as the consumer of Kafka's output. In particular, I need to create an RDD hashmap with a string (stock ticker symbol) as key and a list (or vector) of transaction records as data. This RDD needs to be thread (or process) safe, since different threads and processes will be reading and modifying it. I need insertion, deletion, and lookup to be fast.

Is this something that can be done with Spark, and is Spark the right tool to use in terms of latency and throughput? Pardon me if I don't know what I am talking about. All this is very new to me. Thanks!
Re: Using SVMWithSGD model to predict
Thanks for the info.
MLlib model build and low CPU usage
I'm building a model on a standalone cluster with just a single worker, limited to 3 cores and 4GB of RAM. The node starts up and prints the message:

    Starting Spark worker 192.168.1.185:60203 with 3 cores, 4.0 GB RAM

During model training (SVMWithSGD) the CPU usage on the worker is very low. It barely gets above 25% of a single core. I've tried adjusting the number of cores, but no matter what, the CPU usage seems to average around 25% (of just 1 core). The machine is running OS X with a quad-core i7.

Is this expected behaviour? It feels like the CPU is massively under-utilised.
Spark Streaming scheduling control
Hello, I have a cluster with 1 master and 2 slaves running Spark 1.1.0. I am having problems getting both slaves to work at the same time. When I launch the driver on the master, one of the slaves is assigned the receiver task, and initially both slaves start processing tasks. After a few tens of batches, the slave running the receiver starts processing all tasks, and the other won't execute any more tasks. If I cancel the execution and start over, the roles may switch if the other slave gets assigned the receiver, but the behaviour is the same, and the other slave will stop processing tasks after a short while. So both slaves are working, essentially, but never at the same time in a consistent way. No errors in the logs, etc.

I have tried increasing partitions (up to 100, while the slaves have 4 cores each) but with no success :-/

I understand that Spark may decide not to distribute tasks to all workers due to data locality, etc., but in this case I think there is something else, since one slave cannot keep up with the processing rate and the total delay keeps growing: I have set the batch interval to 1s, but each batch is processed in 1.6s, so after some time the delay (and the enqueued data) is just too much. Does Spark take this time restriction into consideration in its scheduling? I mean total processing time <= batch duration. Is there any configuration affecting that? Am I missing something important? Any hints or things to test?

Thanks in advance! ;-)
Re: Upgrade to Spark 1.1.0?
Trying to upgrade from Spark 1.0.1 to 1.1.0. Can’t imagine the upgrade is the problem but anyway...

I get a NoClassDefFoundError for RandomGenerator when running a driver from the CLI, but only when using a named master, even a standalone master. If I run using master = local[4] the job executes correctly, but if I set the master to spark://Maclaurin.local:7077 (though they are the same machine) I get the NoClassDefFoundError. The classpath seems correct on the CLI, and the jars do indeed contain the offending class (see below). There must be some difference in how classes are loaded between local[4] and spark://Maclaurin.local:7077?

Any ideas?

===

The driver is in mahout-spark_2.10-1.0-SNAPSHOT-job.jar, so its execution means it must be in the classpath. When I look at what’s in the jar I see RandomGenerator:

    Maclaurin:target pat$ jar tf mahout-spark_2.10-1.0-SNAPSHOT-job.jar | grep RandomGenerator
    cern/jet/random/engine/RandomGenerator.class
    org/apache/commons/math3/random/GaussianRandomGenerator.class
    org/apache/commons/math3/random/JDKRandomGenerator.class
    org/apache/commons/math3/random/UniformRandomGenerator.class
    org/apache/commons/math3/random/RandomGenerator.class    <==!
    org/apache/commons/math3/random/NormalizedRandomGenerator.class
    org/apache/commons/math3/random/AbstractRandomGenerator.class
    org/apache/commons/math3/random/StableRandomGenerator.class

But I get the following error executing the job:

    14/10/19 15:39:00 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 6.9 (TID 84, 192.168.0.2): java.lang.NoClassDefFoundError: org/apache/commons/math3/random/RandomGenerator
        org.apache.mahout.common.RandomUtils.getRandom(RandomUtils.java:65)
        org.apache.mahout.math.cf.SimilarityAnalysis$$anonfun$5.apply(SimilarityAnalysis.scala:272)
        org.apache.mahout.math.cf.SimilarityAnalysis$$anonfun$5.apply(SimilarityAnalysis.scala:267)
        org.apache.mahout.sparkbindings.blas.MapBlock$$anonfun$1.apply(MapBlock.scala:33)
        org.apache.mahout.sparkbindings.blas.MapBlock$$anonfun$1.apply(MapBlock.scala:32)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235)
        org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
        org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
        org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
        java.lang.Thread.run(Thread.java:695)
RE: how to build spark 1.1.0 to include org.apache.commons.math3 ?
@Sean Owen, thank you for the information. I changed the pom file to include math3, because I needed the math3 library from my previous use with 1.0.2.

Best regards,
Henry

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Saturday, October 18, 2014 2:19 AM
To: MA33 YTHung1
Cc: user@spark.apache.org
Subject: Re: how to build spark 1.1.0 to include org.apache.commons.math3 ?

It doesn't contain commons-math3 since Spark does not depend on it. Its tests do, but tests are not built into the Spark assembly.

On Thu, Oct 16, 2014 at 9:57 PM, Henry Hung ythu...@winbond.com wrote:

HI All, I try to build Spark 1.1.0 using sbt with the command:

    sbt/sbt -Dhadoop.version=2.2.0 -Pyarn assembly

but the resulting spark-assembly-1.1.0-hadoop2.2.0.jar is still missing the Apache commons-math3 classes. How do I add math3 into the package?

Best regards,
Henry
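As an alternative to patching Spark's pom, an application that needs commons-math3 can simply declare the dependency in its own build and ship it with the job. A minimal sketch of the sbt form (the version number here is an assumption; use whichever commons-math3 release your code was built against):

    // build.sbt fragment (hypothetical project): pull commons-math3 onto the application classpath
    libraryDependencies += "org.apache.commons" % "commons-math3" % "3.3"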
RE: scala.MatchError: class java.sql.Timestamp
There seems to be a bug in JavaSQLContext.getSchema(), which doesn't enumerate all of the data types supported by Catalyst.
All executors run on just a few nodes
Hi all, I have a Spark 0.9 cluster with 16 nodes. I wrote a Spark application to read data from an HBase table, which has 86 regions spread over 20 RegionServers.

I submitted the Spark app in Spark standalone mode and found that there were 86 executors running on just 3 nodes, and it took about 30 minutes to read data from the table. In this case, I noticed from the Spark master UI that the Locality Level of all executors was PROCESS_LOCAL.

Later I ran the same app again (without any code changes) and found that those 86 executors were running on 16 nodes, and this time it took just 4 minutes to read data from the same HBase table. In this case, I noticed that the Locality Level of most executors was NODE_LOCAL.

After testing multiple times, I found that the two cases above occur randomly. So I have 2 questions:
1) Why would the two cases above occur randomly when I submit the same application multiple times?
2) Does the spread of executors influence the locality level?

Thank you.
default parallelism bug?
Hi, I usually use a file on HDFS to make a PairRDD and analyze it using combineByKey, reduceByKey, etc. But sometimes it hangs when I set the spark.default.parallelism configuration, even though the file is small. If I remove this configuration, everything works fine. Can anyone tell me why this occurs?

Regards,
Kevin
RE: scala.MatchError: class java.sql.Timestamp
I have created an issue for this: https://issues.apache.org/jira/browse/SPARK-4003
Re: How to write an RDD into One Local Existing File?
Write to HDFS and then get one file locally by using hdfs dfs -getmerge...

On Friday, October 17, 2014, Sean Owen so...@cloudera.com wrote:

You can save to a local file. What are you trying and what doesn't work? You can output one file by repartitioning to 1 partition, but this is probably not a good idea, as you are bottlenecking the output and some upstream computation by disabling parallelism. How about just combining the files on HDFS afterwards? Or just reading all the files instead of 1? You can hdfs dfs -cat a bunch of files at once.

On Fri, Oct 17, 2014 at 6:46 PM, Parthus peng.wei@gmail.com wrote:

Hi, I have a Spark MapReduce task which requires me to write the final RDD to an existing local file (appending to this file). I tried two ways but neither works well:

1. Use the saveAsTextFile() API. Spark 1.1.0 claims that this API can write to local files, but I never got it to work. Moreover, the result is not one file but a series of part-xxxxx files, which is not what I hoped to get.

2. Collect the RDD to an array and write it to the driver node using Java's file IO. There are also two problems: 1) my RDD is huge (1TB), which cannot fit into the memory of one driver node, so I have to split the task into small pieces and collect and write them part by part; 2) during the writing with Java IO, the Spark MapReduce task has to wait, which is not efficient.

Could anybody suggest an efficient way to solve this problem? I wish the solution could be something like: appending a huge RDD to a local file without pausing the MapReduce job during writing.

--
- Rishi
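A minimal sketch of the suggested approach — write the RDD to HDFS as usual, then merge the part files into a single local file with getmerge. The paths are hypothetical, and the shell step runs outside Spark:

    import org.apache.spark.rdd.RDD

    def saveForMerge(rdd: RDD[String]): Unit = {
      // Writes many part-xxxxx files under the output directory, in parallel.
      rdd.saveAsTextFile("hdfs:///tmp/output")
    }

    // Then, on the command line, pull everything down as one local file:
    //   hdfs dfs -getmerge /tmp/output /local/path/merged.txt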