Re: diamond dependency tree
Yes, sorry, I meant DAG. I fixed it in my message but not in the subject. The "leaf" terminology wasn't helpful, I know, so hopefully my visual example was enough. Anyway, I observed what you said in a local-mode test. I can try that in a cluster, too. Thank you! On Thu, Sep 18, 2014 at 10:28 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Thu, Sep 18, 2014 at 8:55 PM, Victor Tso-Guillen v...@paxata.com wrote: Is it possible to express a diamond DAG and have the leaf dependency evaluate only once? Well, strictly speaking your graph is not a tree, and also the meaning of "leaf" is not totally clear, I'd say. So say data flows left to right (and the dependencies are oriented right to left): [inline image omitted] Is it possible to run d.collect() and have a evaluate its iterator only once? If you say a.cache() (or a.persist()), then it will be evaluated only once and then the cached data will be used for later accesses. Tobias
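For context, a minimal sketch of the caching behaviour Tobias describes; the RDD names mirror the diamond in the thread, while the input path and transformations are illustrative assumptions:

  // a is the shared upstream vertex of the diamond; b and c both read it,
  // and d depends on both b and c.
  val a = sc.textFile("hdfs://...")   // assumes sc: SparkContext
  a.cache()                           // without this, d.collect() recomputes a once per path
  val b = a.map(_.toUpperCase)
  val c = a.map(_.toLowerCase)
  val d = b.union(c)
  d.collect()                         // a's iterator is evaluated once, then served from the cache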
Re: Use Case of mutable RDD - any ideas around will help.
Sweet, that's probably it. Too bad it didn't seem to make 1.1? On Wed, Sep 17, 2014 at 5:32 PM, Michael Armbrust mich...@databricks.com wrote: The unknown slowdown might be addressed by https://github.com/apache/spark/commit/f858f466862541c3faad76a1fa2391f1c17ec9dd On Sun, Sep 14, 2014 at 10:40 PM, Evan Chan velvia.git...@gmail.com wrote: SPARK-1671 looks really promising. Note that even right now, you don't need to un-cache the existing table. You can do something like this:

  newAdditionRdd.registerTempTable("table2")
  sqlContext.cacheTable("table2")
  val unionedRdd = sqlContext.table("table1").unionAll(sqlContext.table("table2"))

When you use table(), it returns the cached representation, so the union executes much faster. However, there is some unknown slowdown; it's not quite as fast as you would expect. On Fri, Sep 12, 2014 at 2:09 PM, Cheng Lian lian.cs@gmail.com wrote: Ah, I see. So basically what you need is something like cache write-through support, which exists in Shark but is not implemented in Spark SQL yet. In Shark, when inserting data into a table that has already been cached, the newly inserted data is automatically cached and "union"-ed with the existing table content. SPARK-1671 was created to track this feature. We'll work on that. Currently, as a workaround, instead of doing the union at the RDD level, you may try caching the new table, unioning it with the old table, and then querying the union-ed table. The drawbacks are higher code complexity and lots of temporary tables, but the performance should be reasonable. On Fri, Sep 12, 2014 at 1:19 PM, Archit Thakur archit279tha...@gmail.com wrote: Little code snippet:

  line 1: cacheTable(existingRDDTableName)
  line 2: // some operations that will materialize the existingRDD dataset
  line 3: existingRDD.union(newRDD).registerAsTable(new_existingRDDTableName)
  line 4: cacheTable(new_existingRDDTableName)
  line 5: // some operation that will materialize new_existingRDD

Now, what we expect is that line 4, rather than caching both existingRDDTableName and new_existingRDDTableName, caches only new_existingRDDTableName. But we cannot explicitly uncache existingRDDTableName, because we want the union to use the cached existingRDDTableName; being lazy, new_existingRDDTableName could be materialized later, and by then we can't have lost existingRDDTableName from the cache. What if we keep the same name for the new table:

  cacheTable(existingRDDTableName)
  existingRDD.union(newRDD).registerAsTable(existingRDDTableName)
  cacheTable(existingRDDTableName) // might not be needed again

Will both our requirements be satisfied, i.e. that the union uses existingRDDTableName from the cache, and that we don't duplicate the data in the cache but somehow append to the older cached table? Thanks and Regards, Archit Thakur. Sr Software Developer, Guavus, Inc. On Sat, Sep 13, 2014 at 12:01 AM, pankaj arora pankajarora.n...@gmail.com wrote: I think I should elaborate on the use case a little more. We have a UI dashboard whose response time is quite fast because all the data is cached. Users query data based on time range, and there is always new data coming into the system at a predefined frequency, say 1 hour. As you said, I can uncache tables, but that basically drops all data from memory. I cannot afford to lose my cache even for a short interval, since all queries from the UI get slow until the cache loads again. UI response time needs to be predictable and fast enough that users do not get irritated.
Also, I cannot keep two copies of the data (until the new RDD materializes) in memory, as that would surpass the total memory available in the system.
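For readers following the archive, a minimal sketch of the cache-then-union workaround Cheng Lian and Evan Chan describe, against the Spark 1.0/1.1-era SQL API; the table names, the newBatchRdd value, and the sqlContext value are illustrative assumptions:

  // assumes sqlContext: SQLContext, with "existing" already registered and cached
  newBatchRdd.registerTempTable("new_batch")     // newBatchRdd: the hourly SchemaRDD, assumed
  sqlContext.cacheTable("new_batch")             // cache the new slice without touching the old one
  val combined = sqlContext.table("existing")    // both sides are served from the cache...
    .unionAll(sqlContext.table("new_batch"))
  combined.registerTempTable("combined")         // ...so dashboard queries against "combined" stay fast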
Re: Example of Geoprocessing with Spark
Hi Abel, Pretty interesting. May I ask how big your point CSV dataset is? It seems you are relying on a linear search through the FeatureCollection of polygons to find the one that intersects your point. This is going to be extremely slow. I highly recommend using a SpatialIndex, such as the several that exist in the JTS library itself, to speed things up. Also, note that the geoscript library is not really maintained anymore. I forked it with the intention of maintaining it some more, but I've decided this is not really a good direction. On Thu, Sep 18, 2014 at 7:02 PM, Abel Coronado Iruegas acoronadoirue...@gmail.com wrote: Now I have a better version, but the problem now is that saveAsTextFile does not finish the job; in the HDFS repository only a partial temporary file exists. Can someone tell me what is wrong? Thanks!!

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Csv Clipper")
    val sc = new SparkContext(conf)
    val csvPath = "hdfs://m01/user/acoronado/mov/movilidad_64mb.csv"
    val csv = sc.textFile(csvPath)
    csv.cache()
    val clipPoints = csv.map({ line: String =>
      val Array(usuario, lat, lon, date) = line.split(",").map(_.trim)
      val punto = Point(lon.toDouble, lat.toDouble)
      val internal = geoDataExternal.get.find(f => f.geometry intersects punto)
      val (cve_est, cve_mun) = internal match {
        case Some(f: org.geoscript.feature.Feature) =>
          val index = f.getAttribute(1).toString()
          val existe = geoDataMun.get(index).find(f => f.geometry intersects punto)
          existe match {
            case Some(f) => (f.getAttribute(1).toString, f.getAttribute(2).toString)
            case None => ("0", "0")
          }
        case None => ("0", "0")
      }
      val time = try {
        (new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$", "+")).getTime().toString()
      } catch { case e: Exception => "0" }
      line + "," + time + "," + cve_est + "," + cve_mun
    })
    clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_60.csv")
    println("Spark Clip Exito!!!")
  }

  object geoDataMun {
    private val shp = Shapefile("/geoData/MunicipiosLatLon.shp")
    val features = shp.getFeatures.toIterable
    val result = scala.io.Source.fromFile("/geoData/indice_espacial.csv")
      .getLines()
      .toList map { line: String =>
        val campos = line.split(",").map(_.trim)
        val cve_edo = campos(0)
        val cve_mun = campos(1)
        val index = campos(2)
        scala.collection.immutable.List(index.toInt, (cve_edo, cve_mun))
      }
    val mapaIx = result.groupBy(x => x(0)).mapValues(cves => cves.map(x => x(1)))
    def get(index: String) = {
      features.filter(f => mapaIx(index.toInt).contains((f.getAttribute(1).toString, f.getAttribute(2).toString)))
    }
  }

  object geoDataExternal {
    private val shp = Shapefile("/geoData/IndiceRecortado.shp")
    val features = shp.getFeatures
    def get: FeatureCollection[org.geoscript.feature.Schema, org.geoscript.feature.Feature] = features
  }
}

The log of the driver is:

14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp://sparkwor...@axaxaxa-cloudera-s05.xxxnetworks.com:44895] -> [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error [Association failed with [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942]
14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp://sparkwor...@axaxaxa-cloudera-s05.xxxnetworks.com:44895] -> [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error [Association failed with [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [
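A minimal sketch of the SpatialIndex suggestion above, using the STRtree that ships with JTS (the 2014-era com.vividsolutions package); the polygons collection and its (String, String) payload type are illustrative assumptions:

  import com.vividsolutions.jts.geom.{Geometry, Point}
  import com.vividsolutions.jts.index.strtree.STRtree
  import scala.collection.JavaConverters._

  // Build the index once over (geometry, attributes) pairs, keyed by envelope.
  val index = new STRtree()
  polygons.foreach { case (geom: Geometry, attrs: (String, String)) =>
    index.insert(geom.getEnvelopeInternal, (geom, attrs))
  }

  // Envelope query first (a cheap tree lookup), then the exact intersects
  // test only on the few candidates returned, instead of a linear scan.
  def lookup(p: Point): Option[(String, String)] =
    index.query(p.getEnvelopeInternal).asScala
      .map(_.asInstanceOf[(Geometry, (String, String))])
      .collectFirst { case (geom, attrs) if geom.intersects(p) => attrs }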
Re: Kryo fails with avro having Arrays and unions, but succeeds with simple avro.
Thanks for the info, Frank. Twitter's chill-avro serializer looks great. But how does Spark identify it as a serializer, since it's not extending KryoSerializer? (Sorry, Scala is an alien language for me.) - Thanks and Regards, Mohan
Time difference between Python and Scala
Hello everyone, What should be the normal time difference between Scala and Python using Spark? I mean running the same program in the same cluster environment. In my case I am using NumPy array structures for the Python code and vectors for the Scala code, both for handling my data. The difference I see so far is Scala being around 6x faster than Python... is that normal? Best regards
Powered By Spark
Hello! Could you please add us to your Powered by Spark page? Project name: Ubix.io Link: http://ubix.io Components: Spark, Shark, Spark SQL, MLlib, GraphX, Spark Streaming, ADAM project Description: blank for now
Re: Odd error when using a rdd map within a stream map
Hey, I don't think that's the issue; foreach is called on 'results', which is a DStream of floats, so naturally it passes RDDs to its function. And either way, changing the code in the first mapper to comment out the map/reduce process on the RDD:

  Float f = 1.0f;
  //nnRdd.map(new Function<NeuralNet, Float>() {
  //  private static final long serialVersionUID = 876245667956566483L;
  //
  //  @Override
  //  public Float call(NeuralNet nn) throws Exception {
  //    return 1.0f;
  //  }
  //}).reduce(new Function2<Float, Float, Float>() {
  //  private static final long serialVersionUID = 5461230777841578072L;
  //
  //  @Override
  //  public Float call(Float left, Float right) throws Exception {
  //    return left + right;
  //  }
  //});
  return Arrays.asList(f);

works as expected, so it's most likely running that RDD.map().reduce() that's the issue somehow. I just don't get why it works when there's a .print() at the end and not a .foreach()
basic streaming question
Hello everybody, I'm new to Spark Streaming and have played around a bit with WordCount and a PageRank algorithm in a cluster environment. Am I right that in the cluster each executor computes its part of the data stream separately, and that the result of each executor is independent of the other executors? In non-streaming Spark applications, each action operation merges the data from the executors and computes one result; or is this wrong? Is it possible in a streaming context to merge several streams, as in a reduce, and compute one result? greetz
Re: Unable to find proto buffer class error with RDD<protobuf>
Well it looks like this is indeed a protobuf issue. Poked a little more with Kryo. Since protobuf messages are serializable, I tried just making Kryo use the JavaSerializer for my messages. The resulting stack trace made it look like protobuf GeneratedMessageLite is actually using the classloader that loaded it, which I believe would be the root loader?
* https://code.google.com/p/protobuf/source/browse/trunk/java/src/main/java/com/google/protobuf/GeneratedMessageLite.java?r=425#775
* http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l186
* http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/ClassLoader.java#l1529
* See note: http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l220
So I guess protobuf Java serialization is sensitive to the class loader. I wonder if Kenton ever saw this one coming :) I do have a solution, though (see way below). Here's the code and stack trace:

  SparkConf sparkConf = new SparkConf();
  sparkConf.setAppName("myapp");
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  sparkConf.set("spark.kryo.registrator", "MyKryoRegistrator");
  ...
  public class MyKryoRegistrator implements KryoRegistrator {
    public void registerClasses(Kryo kryo) {
      kryo.register(MyProtoMessage.class, new JavaSerializer());
    }
  }
  ...

14/09/19 05:39:12 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 2)
com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
  at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
  at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:34)
  at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:21)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
  at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
  at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
  at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
  at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:123)
  at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:80)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Unable to find proto buffer class
  at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at
Re: Unable to find proto buffer class error with RDD<protobuf>
Derp, one caveat to my solution: I guess Spark doesn't use Kryo for Function serde :( On Fri, Sep 19, 2014 at 12:44 AM, Paul Wais pw...@yelp.com wrote: Well it looks like this is indeed a protobuf issue. Poked a little more with Kryo. [...]
rsync problem
Hi, I made some modifications to the Spark source code on the master and propagated them to the slaves using rsync. I used this command: rsync -avL --progress path/to/spark-1.0.0 username@destinationhostname:path/to/destdirectory This worked perfectly. But I wanted to rsync to all the slaves simultaneously, so I appended the other slaves as follows: rsync -avL --progress path/to/spark-1.0.0 username@destinationhostname:path/to/destdirectory username@slave2:path username@slave3:path and so on. But this didn't work. Anyway, for now, I did it individually for each node. Can someone give me the right syntax? Secondly, after this rsync, I find that my cluster has become tremendously slow! Sometimes the cluster just shuts down, and job execution is not happening. Can someone throw some light on this? Thank you, Karthik
Re: rsync problem
Hi, On Fri, Sep 19, 2014 at 5:02 PM, rapelly kartheek kartheek.m...@gmail.com wrote: This worked perfectly. But I wanted to rsync to all the slaves simultaneously, so I appended the other slaves as follows: rsync -avL --progress path/to/spark-1.0.0 username@destinationhostname:path/to/destdirectory username@slave2:path username@slave3:path and so on. The rsync man page says: rsync [OPTION...] SRC... [USER@]HOST:DEST So, as I understand your command, you have copied a lot of files from various hosts to username@slave3:path. I don't think rsync can copy to various locations at once. Tobias
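A hedged aside on the syntax question: since rsync accepts only one destination per invocation, the usual approach is a small shell loop over the slaves, something like for h in slave2 slave3; do rsync -avL --progress path/to/spark-1.0.0 username@$h:path/to/destdirectory; done (hostnames and paths are placeholders); tools such as pdsh or pssh automate the same fan-out.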
Re: rsync problem
Hi Tobias, I've copied the files from master to all the slaves. On Fri, Sep 19, 2014 at 1:37 PM, Tobias Pfeiffer t...@preferred.jp wrote: [...]
Re: rsync problem
Re: "you have copied a lot of files from various hosts to username@slave3:path": no, only from one node to all the other nodes... On Fri, Sep 19, 2014 at 1:45 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi Tobias, I've copied the files from master to all the slaves. [...]
Re: Spark Streaming and ReactiveMongo
Here's what we've tried so far as a first example of a custom Mongo receiver:

  class MongoStreamReceiver(host: String) extends NetworkReceiver[String] {

    protected lazy val blocksGenerator: BlockGenerator =
      new BlockGenerator(StorageLevel.MEMORY_AND_DISK_SER_2)

    protected def onStart() = {
      blocksGenerator.start()
      val driver = new MongoDriver
      val connection = driver.connection(List("m01-pdp2"))
      val db = connection.db("local")
      val collection = db.collection[BSONCollection]("oplog.rs")
      val query = BSONDocument("op" -> "i")
      val enumerator = collection.
        find(query).
        options(QueryOpts().tailable.awaitData).
        cursor[BSONDocument].
        enumerate()
      val processor: Iteratee[BSONDocument, Unit] = Iteratee.foreach { doc =>
        blocksGenerator += BSONDocument.pretty(doc)
      }
      enumerator |>>> processor
    }

    protected def onStop() {
      blocksGenerator.stop()
    }
  }

However this code doesn't run, probably because of serialization issues (no logs to confirm this though, just no data in the stream...). Note that if we comment out the ReactiveMongo-related code and put something like this instead, the code runs fine:

  for (i <- 0 until 1000) {
    blocksGenerator += "hello world"
    Thread.sleep(1000)
  }

The Java socket example (found here: http://spark.apache.org/docs/0.9.1/streaming-custom-receivers.html) works fine as well. Any hints?
Fwd: rsync problem
-- Forwarded message -- From: rapelly kartheek kartheek.m...@gmail.com Date: Fri, Sep 19, 2014 at 1:51 PM Subject: Re: rsync problem To: Tobias Pfeiffer t...@preferred.jp Any idea why the cluster is dying down? On Fri, Sep 19, 2014 at 1:47 PM, rapelly kartheek kartheek.m...@gmail.com wrote: [...]
Re: Spark + Mahout
No, it is actually a quite different 'alpha' project under the same name: a linear algebra DSL on top of H2O and also Spark. It is not really about algorithm implementations now. On Sep 19, 2014 1:25 AM, Matthew Farrellee m...@redhat.com wrote: On 09/18/2014 05:40 PM, Sean Owen wrote: No, the architectures are entirely different. The Mahout implementations have been deprecated and are not being updated, so there won't be a port or anything. You would have to create these things from scratch on Spark if they don't already exist. On Sep 18, 2014 7:50 PM, Daniel Takabayashi takabaya...@scanboo.com.br wrote: Hi guys, Is it possible to run a Mahout k-means through the Spark infrastructure? Thanks, taka (Brazil) from what I've read, Mahout isn't accepting changes to MR-based implementations. would Mahout accept an implementation on Spark? best, matt
Re: PairRDD's lookup method Performance
The product of each mapPartitions call can be an Iterable of one big Map. You still need to write some extra custom code, like what lookup() does, to exploit this data structure. On Sep 18, 2014 11:07 PM, Harsha HN 99harsha.h@gmail.com wrote: Hi All, My question is related to improving the performance of PairRDD's lookup method. I went through the link below, where Tathagata Das explains creating a hash map over each partition using the mapPartitions method to get O(1) search performance: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-RDD-over-hashmap-td893.html How can this be done in Java? HashMap is not a supported return type for any overloaded version of the mapPartitions methods. Thanks and Regards, Harsha
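A minimal Scala sketch of the one-map-per-partition idea Sean describes (the Java analogue wraps the same logic in a FlatMapFunction that returns a single-element list). The pair RDD, its types, and the runJob-based probe are illustrative; allowLocal matches the 1.x runJob signature:

  import org.apache.spark.HashPartitioner
  import org.apache.spark.rdd.RDD

  val part = new HashPartitioner(16)
  val pairs: RDD[(String, Int)] = rawPairs.partitionBy(part)  // rawPairs is assumed

  // Each partition's iterator yields exactly one Map; preservesPartitioning
  // keeps the key -> partition mapping valid for targeted lookups.
  val indexed = pairs
    .mapPartitions(iter => Iterator(iter.toMap), preservesPartitioning = true)
    .cache()

  // Probe only the owning partition, mirroring what PairRDDFunctions.lookup
  // does internally, so a lookup touches one task and one hash map.
  def fastLookup(key: String): Option[Int] =
    sc.runJob(indexed,
              (it: Iterator[Map[String, Int]]) => it.next().get(key),
              Seq(part.getPartition(key)), allowLocal = false).head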
Bulk-load to HBase
Hi, Is there a way to bulk-load to HBase from an RDD? HBase offers the HFileOutputFormat class for bulk loading by a MapReduce job, but I cannot figure out how to use it with saveAsHadoopDataset. Thanks.
RE: Bulk-load to HBase
Hi, After reading several documents, it seems that saveAsHadoopDataset cannot use HFileOutputFormat. That's because the saveAsHadoopDataset method takes a JobConf, so it belongs to the old Hadoop API, while HFileOutputFormat is a member of the mapreduce package, which is for the new Hadoop API. Am I right? If so, is there another method to bulk-load to HBase from an RDD? Thanks. From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] Sent: Friday, September 19, 2014 7:17 PM To: user@spark.apache.org Subject: Bulk-load to HBase Hi, Is there a way to bulk-load to HBase from an RDD? HBase offers the HFileOutputFormat class for bulk loading by a MapReduce job, but I cannot figure out how to use it with saveAsHadoopDataset. Thanks.
RE: Bulk-load to HBase
Hi, Sorry, I just found saveAsNewAPIHadoopDataset. So: can I use HFileOutputFormat with saveAsNewAPIHadoopDataset? Is there any example code for that? Thanks. From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] Sent: Friday, September 19, 2014 8:18 PM To: user@spark.apache.org Subject: RE: Bulk-load to HBase [...]
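No HFile example surfaces later in this thread, so here is a hedged sketch of the commonly described recipe against the HBase 0.94/0.98-era mapreduce API; the table name, column family, staging path, and input RDD are assumptions, and the string-based sort is only an approximation of HBase's byte ordering:

  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
  import org.apache.hadoop.hbase.client.HTable
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat, LoadIncrementalHFiles}
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.hadoop.mapreduce.Job

  val conf = HBaseConfiguration.create()
  val table = new HTable(conf, "my_table")                  // assumed table name
  val job = new Job(conf)
  // Sets up the region-aligned partitioning and reducer settings HFiles need.
  HFileOutputFormat.configureIncrementalLoad(job, table)

  // rdd: RDD[(Array[Byte], Array[Byte], Array[Byte])] = (row, qualifier, value), assumed.
  // HFileOutputFormat requires the output sorted by row key.
  val kvs = rdd
    .sortBy { case (row, _, _) => Bytes.toString(row) }
    .map { case (row, qualifier, value) =>
      (new ImmutableBytesWritable(row),
       new KeyValue(row, Bytes.toBytes("cf"), qualifier, value))  // "cf" is an assumed column family
    }

  kvs.saveAsNewAPIHadoopFile("/tmp/hfiles",                 // staging dir, illustrative
    classOf[ImmutableBytesWritable], classOf[KeyValue],
    classOf[HFileOutputFormat], job.getConfiguration)

  // The same step the completebulkload tool performs: moves the HFiles
  // into the regions, bypassing the normal write path.
  new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/hfiles"), table)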
Re: Spark + Mahout
On 09/19/2014 05:06 AM, Sean Owen wrote: No, it is actually a quite different 'alpha' project under the same name: a linear algebra DSL on top of H2O and also Spark. It is not really about algorithm implementations now. On Sep 19, 2014 1:25 AM, Matthew Farrellee m...@redhat.com wrote: [...] oic. where's a good place to see progress on that? best, matt
RE: Bulk-load to HBase
Thank you for the example code. Currently I use foreachPartition() + Put(), but your example code can be used to clean up my code. BTW, since the data uploaded by Put() goes through the normal HBase write path, it can be slow. So, it would be nice if bulk-load could be used, since it bypasses the write path. Thanks. From: Aniket Bhatnagar [mailto:aniket.bhatna...@gmail.com] Sent: Friday, September 19, 2014 9:01 PM To: innowireless TaeYun Kim Cc: user Subject: Re: Bulk-load to HBase I have been using saveAsNewAPIHadoopDataset, but I use TableOutputFormat instead of HFileOutputFormat. Still, hopefully this helps you:

  val hbaseZookeeperQuorum = s"$zookeeperHost:$zookeeperPort:$zookeeperHbasePath"
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum)
  conf.set(TableOutputFormat.QUORUM_ADDRESS, hbaseZookeeperQuorum)
  conf.set(TableOutputFormat.QUORUM_PORT, zookeeperPort.toString)
  conf.setClass("mapreduce.outputformat.class",
    classOf[TableOutputFormat[Object]], classOf[OutputFormat[Object, Writable]])
  conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

  val rddToSave: RDD[(Array[Byte], Array[Byte], Array[Byte])] = ... // some RDD that contains row key, column qualifier and data

  val putRDD = rddToSave.map(tuple => {
    val (rowKey, column, data) = tuple
    val put: Put = new Put(rowKey)
    put.add(COLUMN_FAMILY_RAW_DATA_BYTES, column, data)
    (new ImmutableBytesWritable(rowKey), put)
  })

  putRDD.saveAsNewAPIHadoopDataset(conf)

On 19 September 2014 16:52, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: Hi, Sorry, I just found saveAsNewAPIHadoopDataset. So: can I use HFileOutputFormat with saveAsNewAPIHadoopDataset? Is there any example code for that? Thanks. [...]
RE: Bulk-load to HBase
In fact, it seems that Put can be used by HFileOutputFormat, so the Put object itself may not be the problem. The problem is that TableOutputFormat uses the Put object in the normal way (going through the normal write path), while HFileOutputFormat uses it to directly build HFiles. From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] Sent: Friday, September 19, 2014 9:20 PM To: user@spark.apache.org Subject: RE: Bulk-load to HBase [...]
Re: Spark streaming stops computing while the receiver keeps running without any errors reported
Apologies for the delay in getting back on this. It seems the Kinesis example does not run on Spark 1.1.0, even when it is built using the kinesis-asl profile, because of a dependency conflict in the HTTP client (same issue as http://mail-archives.apache.org/mod_mbox/spark-dev/201409.mbox/%3ccajob8btdxks-7-spjj5jmnw0xsnrjwdpcqqtjht1hun6j4z...@mail.gmail.com%3E). I had to add a later version of the HTTP client in the kinesis-asl profile to make it run. Also, the Kinesis example sets master as local, so it does not honour the MASTER environment variable as other examples do. Once I was able to resolve these issues, I was finally able to reproduce the problem: the example works fine in local mode but does not do anything when the receiver runs on remote workers. Spark Streaming does not report any blocks received from the receivers, even though I can see the following lines in the app logs (I modified the debug line to print size as well):

14/09/19 12:30:18 DEBUG ReceiverSupervisorImpl: Pushed block input-0-1411129664668 in 15 ms
14/09/19 12:30:18 DEBUG ReceiverSupervisorImpl: Reported block input-0-1411129664668 of size 1

Here are the screenshots of the Spark admin UI: http://imgur.com/a/gWKYm I also ran other examples (custom receiver, etc.) in both local and distributed mode and they seem to be working fine. Any ideas? Thanks, Aniket On 12 September 2014 02:49, Tathagata Das tathagata.das1...@gmail.com wrote: This is very puzzling, given that this works in local mode. Does running the Kinesis example work with your spark-submit? https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala The instructions are present in the streaming guide: https://github.com/apache/spark/blob/master/docs/streaming-kinesis-integration.md If that does not work on the cluster, then I would check the streaming UI for the number of records being received, and the stages page for whether jobs are being executed for every batch. Can you tell us whether that is working? Also ccing Chris Fregly, who wrote the Kinesis integration. TD On Thu, Sep 11, 2014 at 4:51 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Hi all, I am trying to run a Kinesis Spark Streaming application on a standalone Spark cluster. The job works fine in local mode, but when I submit it (using spark-submit), it doesn't do anything. I enabled logs for the org.apache.spark.streaming.kinesis package and I regularly get the following in the worker logs:

14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored: Worker x.x.x.x:b88a9210-cbb9-4c31-8da7-35fd92faba09 stored 34 records for shardId shardId-
14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored: Worker x.x.x.x:b2e9c20f-470a-44fe-a036-630c776919fb stored 31 records for shardId shardId-0001

But the job does not perform any of the operations defined on the DStream. To investigate this further, I changed kinesis-asl's KinesisUtils to perform the following computation on the DStream created using ssc.receiverStream(new KinesisReceiver...):

  stream.count().foreachRDD(rdd => rdd.foreach(tuple => logInfo("Emitted " + tuple)))

Even the above line does not result in any corresponding log entries, in either the driver or the worker logs.
The only relevant logs that I could find in the driver logs are:

14/09/11 11:40:58 INFO DAGScheduler: Stage 2 (foreach at KinesisUtils.scala:68) finished in 0.398 s
14/09/11 11:40:58 INFO SparkContext: Job finished: foreach at KinesisUtils.scala:68, took 4.926449985 s
14/09/11 11:40:58 INFO JobScheduler: Finished job streaming job 1410435653000 ms.0 from job set of time 1410435653000 ms
14/09/11 11:40:58 INFO JobScheduler: Starting job streaming job 1410435653000 ms.1 from job set of time 1410435653000 ms
14/09/11 11:40:58 INFO SparkContext: Starting job: foreach at KinesisUtils.scala:68
14/09/11 11:40:58 INFO DAGScheduler: Registering RDD 13 (union at DStream.scala:489)
14/09/11 11:40:58 INFO DAGScheduler: Got job 3 (foreach at KinesisUtils.scala:68) with 2 output partitions (allowLocal=false)
14/09/11 11:40:58 INFO DAGScheduler: Final stage: Stage 5 (foreach at KinesisUtils.scala:68)

After the above logs, nothing shows up corresponding to KinesisUtils. I am out of ideas on this one, and any help would be greatly appreciated. Thanks, Aniket
Re: Bulk-load to HBase
Agreed that the bulk import would be faster. In my case, I wasn't expecting a lot of data to be uploaded to HBase, and I didn't want to take on the pain of importing the generated HFiles into HBase. Is there a way to invoke HBase's HFile bulk-import batch script programmatically? On 19 September 2014 17:58, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: In fact, it seems that Put can be used by HFileOutputFormat, so the Put object itself may not be the problem. The problem is that TableOutputFormat uses the Put object in the normal way (going through the normal write path), while HFileOutputFormat uses it to directly build HFiles. [...]
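On the programmatic-invocation question: the completebulkload script is a thin wrapper around org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles, which can be called directly; a two-line sketch under the same era and API assumptions as earlier in this thread (conf, path, and table name are placeholders):

  val loader = new LoadIncrementalHFiles(conf)               // conf: an HBaseConfiguration, assumed
  loader.doBulkLoad(new Path("/tmp/hfiles"), new HTable(conf, "my_table"))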
Re: Kryo fails with avro having Arrays and unions, but succeeds with simple avro.
Hi Mohan, It's a bit convoluted to follow in their source, but they essentially typedef KSerializer as being a KryoSerializer, and then their serializers all extend KSerializer. Spark should identify them properly as Kryo serializers, but I haven't tried it myself. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Sep 19, 2014, at 12:03 AM, mohan.gadm mohan.g...@gmail.com wrote: Thanks for the info, Frank. Twitter's chill-avro serializer looks great. But how does Spark identify it as a serializer, since it's not extending KryoSerializer? (Sorry, Scala is an alien language for me.) - Thanks and Regards, Mohan
Re: New API for TFIDF generation in Spark 1.1.0
Jatin, If you file the JIRA and don't want to work on it, I'd be happy to step in and take a stab at it. RJ On Thu, Sep 18, 2014 at 4:08 PM, Xiangrui Meng men...@gmail.com wrote: Hi Jatin, HashingTF should be able to solve the memory problem if you use a small feature dimension in HashingTF. Please do not cache the input documents, but cache the output from HashingTF and IDF instead. We don't have a label indexer yet, so you need a label-to-index map to map labels to double values, e.g., D1 -> 0.0, D2 -> 1.0, etc. Assuming that the input is an RDD[(label: String, doc: Seq[String])], the code should look like the following:

  val docTypeToLabel = Map("D1" -> 0.0, ...)
  val tf = new HashingTF()
  val freqs = input.map(x => (docTypeToLabel(x._1), tf.transform(x._2))).cache()
  val idf = new IDF()
  val idfModel = idf.fit(freqs.values)
  val vectors = freqs.map(x => LabeledPoint(x._1, idfModel.transform(x._2)))
  val nbModel = NaiveBayes.train(vectors)

IDF doesn't provide a filter on the minimum occurrence, but it would be nice to have that option. Please create a JIRA and someone may work on it. Best, Xiangrui On Thu, Sep 18, 2014 at 3:46 AM, jatinpreet jatinpr...@gmail.com wrote: Hi, I have been running into memory overflow issues while creating TFIDF vectors to be used in document classification using MLlib's Naive Bayes classification implementation. http://chimpler.wordpress.com/2014/06/11/classifiying-documents-using-naive-bayes-on-apache-spark-mllib/ Memory overflow and GC issues occur while collecting idfs for all the terms. To give an idea of scale, I am reading around 615,000 (around 4GB of text data) small-sized documents from HBase and running the Spark program with 8 cores and 6GB of executor memory. I have tried increasing the parallelism level and shuffle memory fraction, but to no avail. The new TFIDF generation APIs caught my eye in the latest Spark version, 1.1.0. The example given in the official documentation mentions creation of TFIDF vectors based on the hashing trick. I want to know if it will solve the mentioned problem by benefiting from reduced memory consumption. Also, the example does not state how to create labeled points for a corpus of pre-classified document data. For example, my training input looks something like this:

  DocumentType | Content
  -------------+-------------------------------
  D1           | This is Doc1 sample.
  D1           | This also belongs to Doc1.
  D1           | Yet another Doc1 sample.
  D2           | Doc2 sample.
  D2           | Sample content for Doc2.
  D3           | The only sample for Doc3.
  D4           | Doc4 sample looks like this.
  D4           | This is Doc4 sample content.

I want to create labeled points from this sample data for training. And once the Naive Bayes model is created, I generate TFIDFs for the test documents and predict the document type. If the new API can solve my issue, how can I generate labeled points using the new APIs? An example would be great. Also, I have a special requirement of ignoring terms that occur in fewer than two documents. This has important implications for the accuracy of my use case and needs to be accommodated while generating TFIDFs. Thanks, Jatin - Novice Big Data Programmer
-- em rnowl...@gmail.com c 954.496.2314
Anyone have successful recipe for spark cassandra connector?
I'm running out of options trying to integrate Cassandra, Spark, and the spark-cassandra-connector. I quickly found out that just grabbing the latest versions of everything (drivers, etc.) doesn't work; binary incompatibilities, it would seem. So, most recently, I tried using the driver versions from the spark-cassandra-connector's build. Better, but still no dice. Any successes out there? I'd really love to use this stack. If curious, my ridiculously trivial example is here: https://github.com/gzoller/doesntwork If you run 'sbt test' you'll get a NoHostAvailableException complaining that it tried /10.0.0.194:9042. I have no idea where that address came from; I was trying to connect to local. Any ideas appreciated! Greg
Re: Support R in Spark
Thanks, Shivaram. Kui On Sep 19, 2014, at 12:58 AM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: As R is single-threaded, SparkR launches one R process per executor on the worker side. Thanks, Shivaram On Thu, Sep 18, 2014 at 7:49 AM, oppokui oppo...@gmail.com wrote: Shivaram, as far as I know, SparkR uses the rJava package. On the worker node, Spark code executes R code by launching an R process and sending/receiving byte arrays. I have a question on when the R process is launched: is it per worker process, per executor thread, or per RDD processing? Thanks and Regards, Kui On Sep 6, 2014, at 5:53 PM, oppokui oppo...@gmail.com wrote: Cool! That is very good news. Can't wait for it. Kui On Sep 5, 2014, at 1:58 AM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: Thanks, Kui. SparkR is a pretty young project, but there are a bunch of things we are working on. One of the main features is to expose a data frame API (https://sparkr.atlassian.net/browse/SPARKR-1), and we will be integrating this with Spark's MLlib. At a high level this will allow R users to use a familiar API but make use of MLlib's efficient distributed implementation. This is the same strategy used in Python as well. Also, we do hope to merge SparkR with mainline Spark; we have a few features to complete before that and plan to shoot for integration by Spark 1.3. Thanks, Shivaram On Wed, Sep 3, 2014 at 9:24 PM, oppokui oppo...@gmail.com wrote: Thanks, Shivaram. No specific use case yet. We are trying to use R in our project, as our data scientists all know R. We had a concern about how R handles massive data. Spark does a better job in the big-data area, and Spark ML is focusing on predictive analysis, so we are considering whether we can merge R and Spark together. We tried SparkR and it is pretty easy to use, but we haven't seen any feedback on this package from industry. It would be better if the Spark team supported R just as it does Scala/Java/Python. Another question: MLlib will re-implement all the famous data mining algorithms in Spark, so what is the purpose of using R? There is another technology, H2O, which supports R natively. H2O is more friendly to data scientists. I saw that H2O can also work on Spark (Sparkling Water). Is that better than using SparkR? Thanks and Regards, Kui On Sep 4, 2014, at 1:47 AM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: Hi, do you have a specific use case where SparkR doesn't work well? We'd love to hear more about use cases and features that can be improved with SparkR. Thanks, Shivaram On Wed, Sep 3, 2014 at 3:19 AM, oppokui oppo...@gmail.com wrote: Does the Spark ML team have a plan to support R scripts natively? There is a SparkR project, but it is not from the Spark team. Spark ML uses netlib-java to talk to native Fortran routines, or uses NumPy; why not try to use R in some sense? R has a lot of useful packages. If the Spark ML team can include R support, it will be very powerful. Any comment?
Re: Spark Streaming and ReactiveMongo
onStart should be non-blocking. You may try to create a thread in onStart instead. - Original Message - From: t1ny wbr...@gmail.com To: u...@spark.incubator.apache.org Sent: Friday, September 19, 2014 1:26:42 AM Subject: Re: Spark Streaming and ReactiveMongo Here's what we've tried so far as a first example of a custom Mongo receiver: [...]
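A minimal sketch of that suggestion against the receiver shown earlier in the thread (NetworkReceiver/BlockGenerator from the 0.9-era API); only the threading is new, and the Mongo wiring is elided:

  protected def onStart() = {
    blocksGenerator.start()
    // Run the blocking tailable-cursor loop off the caller's thread,
    // so onStart() itself returns immediately.
    val worker = new Thread("mongo-oplog-reader") {
      override def run() {
        // build the MongoDriver/connection/enumerator exactly as before and
        // consume it here, pushing each document into blocksGenerator
      }
    }
    worker.setDaemon(true)
    worker.start()
  }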
Re: rsync problem
Thank you Soumya Simanta and Tobias. I've deleted the contents of the work folder on all the nodes, and now it's working perfectly, as it was before. Thank you Karthik On Fri, Sep 19, 2014 at 4:46 PM, Soumya Simanta soumya.sima...@gmail.com wrote: One possible reason may be that the checkpointing directory $SPARK_HOME/work is rsynced as well. Try emptying the contents of the work folder on each node and try again. On Fri, Sep 19, 2014 at 4:53 AM, rapelly kartheek kartheek.m...@gmail.com wrote: I followed this command: rsync -avL --progress path/to/spark-1.0.0 username@destinationhostname:path/to/destdirectory. Anyway, for now, I did it individually for each node. I copied to each node one at a time using the above command, so I guess the copy should not contain any mixture of files. Also, as of now, I am not facing any MethodNotFound exceptions, but no job execution is taking place. After some time, the nodes go down one by one and the cluster shuts down. On Fri, Sep 19, 2014 at 2:15 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Fri, Sep 19, 2014 at 5:17 PM, rapelly kartheek kartheek.m...@gmail.com wrote: you have copied a lot of files from various hosts to username@slave3:path -- only from one node to all the other nodes... I don't think rsync can do that in one command as you described. My guess is that you now have a wild mixture of jar files all across your cluster, which will lead to fancy exceptions like MethodNotFound etc.; that's maybe why your cluster is not working correctly. Tobias
Re: Cannot run SimpleApp as regular Java app
It turns out that it was the Hadoop version that was the issue. spark-1.0.2-hadoop1 and spark-1.1.0-hadoop1 both work; spark-1.0.2-hadoop2 and spark-1.1.0-hadoop2.4 do not. It's strange, because for this little test I am not even using HDFS at all. -- Eric On Thu, Sep 18, 2014 at 11:58 AM, ericacm [via Apache Spark User List] ml-node+s1001560n14570...@n3.nabble.com wrote: Upgrading from spark-1.0.2-hadoop2 to spark-1.1.0-hadoop1 fixed my problem.
Re: paging through an RDD that's too large to collect() all at once
Excellent - that's exactly what I needed. I saw iterator() but missed the toLocalIterator() method.
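For readers finding this thread later, a minimal sketch of the pattern (numbers and app name illustrative): toLocalIterator pulls one partition at a time to the driver, so only a single partition ever needs to fit in driver memory:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("paged-scan"))
    val rdd = sc.parallelize(1 to 1000000, numSlices = 100)

    // Streams the RDD back one partition at a time, instead of
    // materializing everything at once the way collect() does.
    rdd.toLocalIterator.grouped(1000).foreach { page =>
      // process one "page" of up to 1000 elements on the driver
      println(page.head + " .. " + page.last)
    }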
return probability \ confidence instead of actual class
Hi, I am working with the SVMWithSGD classification algorithm in Spark. It works fine for me; however, I would like to distinguish the instances that are classified with high confidence from those with low confidence. How do we define the threshold here? Ultimately, I want to keep only those instances for which the algorithm is very *very* certain about its decision. How can I do that? Is this feature already supported by any MLlib algorithm? What if I had multiple categories? Any input is highly appreciated!
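In case it helps: for SVMWithSGD specifically (binary classification only), the trained model's threshold can be cleared so that predict returns the raw margin instead of a 0/1 label; the cutoff below is an arbitrary illustration, not an MLlib default. A minimal sketch:

    import org.apache.spark.mllib.classification.SVMWithSGD
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD

    def confidentPredictions(training: RDD[LabeledPoint],
                             test: RDD[LabeledPoint]) = {
      val model = SVMWithSGD.train(training, 100) // 100 iterations
      model.clearThreshold() // predict now returns the raw margin, not 0/1

      val scored = test.map(p => (model.predict(p.features), p.label))
      // Keep only points far from the decision boundary; tune the
      // cutoff per application.
      scored.filter { case (margin, _) => math.abs(margin) > 1.0 }
    }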
Spark 1.1.0 (w/ hadoop 2.4) versus aws-java-sdk-1.7.2.jar
Hi, Spark experts, I have the following issue when using the AWS Java SDK in my Spark application. I narrowed it down to the following steps to reproduce the problem. 1) I have Spark 1.1.0 with Hadoop 2.4 installed on a 3-node cluster. 2) From the master node, I did the following:

    spark-shell --jars aws-java-sdk-1.7.2.jar

    import com.amazonaws.{Protocol, ClientConfiguration}
    import com.amazonaws.auth.BasicAWSCredentials
    import com.amazonaws.services.s3.AmazonS3Client

    val clientConfiguration = new ClientConfiguration()
    val s3accessKey = "X"
    val s3secretKey = "Y"
    val credentials = new BasicAWSCredentials(s3accessKey, s3secretKey)

    scala> println("CLASSPATH=" + System.getenv("CLASSPATH"))
    CLASSPATH=::/home/hadoop/spark/conf:/home/hadoop/spark/lib/spark-assembly-1.1.0-hadoop2.4.0.jar:/home/hadoop/conf:/home/hadoop/conf

    scala> println("java.class.path=" + System.getProperty("java.class.path"))
    java.class.path=::/home/hadoop/spark/conf:/home/hadoop/spark/lib/spark-assembly-1.1.0-hadoop2.4.0.jar:/home/hadoop/conf:/home/hadoop/conf

So far all looks good and normal. But the following step fails, and it looks like the class loader can't resolve the right class. Any suggestion for a Spark application that requires the AWS SDK?

    scala> val s3Client = new AmazonS3Client(credentials, clientConfiguration)
    java.lang.NoClassDefFoundError: org/apache/http/impl/conn/PoolingClientConnectionManager
        at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26)
        at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96)
        at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:155)
        at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:119)
        at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:103)
        at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:334)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:21)
        at $iwC$$iwC$$iwC.<init>(<console>:26)
        at $iwC$$iwC.<init>(<console>:28)
        at $iwC.<init>(<console>:30)
        at <init>(<console>:32)
        at .<init>(<console>:36)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.ClassNotFoundException: org.apache.http.impl.conn.PoolingClientConnectionManager
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 46 more

Thanks. Tian
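For debugging classpath conflicts like this, one quick check from the same spark-shell is to ask the class loader where (or whether) it finds the missing class -- a minimal sketch using only standard JDK reflection:

    // Throws ClassNotFoundException if the class is absent from the
    // driver classpath; otherwise prints the jar it was loaded from.
    val cls = Class.forName("org.apache.http.impl.conn.PoolingClientConnectionManager")
    // Note: getCodeSource can be null for bootstrap-classpath classes.
    println(cls.getProtectionDomain.getCodeSource.getLocation)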
Re: spark-submit command-line with --files
Hi Chinchu, SparkEnv is an internal class that is only meant to be used within Spark. Outside of Spark, it will be null because there are no executors or driver to start an environment for. Similarly, SparkFiles is meant to be used internally (though its privacy settings should be modified to reflect that). Is there a reason why you need to pass the serialized objects this way? Can't you access the deserialized form from your application? Andrew 2014-09-18 22:53 GMT-07:00 chinchu chinchu@gmail.com: Hi, I am running spark-1.1.0 and I want to pass a file (that contains java-serialized objects used to initialize my program) to the app's main program. I am using the --files option, but I am not able to retrieve the file in the main class; it reports a NullPointerException. [I tried both local and yarn-cluster with the same result.] I am using SparkFiles.get("myobject.ser") to get the file. Am I doing something wrong? CMD:

    bin/spark-submit --name Test --class com.test.batch.modeltrainer.ModelTrainerMain \
      --master local --files /tmp/myobject.ser --verbose /opt/test/lib/spark-test.jar

ModelTrainerMain.scala, line 37:

    val serFile = SparkFiles.get("myobject.ser")

Exception:

    Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.SparkFiles$.getRootDirectory(SparkFiles.scala:37)
        at org.apache.spark.SparkFiles$.get(SparkFiles.scala:31)
        at com.test.batch.modeltrainer.ModelTrainerMain$.main(ModelTrainerMain.scala:37)
        at com.test.batch.modeltrainer.ModelTrainerMain.main(ModelTrainerMain.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Looking at the Scala code for SparkFiles:37, it looks like SparkEnv.get is returning null. Thanks
Re: Time difference between Python and Scala
I think it's normal. On Fri, Sep 19, 2014 at 12:07 AM, Luis Guerra luispelay...@gmail.com wrote: Hello everyone, What should be the normal time difference between Scala and Python using Spark? I mean running the same program in the same cluster environment. In my case I am using numpy array structures for the Python code and vectors for the Scala code, both for handling my data. The time difference I have so far is Scala being around 6x faster than Python... is that normal? Best regards
RDD pipe example. Is this a bug or a feature?
Hi, I wrote a little Java job to try and figure out how RDD pipe works. Below is my test shell script. If I turn debugging on in the script, I get output in my console; if debugging is turned off, I do not see anything in my console. Is this a bug or a feature? I am running the job locally on a Mac. Thanks, Andy Here is my Java:

    rdd.pipe(pwd + "/src/main/bin/RDDPipe.sh").collect();

    #!/bin/sh
    #
    # Use this shell script to figure out how spark RDD pipe() works
    #
    set -x   # turns shell debugging on
    #set +x  # turns shell debugging off

    while read x ;
    do
        echo RDDPipe.sh $x ;
    done

Here is the output if debugging is turned on:

    $ !grep
    grep RDDPipe run.sh.out
    + echo RDDPipe.sh 0
    + echo RDDPipe.sh 0
    + echo RDDPipe.sh 2
    + echo RDDPipe.sh 0
    + echo RDDPipe.sh 3
    + echo RDDPipe.sh 0
    + echo RDDPipe.sh 0
    $
mllib performance on mesos cluster
Hi, I have a program similar to the BinaryClassifier example that I run on my data (which is fairly small), for 100 iterations. I observed the following performance: standalone-mode cluster with 10 nodes (Spark 1.0.2): 5 minutes; standalone-mode cluster with 10 nodes (Spark 1.1.0): 8.9 minutes; Mesos cluster with 10 nodes (Spark 1.1.0): 17 minutes. 1) According to the documentation, Spark 1.1.0 has better performance, so I would like to understand why the runtime is longer on Spark 1.1.0. 2) Why is the runtime on Mesos significantly higher than in standalone mode? I just wanted to find out if anyone else has observed poor performance for MLlib-based programs on a Mesos cluster. I looked through the application detail logs and found that some of the scheduler delay values were higher on Mesos compared to standalone mode (40 ms vs. 10 ms). Is the Mesos scheduler slower? thanks
Re: RDD pipe example. Is this a bug or a feature?
What is in 'rdd' here, to double check? Do you mean the spark shell when you say console? At the end you're grepping output from some redirected output, but where is that from? On Sep 19, 2014 7:21 PM, Andy Davidson a...@santacruzintegration.com wrote: Hi, I wrote a little Java job to try and figure out how RDD pipe works. [...]
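For context on what pipe() actually does: each partition's elements are written to the child process's stdin, and the child's stdout lines become the elements of the resulting RDD, while the child's stderr (where set -x traces go) is forwarded to the worker's console. So the echoed lines come back as data and are only visible once collected. A minimal sketch (script path illustrative), assuming an existing SparkContext sc:

    val piped = sc.parallelize(Seq("0", "1", "2"))
      .pipe(sys.props("user.dir") + "/src/main/bin/RDDPipe.sh")

    // The script's stdout becomes the elements of `piped`; print them
    // on the driver to actually see them.
    piped.collect().foreach(println)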
Re: spark-submit command-line with --files
Hey, just a minor clarification: you _can_ use SparkFiles.get in your application, but only in code that runs on the executors, e.g. in the following way:

    sc.parallelize(1 to 100).map { i => SparkFiles.get("my.file") }.collect()

But not in general (otherwise an NPE, as in your case). Perhaps this should be documented more clearly. Thanks to Marcelo for pointing this out.
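To make that concrete, a minimal sketch of the full pattern (file name illustrative): ship the file with --files, then open it inside a task, where a SparkEnv exists:

    import java.io.{FileInputStream, ObjectInputStream}
    import org.apache.spark.SparkFiles

    // Submitted with: spark-submit --files /tmp/myobject.ser ...
    val loaded = sc.parallelize(1 to 1, 1).map { _ =>
      // SparkFiles.get resolves the local copy on the executor,
      // where --files shipped it.
      val path = SparkFiles.get("myobject.ser")
      val in = new ObjectInputStream(new FileInputStream(path))
      try in.readObject().toString finally in.close()
    }.collect()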
Spark Streaming compilation error: algebird not a member of package com.twitter
Hi, I am using the latest release, Spark 1.1.0. I am trying to build the streaming examples (under examples/streaming) as a standalone project with the following streaming.sbt file. When I run sbt assembly, I get an error stating that object algebird is not a member of package com.twitter. I tried adding the dependency spark-streaming-algebird, but that was not recognized. What dependency should I be including for algebird?

    import AssemblyKeys._

    assemblySettings

    name := "spark_stream_examples"

    version := "1.0"

    scalaVersion := "2.10.4"

    libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided"

    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided"

    libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.1.0"

    libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "1.1.0"

    libraryDependencies += "org.apache.spark" %% "spark-streaming-zeromq" % "1.1.0"

    libraryDependencies += "org.apache.spark" %% "spark-streaming-mqtt" % "1.1.0"

    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.1.0"

    //libraryDependencies += "org.apache.spark" %% "spark-streaming-algebird" % "1.1.0"

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
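For what it's worth, there is no spark-streaming-algebird artifact; if I remember correctly, the streaming examples depend on Algebird directly from Twitter, so something along these lines should work (the version is a guess -- check Maven Central for one built for Scala 2.10):

    libraryDependencies += "com.twitter" %% "algebird-core" % "0.1.11"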
Re: Example of Geoprocessing with Spark
Hi Evan, here is an improved version, thanks for your advice. But the last step, saveAsTextFile, is very slow. :(

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import java.net.URL
    import java.text.SimpleDateFormat
    import com.vividsolutions.jts.geom._
    import com.vividsolutions.jts.index.strtree.STRtree
    import com.vividsolutions.jts.io._
    import org.geotools.data.FileDataStoreFinder
    import org.geotools.geometry.jts.{JTSFactoryFinder, ReferencedEnvelope}
    import org.opengis.feature.{simple, Feature, FeatureVisitor}
    import scala.collection.JavaConverters._

    object SimpleApp {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("Csv Clipper")
        val sc = new SparkContext(conf)
        val csvPath = "hdfs://m01/user/acoronado/mov/movilidad.csv"
        val csv = sc.textFile(csvPath)
        //csv.coalesce(60,true)
        csv.cache()
        val clipPoints = csv.map({ line: String =>
          val Array(usuario, lat, lon, date) = line.split(",").map(_.trim)
          val geometryFactory = JTSFactoryFinder.getGeometryFactory();
          val reader = new WKTReader(geometryFactory);
          val point = reader.read("POINT (" + lon + " " + lat + ")")
          val envelope = point.getEnvelopeInternal
          val internal = geoDataMun.get(envelope)
          val (cve_est, cve_mun) = internal match {
            case l => {
              val existe = l.find(f => f match {
                case (g: Geometry, e: String, m: String) => g.intersects(point)
                case _ => false
              })
              existe match {
                case Some(t) => t match {
                  case (g: Geometry, e: String, m: String) => (e, m)
                  case _ => ("0", "0")
                }
                case None => ("0", "0")
              }
            }
            case _ => ("0", "0")
          }
          val time = try {
            (new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$", "+")).getTime().toString()
          } catch {
            case e: Exception => 0
          }
          line + "," + time + "," + cve_est + "," + cve_mun
        })
        clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_fast.csv")
      }

      object geoDataMun {
        var spatialIndex = new STRtree()
        val path = new URL("file:geoData/MunicipiosLatLon.shp")
        val store = FileDataStoreFinder.getDataStore(path)
        val source = store.getFeatureSource();
        val features = source.getFeatures();
        val it = features.features();
        while (it.hasNext) {
          val feature = it.next()
          val geom = feature.getDefaultGeometry
          if (geom != null) {
            val geomClass = geom match {
              case g2: Geometry => g2
              case _ => throw new ClassCastException
            }
            val env = geomClass.getEnvelopeInternal();
            if (!env.isNull) {
              spatialIndex.insert(env, (geomClass, feature.getAttribute(1), feature.getAttribute(2)));
            }
          }
        }
        def get(env: Envelope) = spatialIndex.query(env).asScala
      }
    }
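One aside on the commented-out coalesce line above: coalesce does not modify csv in place, it returns a new RDD, so the call as written would have no effect even if uncommented. A minimal sketch of using it:

    // coalesce returns a new RDD; the result must be used, otherwise
    // the call is a no-op.
    val compacted = csv.coalesce(60, shuffle = true)
    compacted.cache()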
Re: Bulk-load to HBase
I successfully did this once. Map your RDD to an RDD[(ImmutableBytesWritable, KeyValue)], then:

    val conf = HBaseConfiguration.create()
    val job = new Job(conf, "CEF2HFile")
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable]);
    job.setMapOutputValueClass(classOf[KeyValue]);
    val table = new HTable(conf, "output")
    HFileOutputFormat.configureIncrementalLoad(job, table);
    saveAsNewAPIHadoopFile("hdfs://localhost.localdomain:8020/user/cloudera/spark",
      classOf[ImmutableBytesWritable], classOf[Put], classOf[HFileOutputFormat], conf)

Then I run

    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /user/cloudera/spark output

to load the HFiles into HBase. ----- Original Message ----- From: Ted Yu yuzhih...@gmail.com To: Aniket Bhatnagar aniket.bhatna...@gmail.com Cc: innowireless TaeYun Kim taeyun@innowireless.co.kr, user user@spark.apache.org Sent: Friday, September 19, 2014 2:29:51 PM Subject: Re: Bulk-load to HBase Please see http://hbase.apache.org/book.html#completebulkload LoadIncrementalHFiles has a main() method. On Fri, Sep 19, 2014 at 5:41 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Agreed that the bulk import would be faster. In my case, I wasn't expecting a lot of data to be uploaded to HBase, and also I didn't want to take the pain of importing generated HFiles into HBase. Is there a way to invoke the HBase HFile import batch script programmatically? On 19 September 2014 17:58, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: In fact, it seems that Put can be used by HFileOutputFormat, so the Put object itself may not be the problem. The problem is that TableOutputFormat uses the Put object in the normal way (which goes through the normal write path), while HFileOutputFormat uses it to directly build the HFile. From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] Sent: Friday, September 19, 2014 9:20 PM To: user@spark.apache.org Subject: RE: Bulk-load to HBase Thank you for the example code. Currently I use foreachPartition() + Put(), but your example code can be used to clean up my code. BTW, since the data uploaded by Put() goes through the normal HBase write path, it can be slow. So it would be nice if bulk-load could be used, since it bypasses the write path. Thanks. From: Aniket Bhatnagar [mailto:aniket.bhatna...@gmail.com] Sent: Friday, September 19, 2014 9:01 PM To: innowireless TaeYun Kim Cc: user Subject: Re: Bulk-load to HBase I have been using saveAsNewAPIHadoopDataset, but I use TableOutputFormat instead of HFileOutputFormat. Still, hopefully this helps you:

    val hbaseZookeeperQuorum = s"$zookeeperHost:$zookeeperPort:$zookeeperHbasePath"
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum)
    conf.set(TableOutputFormat.QUORUM_ADDRESS, hbaseZookeeperQuorum)
    conf.set(TableOutputFormat.QUORUM_PORT, zookeeperPort.toString)
    conf.setClass("mapreduce.outputformat.class",
      classOf[TableOutputFormat[Object]], classOf[OutputFormat[Object, Writable]])
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    val rddToSave: RDD[(Array[Byte], Array[Byte], Array[Byte])] = ... // some RDD that contains row key, column qualifier and data

    val putRDD = rddToSave.map(tuple => {
      val (rowKey, column, data) = tuple
      val put: Put = new Put(rowKey)
      put.add(COLUMN_FAMILY_RAW_DATA_BYTES, column, data)
      (new ImmutableBytesWritable(rowKey), put)
    })

    putRDD.saveAsNewAPIHadoopDataset(conf)

On 19 September 2014 16:52, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: Hi, Sorry, I just found saveAsNewAPIHadoopDataset. Then, can I use HFileOutputFormat with saveAsNewAPIHadoopDataset? Is there any example code for that? Thanks. From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] Sent: Friday, September 19, 2014 8:18 PM To: user@spark.apache.org Subject: RE: Bulk-load to HBase Hi, After reading several documents, it seems that saveAsHadoopDataset cannot use HFileOutputFormat. It's because the saveAsHadoopDataset method uses JobConf, so it belongs to the old Hadoop API, while HFileOutputFormat is a member of the mapreduce package, which is for the new Hadoop API. Am I right? If so, is there another method to bulk-load to HBase from an RDD? Thanks. From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] Sent: Friday, September 19, 2014 7:17 PM To: user@spark.apache.org Subject: Bulk-load to HBase Hi, Is there a way to bulk-load to HBase from an RDD? HBase offers the HFileOutputFormat class for bulk loading from a MapReduce job, but I cannot figure out how to use it with saveAsHadoopDataset. Thanks.
Re: Failed running Spark ALS
Have you set spark.local.dir (I think this is the config setting)? It needs to point to a volume with plenty of space; by default, if I recall, it points to /tmp. Sent from my iPhone On 19 Sep 2014, at 23:35, jw.cmu jinliangw...@gmail.com wrote: I'm trying to run Spark ALS using the Netflix dataset but failed due to a "No space on device" exception. It seems the exception is thrown after the training phase. It's not clear to me what is being written and where the output directory is. I was able to run the same code on the provided test.data dataset. I'm new to Spark and I'd like to get some hints for resolving this problem. The code I ran is from https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html (the Java version). Relevant info: Spark version: 1.0.2 (standalone deployment). # slaves/workers/executors: 8. Cores per worker: 64. Memory per executor: 100g. Application parameters are left as default.
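A minimal sketch of setting it programmatically (the path is illustrative); the same property can also go into spark-defaults.conf, or SPARK_LOCAL_DIRS in spark-env.sh:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("ALS")
      // Shuffle and spill files go here instead of /tmp; point it at a
      // volume with plenty of free space on every worker.
      .set("spark.local.dir", "/mnt/spark-scratch")
    val sc = new SparkContext(conf)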
Re: Better way to process large image data set ?
What Sean said. You should also definitely turn on Kryo serialization; the default Java serialization is really, really slow if you're going to move around lots of data. Also make sure you use a cluster with high network bandwidth. On Thu, Sep 18, 2014 at 3:06 AM, Sean Owen so...@cloudera.com wrote: Base64 is an inefficient encoding for binary data, by about 2.6x. You could use byte[] directly. But you would still be storing and potentially shuffling lots of data in your RDDs. If the files exist separately on HDFS, perhaps you can just send around the file location and load it directly using the HDFS APIs in the function that needs it. On Sep 18, 2014 9:51 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to process a large image data set and need some way to optimize my implementation, since it's very slow right now. In my current implementation I store my images in an object file with the following fields:

    case class Image(groupId: String, imageId: String, buffer: String)

Images belong to groups and have an id; the buffer is the image file (jpg, png) encoded as a base64 string. Before running an image processing algorithm on the image buffer, I have a lot of jobs that filter, group, and join the images in my data set based on groupId or imageId, and these steps are relatively slow. I suspect that Spark moves my image buffers around even when that is not necessary for these specific jobs, so a lot of time is wasted on communication. Is there a better way to optimize my implementation? Regards, Jaonary
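A minimal sketch of turning Kryo on (app name illustrative); for even more compact output, classes such as Image can additionally be registered through a custom KryoRegistrator:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("image-pipeline")
      // Replaces default Java serialization, which is slow and verbose
      // when shuffling lots of data.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)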