Re: Spark as an application library vs infra
At Cloudera we recommend bundling your application separately from the Spark libraries. The two biggest reasons are:

* No need to modify your application jar when upgrading or applying a patch.
* When running on YARN, the Spark jar can be cached as a YARN local resource, meaning it doesn't need to be transferred every time.

On Sun, Jul 27, 2014 at 8:52 PM, Tobias Pfeiffer t...@preferred.jp wrote:

Mayur, I don't know if I exactly understand the context of what you are asking, but let me just mention the issues I had with deploying.

* As my application is a streaming application, it doesn't read any files from disk, so I have no Hadoop/HDFS in place and there is no need for it, either. There should be no dependency on Hadoop or HDFS, since you can perfectly well run Spark applications without it.
* I use Mesos, and so far I have always had the downloaded Spark distribution accessible to all machines (e.g., via HTTP) and then added my application code by uploading a jar built with `sbt assembly`. As the Spark code itself must not be contained in that jar file, I had to add "provided" in the sbt file, which in turn prevented me from running the application locally from IntelliJ IDEA (it would not find the libraries marked as provided), so I always had to use `sbt run`.
* When using Mesos, on the Spark slaves the Spark jar is loaded before the application jar, so the log4j file from the Spark jar is used instead of my custom one (this is different when running locally). I had to edit that file in the Spark distribution jar to customize the logging of my Spark nodes.

I wonder if the two latter problems would vanish if the Spark libraries were bundled together with the application. (That would be your approach #1, I guess.)

Tobias
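For the "provided" problem Tobias describes, a build.sbt sketch (the version number is illustrative, and the runTask override is a commonly cited workaround rather than something from this thread, so treat it as an assumption):

    // Keep Spark out of the assembly jar by marking it "provided".
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" % "provided"

    // Workaround so `run` still puts provided-scope jars on the classpath,
    // which also helps IDEs that delegate runs to sbt.
    run in Compile := Defaults.runTask(
      fullClasspath in Compile,
      mainClass in (Compile, run),
      runner in (Compile, run)
    ).evaluated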
Re: Hadoop Input Format - newAPIHadoopFile
Here is a tutorial on how to write your own file format in Hadoop: https://developer.yahoo.com/hadoop/tutorial/module5.html#fileformat. Once you have your own file format, you can use it in Spark the same way as TextInputFormat, as you have done in this post.
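As a rough illustration of that last point, a sketch of plugging a custom format into Spark (MyInputFormat is a hypothetical class standing in for whatever you build from the tutorial; its key/value types must match your format's signature):

    import org.apache.hadoop.io.{LongWritable, Text}

    // Assumption: MyInputFormat is a hypothetical custom format extending the
    // new-API InputFormat[LongWritable, Text], built as in the tutorial above.
    val records = sc.newAPIHadoopFile(
      "hdfs:///path/to/input",
      classOf[MyInputFormat],
      classOf[LongWritable],
      classOf[Text]
    ).map { case (_, text) => text.toString }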
VertexPartition and ShippableVertexPartition
There is a VertexPartition in the EdgePartition, which is created by EdgePartitionBuilder.toEdgePartition, and there is also a ShippableVertexPartition in the VertexRDD. These two partitions have a lot in common, like index, data, and BitSet. Why is this necessary?
[Spark 1.0.1][SparkSQL] reduce stage of shuffle is slow
I'm using SparkSQL with Hive 0.13. Here is the SQL for inserting a partition with 2048 buckets:

sqlsc.set("spark.sql.shuffle.partitions", "2048")
hql("""|insert %s table mz_log
       |PARTITION (date='%s')
       |select * from tmp_mzlog
       |CLUSTER BY mzid
       |""".stripMargin.format(overwrite, log_date))

Env: yarn-client mode with 80 executors, 2 cores per executor.
Data: the original text log is about 1.1T.

The reduce stage is too slow. http://apache-spark-user-list.1001560.n3.nabble.com/file/n10765/Screen_Shot_2014-07-28_1.png
Here is the network usage; it's not the bottleneck. http://apache-spark-user-list.1001560.n3.nabble.com/file/n10765/Screen_Shot_2014-07-28_2.png
And the CPU load is very high -- why? http://apache-spark-user-list.1001560.n3.nabble.com/file/n10765/Screen_Shot_2014-07-28_3.png

Here is the configuration (conf/spark-defaults.conf):

spark.ui.port
spark.akka.frameSize                      128
spark.akka.timeout                        600
spark.akka.threads                        8
spark.files.overwrite                     true
spark.executor.memory                     2G
spark.default.parallelism                 32
spark.shuffle.consolidateFiles            true
spark.kryoserializer.buffer.mb            128
spark.storage.blockManagerSlaveTimeoutMs  20
spark.serializer                          org.apache.spark.serializer.KryoSerializer

2 failed with a MapTracker error.
Re: Confusing behavior of newAPIHadoopFile
The value in (key, value) returned by textFile is exactly one line of the input. But what I want is the fields between the two "!!" delimiters. Hope this makes sense.

- Senior in Tsinghua Univ. github: http://www.github.com/uronce-cc
NotSerializableException while using TypeTag in Scala 2.10
I am trying to serialize objects contained in RDDs using runtime reflection via TypeTag. However, the Spark job keeps failing with java.io.NotSerializableException on an instance of TypeCreator (auto-generated by the compiler to enable TypeTags). Is there any workaround for this without switching to Scala 2.11?
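One common workaround (a sketch, not a confirmed fix for this exact job) is to keep the TypeTag itself out of the closure and extract whatever serializable information the executors need on the driver first:

    import scala.reflect.runtime.universe._
    import org.apache.spark.rdd.RDD

    // Assumption: only the type's name (or similar metadata) is needed on the
    // executors. Pull it out of the TypeTag on the driver so the closure
    // captures a plain String instead of the non-serializable TypeCreator
    // machinery.
    def tagValues[T: TypeTag](rdd: RDD[T]): RDD[String] = {
      val typeName = typeOf[T].toString  // computed on the driver
      rdd.map(x => s"$typeName: $x")
    }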
Re: Confusing behavior of newAPIHadoopFile
Nope. My input file's format is:

!!
string1
string2
!!
string3
string4

sc.textFile(path) will return RDD("!!", "string1", "string2", "!!", "string3", "string4"). What we need now is to transform this RDD to RDD("string1", "string2", "string3", "string4"). Your solution may not handle this.

- Senior in Tsinghua Univ. github: http://www.github.com/uronce-cc
Re: Confusing behavior of newAPIHadoopFile
Oh, you literally mean these are different lines, not the structure of a line. You can't solve this in general by reading the entire file into one string. If the input is tens of gigabytes, you will probably exhaust memory on any of your machines. (Or, you might as well not bother with Spark then.)

Do you really mean you want the strings that aren't "!!"? That's just a filter operation. But as I understand it, you need an RDD of complex data structures, containing many fields and key-value pairs across many lines. This is a difficult format to work with, since Hadoop assumes a line is a record, which is very common, but your records span lines.

If you have many small files, you could use wholeTextFiles to read each entire small text file as a string value, and simply parse it with a Scala function as normal. That's fine as long as none of the files are huge.

You can try mapPartitions for larger files, where you can parse an Iterator[String] instead of one String at a time and combine results from across lines into an Iterator[YourRecordType], as sketched below. This works as long as Hadoop does not break a file into several partitions, but not quite if a partition break occurs inside one of your records. If you're willing to tolerate missing some records here and there, it is a fine, scalable way to do it.

On Mon, Jul 28, 2014 at 12:43 PM, chang cheng myai...@gmail.com wrote:

Nope. My input file's format is:

!!
string1
string2
!!
string3
string4

sc.textFile(path) will return RDD("!!", "string1", "string2", "!!", "string3", "string4"). What we need now is to transform this RDD to RDD("string1", "string2", "string3", "string4"). Your solution may not handle this.

- Senior in Tsinghua Univ. github: http://www.github.com/uronce-cc
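A minimal sketch of the mapPartitions approach (assumptions: records are delimited by lines equal to "!!", no record spans a partition boundary, and the path is illustrative):

    // Combine lines into per-record List[String] values, one partition at a time.
    val records = sc.textFile("hdfs:///path/to/input").mapPartitions { lines =>
      val out = scala.collection.mutable.ArrayBuffer[List[String]]()
      val buf = scala.collection.mutable.ArrayBuffer[String]()
      for (line <- lines) {
        if (line == "!!") {
          // A delimiter closes the current record, if any.
          if (buf.nonEmpty) { out += buf.toList; buf.clear() }
        } else buf += line
      }
      if (buf.nonEmpty) out += buf.toList  // flush the trailing record
      out.iterator
    }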
Re: Confusing behavior of newAPIHadoopFile
Exactly: the fields between the "!!" delimiters form a customized (key, value) data structure. So newAPIHadoopFile may be the best practice for now. For this specific format, changing the record delimiter from the default "\n" to "!!\n" is the cheapest option. This can only be done in Hadoop 2.x; in Hadoop 1.x, it requires implementing an InputFormat, although most of the code is the same as TextInputFormat apart from the delimiter.

This is my first time talking on this mailing list, and I find you guys are really nice! Thanks for your discussion with me!

- Senior in Tsinghua Univ. github: http://www.github.com/uronce-cc
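A sketch of that cheaper route on Hadoop 2.x (the path and the trim/filter cleanup are illustrative), using the textinputformat.record.delimiter property:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // Tell TextInputFormat to end records at "!!\n" instead of "\n".
    val hadoopConf = new Configuration(sc.hadoopConfiguration)
    hadoopConf.set("textinputformat.record.delimiter", "!!\n")

    val records = sc.newAPIHadoopFile(
      "hdfs:///path/to/input",
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text], hadoopConf
    ).map { case (_, text) => text.toString.trim }
     .filter(_.nonEmpty)  // drop the empty record before the first "!!"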
Re: Bad Digest error while doing aws s3 put
Hi, I was using saveAsTextFile earlier. It was working fine. When we migrated to Spark 1.0, I started getting the following error:

java.lang.ClassNotFoundException: org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1
	java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	java.net.URLClassLoader$1.run(URLClassLoader.java:355)

Hence I changed my code as follows:

x.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

After this, I am facing this problem when I write very large data to S3. It also occurs only while writing to some partitions: say, while writing 240 partitions, it might succeed for 156 files and then start throwing the Bad Digest error and hang. Please advise.

Regards, lmk
sbt directory missing
Hi, I have started an EC2 cluster with Spark by running the spark-ec2 script. Just a little confused: I cannot find the sbt/ directory under spark/. I have checked the Spark version; it's 1.0.0 (the default). When I was working with 0.9.x, sbt/ was there. Has the script changed in 1.0.x? I cannot find any changelog on this. Or maybe I am missing something. Certainly, I can download sbt and make things work; I just want to make things clear. Thank you. Here is the file list of spark/:

root@ip-10-81-154-223:~# ls -l spark
total 384
drwxrwxr-x 10 1000 1000   4096 Jul 28 14:58 .
drwxr-xr-x 20 root root   4096 Jul 28 14:58 ..
drwxrwxr-x  2 1000 1000   4096 Jul 28 13:34 bin
-rw-rw-r--  1 1000 1000 281471 May 26 07:02 CHANGES.txt
drwxrwxr-x  2 1000 1000   4096 Jul 28 08:22 conf
drwxrwxr-x  4 1000 1000   4096 May 26 07:02 ec2
drwxrwxr-x  3 1000 1000   4096 May 26 07:02 examples
drwxrwxr-x  2 1000 1000   4096 May 26 07:02 lib
-rw-rw-r--  1 1000 1000  29983 May 26 07:02 LICENSE
drwxr-xr-x  2 root root   4096 Jul 28 14:42 logs
-rw-rw-r--  1 1000 1000  22559 May 26 07:02 NOTICE
drwxrwxr-x  6 1000 1000   4096 May 26 07:02 python
-rw-rw-r--  1 1000 1000   4221 May 26 07:02 README.md
-rw-rw-r--  1 1000 1000     35 May 26 07:02 RELEASE
drwxrwxr-x  2 1000 1000   4096 May 26 07:02 sbin
Re: sbt directory missing
Update: just checked the Python launch script. When retrieving Spark, it refers to this script: https://github.com/mesos/spark-ec2/blob/v3/spark/init.sh, where each version number is mapped to a tar file:

0.9.2)
  if [[ "$HADOOP_MAJOR_VERSION" == "1" ]]; then
    wget http://s3.amazonaws.com/spark-related-packages/spark-0.9.2-bin-hadoop1.tgz
  else
    wget http://s3.amazonaws.com/spark-related-packages/spark-0.9.2-bin-cdh4.tgz
  fi
  ;;
1.0.0)
  if [[ "$HADOOP_MAJOR_VERSION" == "1" ]]; then
    wget http://s3.amazonaws.com/spark-related-packages/spark-1.0.0-bin-hadoop1.tgz
  else
    wget http://s3.amazonaws.com/spark-related-packages/spark-1.0.0-bin-cdh4.tgz
  fi
  ;;
1.0.1)
  if [[ "$HADOOP_MAJOR_VERSION" == "1" ]]; then
    wget http://s3.amazonaws.com/spark-related-packages/spark-1.0.1-bin-hadoop1.tgz
  else
    wget http://s3.amazonaws.com/spark-related-packages/spark-1.0.1-bin-cdh4.tgz
  fi
  ;;

I just checked the last three tar files. I find the sbt/ directory, and many other directories like bagel, mllib, etc., in the 0.9.2 tar file. However, they are not in the 1.0.0 and 1.0.1 tar files. I am not sure the 1.0.x versions are mapped to the correct tar files.
Re: Fraud management system implementation
+user list, bcc: dev list

It's definitely possible to implement credit fraud management using Spark. A good start would be using some of the supervised learning algorithms that Spark provides in MLlib (logistic regression or linear SVMs). Spark doesn't have any HMM implementation right now. Sean Owen has a great talk on performing anomaly detection with KMeans clustering in Spark: https://www.youtube.com/watch?v=TC5cKYBZAeI

-Sandy

On Mon, Jul 28, 2014 at 7:15 AM, jitendra shelar jitendra.shelar...@gmail.com wrote:

Hi, I am new to Spark. I am learning Spark and Scala. I had some queries.

1) Can somebody please tell me if it is possible to implement a credit card fraud management system using Spark?
2) If yes, can somebody please guide me on how to proceed?
3) Should I prefer Scala or Java for this implementation?
4) Please suggest some pointers related to Hidden Markov Models (HMM) and anomaly detection in data mining (using Spark).

Thanks, Jitendra
Re: Debugging Task not serializable
A quick fix would be to implement java.io.Serializable in the classes that are causing this exception.

Thanks, Best Regards

On Mon, Jul 28, 2014 at 9:21 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote:

Hi all, I was wondering if someone has conceived a method for debugging "Task not serializable: java.io.NotSerializableException" errors, apart from commenting and uncommenting parts of the program, or just turning everything into Serializable. I find this kind of error very hard to debug, as these originate in the Spark runtime system. I'm using Spark for Java. Thanks a lot in advance, Juan
Re: Debugging Task not serializable
Also check the guides for the JVM option that prints messages for such problems. Sorry, sent from a phone and I don't know it by heart :/

On 28 Jul 2014 18:44, Akhil Das ak...@sigmoidanalytics.com wrote:

A quick fix would be to implement java.io.Serializable in the classes that are causing this exception.

Thanks, Best Regards

On Mon, Jul 28, 2014 at 9:21 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote:

Hi all, I was wondering if someone has conceived a method for debugging "Task not serializable: java.io.NotSerializableException" errors, apart from commenting and uncommenting parts of the program, or just turning everything into Serializable. I find this kind of error very hard to debug, as these originate in the Spark runtime system. I'm using Spark for Java. Thanks a lot in advance, Juan
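The flag meant here is likely -Dsun.io.serialization.extendedDebugInfo=true (an inference; the thread doesn't name it), which makes the JVM annotate NotSerializableException stack traces with the chain of fields that pulled the offending object into the serialization graph. A sketch of enabling it for executors; for the driver it has to go on the launching JVM's command line, since the driver JVM is already running by the time SparkConf is read:

    import org.apache.spark.SparkConf

    // Assumption: sun.io.serialization.extendedDebugInfo is the debugging flag
    // referred to above.
    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions",
           "-Dsun.io.serialization.extendedDebugInfo=true")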
Re: MLlib NNLS implementation is buggy, returning wrong solutions
Hi Aureliano, will it be possible for you to give the test case? You can add it to the JIRA as an attachment, I guess... I am preparing the PR for the ADMM-based QuadraticMinimizer... In my MATLAB experiments with scaling the rank to 1000 and beyond (which is too high for ALS but gives a good idea of solver scalability; ~400 is the max I have seen in the Sparkler paper), I am noticing consistent results, both in correctness and runtime, with MOSEK... I will update more on the JIRA this week... got it cleared from our legal last week... Please stay tuned... https://issues.apache.org/jira/browse/SPARK-2426

Thanks, Deb

On Sun, Jul 27, 2014 at 12:38 PM, DB Tsai dbt...@dbtsai.com wrote:

Could you help provide a test case to verify this issue and open a JIRA to track it? Also, are you interested in submitting a PR to fix it? Thanks. Sent from my Google Nexus 5

On Jul 27, 2014 11:07 AM, Aureliano Buendia buendia...@gmail.com wrote:

Hi, the recently added NNLS implementation in MLlib returns wrong solutions. This is not data specific; just try any data in R's nnls, and then the same data in MLlib's NNLS. The results are very different. Also, the selected algorithm, Polyak (1969), is not the best one around. The most popular one is Lawson-Hanson (1974): http://en.wikipedia.org/wiki/Non-negative_least_squares#Algorithms
Re: MLlib NNLS implementation is buggy, returning wrong solutions
It is possible that the answers (the final solution vector x) given by two different algorithms (such as the one in MLlib and the one in R) are different, as the problem may not be strictly convex and multiple global optima may exist. However, these answers should admit the same objective values. Can you give an example where the objective value of the other method is better (smaller) than that of MLlib?

2014-07-27 11:06 GMT-07:00 Aureliano Buendia buendia...@gmail.com:

Hi, the recently added NNLS implementation in MLlib returns wrong solutions. This is not data specific; just try any data in R's nnls, and then the same data in MLlib's NNLS. The results are very different. Also, the selected algorithm, Polyak (1969), is not the best one around. The most popular one is Lawson-Hanson (1974): http://en.wikipedia.org/wiki/Non-negative_least_squares#Algorithms
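To make that comparison concrete, a small plain-Scala sketch (the dense-matrix representation is an assumption for illustration) of the objective f(x) = ||Ax - b||^2 that both solutions should be evaluated on:

    // Two different nonnegative solutions are "equally correct" if these
    // objective values (near-)match.
    def nnlsObjective(a: Array[Array[Double]], b: Array[Double], x: Array[Double]): Double = {
      val residuals = a.zip(b).map { case (row, bi) =>
        row.zip(x).map { case (aij, xj) => aij * xj }.sum - bi
      }
      residuals.map(r => r * r).sum
    }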
akka.tcp://spark@localhost:7077/user/MapOutputTracker akka.actor.ActorNotFound
Hello community,

Using the following distros:
spark: http://archive.cloudera.com/cdh5/cdh/5/spark-1.0.0-cdh5.1.0-src.tar.gz
mesos: http://archive.apache.org/dist/mesos/0.19.0/mesos-0.19.0.tar.gz
both assembled with Scala 2.10.4 and Java 7.

My spark-env.sh looks as follows:

#!/usr/bin/env bash
export SCALA_HOME=/opt/local/src/scala/scala-2.10.4
export MESOS_NATIVE_LIBRARY=/opt/local/src/mesos/mesos-0.19.0/dist/lib/libmesos.so
export SPARK_EXECUTOR_URI=hdfs://localhost:8020/spark/spark-1.0.0-cdh5.1.0-bin-2.3.0-cdh5.0.3.tgz
export HADOOP_CONF_DIR=/opt/local/cloudera/hadoop/cdh5/hadoop-2.3.0-cdh5.0.3/etc/hadoop
export STANDALONE_SPARK_MASTER_HOST=192.168.122.1
export MASTER=mesos://192.168.122.1
export SPARK_MASTER_IP=192.168.122.1
export SPARK_LOCAL_IP=192.168.122.1

When I run a sample Spark job I get the output below. Thanks in advance for an explanation/fix for the exception. Note: if I run the Spark job on Spark by itself (or on Hadoop YARN), the job runs without any problem.

WARNING: Logging before InitGoogleLogging() is written to STDERR
I0728 14:33:52.421203 19678 fetcher.cpp:73] Fetching URI 'hdfs://localhost:8020/spark/spark-1.0.0-cdh5.1.0-bin-2.3.0-cdh5.0.3.tgz'
I0728 14:33:52.421346 19678 fetcher.cpp:102] Downloading resource from 'hdfs://localhost:8020/spark/spark-1.0.0-cdh5.1.0-bin-2.3.0-cdh5.0.3.tgz' to '/tmp/mesos/slaves/20140724-134606-16777343-5050-25095-0/frameworks/20140728-143300-24815808-5050-19059-/executors/20140724-134606-16777343-5050-25095-0/runs/c9c9eaa2-b722-4215-a35a-dc1c353963b9/spark-1.0.0-cdh5.1.0-bin-2.3.0-cdh5.0.3.tgz'
I0728 14:33:58.201438 19678 fetcher.cpp:61] Extracted resource '/tmp/mesos/slaves/20140724-134606-16777343-5050-25095-0/frameworks/20140728-143300-24815808-5050-19059-/executors/20140724-134606-16777343-5050-25095-0/runs/c9c9eaa2-b722-4215-a35a-dc1c353963b9/spark-1.0.0-cdh5.1.0-bin-2.3.0-cdh5.0.3.tgz' into '/tmp/mesos/slaves/20140724-134606-16777343-5050-25095-0/frameworks/20140728-143300-24815808-5050-19059-/executors/20140724-134606-16777343-5050-25095-0/runs/c9c9eaa2-b722-4215-a35a-dc1c353963b9'
Spark assembly has been built with Hive, including Datanucleus jars on classpath
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
14/07/28 14:33:59 INFO SparkHadoopUtil: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
WARNING: Logging before InitGoogleLogging() is written to STDERR
I0728 14:33:59.896520 19785 exec.cpp:131] Version: 0.19.0
I0728 14:33:59.899474 19805 exec.cpp:205] Executor registered on slave 20140724-134606-16777343-5050-25095-0
14/07/28 14:33:59 INFO MesosExecutorBackend: Registered with Mesos as executor ID 20140724-134606-16777343-5050-25095-0
14/07/28 14:34:00 INFO SecurityManager: Changing view acls to: amilkowski
14/07/28 14:34:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(amilkowski)
14/07/28 14:34:00 INFO Slf4jLogger: Slf4jLogger started
14/07/28 14:34:00 INFO Remoting: Starting remoting
14/07/28 14:34:01 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@localhost:40412]
14/07/28 14:34:01 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@localhost:40412]
14/07/28 14:34:01 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp://spark@localhost:7077/user/MapOutputTracker
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://spark@localhost:7077/]/user/MapOutputTracker]
	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:66)
	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:64)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
	at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
	at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269)
	at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512)
	at akka.actor.DeadLetterActorRef.specialHandle
Re: VertexPartition and ShippableVertexPartition
On Mon, Jul 28, 2014 at 4:29 AM, Larry Xiao xia...@sjtu.edu.cn wrote:

On 7/28/14, 3:41 PM, shijiaxin wrote:
There is a VertexPartition in the EdgePartition, which is created by EdgePartitionBuilder.toEdgePartition, and there is also a ShippableVertexPartition in the VertexRDD. These two partitions have a lot in common, like index, data, and BitSet. Why is this necessary?

Is the VertexPartition in the EdgePartition the mirror cache part?

Yes, exactly. The primary copy of each vertex is stored in the VertexRDD using the index, values, and mask data structures, which together form a hash map. In addition, each partition of the VertexRDD stores the corresponding partition of the routing table to facilitate joining with the edges. The ShippableVertexPartition class encapsulates the vertex hash map along with a RoutingTablePartition.

After joining the vertices with the edges, the edge partitions cache their adjacent vertices in the mirror cache. They use the VertexPartition for this, which provides only the hash map functionality and not the routing table.

Ankur
http://www.ankurdave.com/
how to publish spark inhouse?
hey, we used to publish spark inhouse by simply overriding the publishTo setting. but now that we are integrated in SBT with maven i cannot find it anymore. i tried looking into the pom file, but after reading 1144 lines of xml:
1) i haven't found anything that looks like publishing
2) i feel somewhat sick too
3) i am considering alternative careers to developing...
where am i supposed to look? thanks for your help!
JavaSparkSQL HBase
Hi team, could you please point me to an example program/link for using Java Spark SQL to join two HBase tables?

Regards, Rajesh
Re: how to publish spark inhouse?
and if i want to change the version, it seems i have to change it in all 23 pom files? mhhh. is it mandatory for these sub-project pom files to repeat that version info? useful? spark$ grep 1.1.0-SNAPSHOT * -r | wc -l 23 On Mon, Jul 28, 2014 at 3:05 PM, Koert Kuipers ko...@tresata.com wrote: hey we used to publish spark inhouse by simply overriding the publishTo setting. but now that we are integrated in SBT with maven i cannot find it anymore. i tried looking into the pom file, but after reading 1144 lines of xml i 1) havent found anything that looks like publishing 2) i feel somewhat sick too 3) i am considering alternative careers to developing... where am i supposed to look? thanks for your help!
Re: how to publish spark inhouse?
This is not something you edit yourself. The Maven release plugin manages setting all this. I think virtually everything you're worried about is done for you by this plugin. Maven requires artifacts to set a version and it can't inherit one. I feel like I understood the reason this is necessary at one point. On Mon, Jul 28, 2014 at 8:33 PM, Koert Kuipers ko...@tresata.com wrote: and if i want to change the version, it seems i have to change it in all 23 pom files? mhhh. is it mandatory for these sub-project pom files to repeat that version info? useful? spark$ grep 1.1.0-SNAPSHOT * -r | wc -l 23 On Mon, Jul 28, 2014 at 3:05 PM, Koert Kuipers ko...@tresata.com wrote: hey we used to publish spark inhouse by simply overriding the publishTo setting. but now that we are integrated in SBT with maven i cannot find it anymore. i tried looking into the pom file, but after reading 1144 lines of xml i 1) havent found anything that looks like publishing 2) i feel somewhat sick too 3) i am considering alternative careers to developing... where am i supposed to look? thanks for your help!
Spark java.lang.AbstractMethodError
I am trying to run an example Spark standalone app with the following code:

import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object SparkGensimLDA extends App {
  val ssc = new StreamingContext("local", "testApp", Seconds(5))
  val lines = ssc.textFileStream("/.../spark_example/")
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()
}

However I am getting the following error:

15:35:40.170 [spark-akka.actor.default-dispatcher-2] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-3] shutting down ActorSystem [spark]
java.lang.AbstractMethodError: null
	at akka.actor.ActorCell.create(ActorCell.scala:580) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.2.jar:na]
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na]
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na]
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na]
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na]
15:35:40.171 [spark-akka.actor.default-dispatcher-2] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-4] shutting down ActorSystem [spark]
java.lang.AbstractMethodError: null
	at akka.actor.ActorCell.create(ActorCell.scala:580) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.2.jar:na]
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na]
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na]
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na]
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na]
15:35:40.175 [main] DEBUG o.a.spark.storage.DiskBlockManager - Creating local directories at root dirs '/var/folders/6y/h1f088_j007_d11kpwb1jg6mgp/T/'
15:35:40.176 [spark-akka.actor.default-dispatcher-4] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-2] shutting down ActorSystem [spark]
java.lang.AbstractMethodError: org.apache.spark.storage.BlockManagerMasterActor.aroundPostStop()V
	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.actor.ActorCell.terminate(ActorCell.scala:369) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na]
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.2.jar:na]
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na]
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na]
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na]
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na]
15:35:40.177 [spark-akka.actor.default-dispatcher-4] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-4] shutting down ActorSystem [spark]
java.lang.AbstractMethodError:
Re: how to publish spark inhouse?
ah ok thanks. guess i am gonna read up about maven-release-plugin then! On Mon, Jul 28, 2014 at 3:37 PM, Sean Owen so...@cloudera.com wrote: This is not something you edit yourself. The Maven release plugin manages setting all this. I think virtually everything you're worried about is done for you by this plugin. Maven requires artifacts to set a version and it can't inherit one. I feel like I understood the reason this is necessary at one point. On Mon, Jul 28, 2014 at 8:33 PM, Koert Kuipers ko...@tresata.com wrote: and if i want to change the version, it seems i have to change it in all 23 pom files? mhhh. is it mandatory for these sub-project pom files to repeat that version info? useful? spark$ grep 1.1.0-SNAPSHOT * -r | wc -l 23 On Mon, Jul 28, 2014 at 3:05 PM, Koert Kuipers ko...@tresata.com wrote: hey we used to publish spark inhouse by simply overriding the publishTo setting. but now that we are integrated in SBT with maven i cannot find it anymore. i tried looking into the pom file, but after reading 1144 lines of xml i 1) havent found anything that looks like publishing 2) i feel somewhat sick too 3) i am considering alternative careers to developing... where am i supposed to look? thanks for your help!
spark-shell and spark-submit behave differently on the spark-defaults.conf parameter spark.eventLog.dir
Hi all, not sure if anyone has run into this problem, but it exists in Spark 1.0.0 when you specify the location in conf/spark-defaults.conf as

spark.eventLog.dir hdfs:///user/$USER/spark/logs

in order to use the $USER env variable. For example, I'm running the command as user 'test'. With spark-submit, the folder is created on the fly and you will see the event logs created on HDFS at /user/test/spark/logs/spark-pi-1405097484152, but with spark-shell, the user 'test' folder is not created, and you will see /user/$USER/spark/logs on HDFS: it tries to create /user/$USER/spark/logs instead of /user/test/spark/logs. It looks like spark-shell can't pick up the env variable $USER to apply to the eventLog directory for the running user 'test'. Is this considered a bug, or is it bad practice to use spark-shell with Spark's HistoryServer?
Re: sbt directory missing
I think the 1.0 AMI only contains the prebuilt packages (i.e., just the binaries) of Spark, not the source code. If you want to build Spark on EC2, you can clone the GitHub repo and then use sbt.

Thanks, Shivaram

On Mon, Jul 28, 2014 at 8:49 AM, redocpot julien19890...@gmail.com wrote:

Update: just checked the Python launch script. When retrieving Spark, it refers to this script: https://github.com/mesos/spark-ec2/blob/v3/spark/init.sh, where each version number is mapped to a tar file (spark-0.9.2-bin-hadoop1.tgz or spark-0.9.2-bin-cdh4.tgz for 0.9.2, and likewise for 1.0.0 and 1.0.1, depending on $HADOOP_MAJOR_VERSION). I just checked the last three tar files. I find the sbt/ directory, and many other directories like bagel, mllib, etc., in the 0.9.2 tar file. However, they are not in the 1.0.0 and 1.0.1 tar files. I am not sure the 1.0.x versions are mapped to the correct tar files.
Re: Spark streaming vs. spark usage
So after months and months, I finally started to try and tackle this, but my Scala ability isn't up to it. The problem is that, of course, even with the common interface, we don't want interoperability between RDDs and DStreams. I looked into monads, as per Ashish's suggestion, and I think I understand their relevance. But when done processing, one would still have to pull out the wrapped object, knowing what it was, and I don't see how to do that. I'm guessing there is a way to do this in Scala, but I'm not seeing it. In detail, the requirement would be something on the order of:

abstract class DistributedCollection[T] {
  def map[U](fcn: T => U): DistributedCollection[U]
  ...
}

class RDD[T] extends DistributedCollection[T] {
  // Note the return type that doesn't quite match the interface
  def map[U](fcn: T => U): RDD[U]
  ...
}

class DStream[T] extends DistributedCollection[T] {
  // Note the return type that doesn't quite match the interface
  def map[U](fcn: T => U): DStream[U]
  ...
}

Can anyone point me at a way to do this?

Thanks, -Nathan

On Thu, Dec 19, 2013 at 1:08 AM, Ashish Rangole arang...@gmail.com wrote:

I wonder if it will help to have a generic monad container that wraps either RDD or DStream and provides map, flatMap, foreach and filter methods.

case class DataMonad[A](data: A) {
  def map[B](f: A => B): DataMonad[B] = {
    DataMonad(f(data))
  }
  def flatMap[B](f: A => DataMonad[B]): DataMonad[B] = {
    f(data)
  }
  def foreach ...
  def withFilter ...
  : etc, something like that
}

On Wed, Dec 18, 2013 at 10:42 PM, Reynold Xin r...@apache.org wrote:

On Wed, Dec 18, 2013 at 12:17 PM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote:

Since many of the functions exist in parallel between the two, I guess I would expect something like:

trait BasicRDDFunctions {
  def map...
  def reduce...
  def filter...
  def foreach...
}
class RDD extends BasicRDDFunctions...
class DStream extends BasicRDDFunctions...

I like this idea. We should discuss it more on the dev list. It would require refactoring some APIs, but it does lead to better unification.

-- Nathan Kronenfeld, Senior Visualization Developer, Oculus Info Inc, 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5, Phone: +1-416-203-3003 x 238, Email: nkronenf...@oculusinfo.com
Re: KMeans: expensiveness of large vectors
Hi Xiangrui, thanks for the explanation.

1. You said we have to broadcast m * k centers (with m = number of rows). I thought there were only k centers at any time, which would then have size n * k and would need to be broadcast. Is that a typo, or did I misunderstand something? And the collection of the averages is partition-wise, so more partitions = more overhead, but basically the same number of operations?

2. I have 5 executors with 8 CPU cores and 25G of memory each, and I usually split the input RDD into 80 partitions for a few gigs of input data. Is there a rule of thumb for the number of partitions in relation to the input size?

3. Assuming I wouldn't use numeric data but instead converted text data into a numeric representation using a dictionary and a featurization function: the number of columns would be the number of entries in my dictionary (i.e., the number of distinct words in my case). I'd use a sparse vector representation, of course. But even so, if I have a few hundred thousand entries and therefore columns, the broadcasting overhead will get very large, as the centers are still in a dense representation. Do you know of any way to improve performance then?

Best regards, Simon
zip two RDD in pyspark
I have a file in S3 whose lines I want to map each to an index. Here is my code:

input_data = sc.textFile('s3n://myinput', minPartitions=6).cache()
N = input_data.count()
index = sc.parallelize(range(N), 6)
index.zip(input_data).collect()

...
14/07/28 19:49:31 INFO DAGScheduler: Completed ResultTask(18, 4)
14/07/28 19:49:31 INFO DAGScheduler: Stage 18 (collect at <stdin>:1) finished in 0.031 s
14/07/28 19:49:31 INFO SparkContext: Job finished: collect at <stdin>:1, took 0.03707 s
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/root/spark/python/pyspark/rdd.py", line 584, in collect
    return list(self._collect_iterator_through_file(bytesInJava))
  File "/root/spark/python/pyspark/rdd.py", line 592, in _collect_iterator_through_file
    self.ctx._writeToFile(iterator, tempFile.name)
  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.writeToFile.
: java.lang.ClassCastException: java.lang.String cannot be cast to [B
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:312)
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:309)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:309)
	at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:342)
	at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:337)
	at org.apache.spark.api.python.PythonRDD.writeToFile(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:744)

As I see it, the job completed, but I don't understand what's happening with 'String cannot be cast to [B'. I tried to zip two ParallelCollectionRDDs and it works fine, but here I have a MappedRDD from textFile. Not sure what's going on here. Also, why doesn't Python have zipWithIndex()? Thanks for any help.
Re: spark-shell and spark-submit behave differently on the spark-defaults.conf parameter spark.eventLog.dir
Hi Andrew, it's definitely not bad practice to use spark-shell with the HistoryServer. The issue here is not with spark-shell, but with the way we pass Spark configs to the application. spark-defaults.conf does not currently support embedding environment variables; it interprets everything as a string literal. You will have to manually specify 'test' instead of '$USER' in the path you provide to spark.eventLog.dir.

-Andrew

2014-07-28 12:40 GMT-07:00 Andrew Lee alee...@hotmail.com:

Hi all, not sure if anyone has run into this problem, but it exists in Spark 1.0.0 when you specify the location in conf/spark-defaults.conf as spark.eventLog.dir hdfs:///user/$USER/spark/logs in order to use the $USER env variable. For example, I'm running the command as user 'test'. With spark-submit, the folder is created on the fly and you will see the event logs created on HDFS at /user/test/spark/logs/spark-pi-1405097484152, but with spark-shell, the user 'test' folder is not created, and you will see /user/$USER/spark/logs on HDFS: it tries to create /user/$USER/spark/logs instead of /user/test/spark/logs. It looks like spark-shell can't pick up the env variable $USER to apply to the eventLog directory for the running user 'test'. Is this considered a bug, or is it bad practice to use spark-shell with Spark's HistoryServer?
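A possible workaround sketch (an assumption, not an official fix): expand the variable yourself when building the configuration, instead of relying on spark-defaults.conf to do it:

    import org.apache.spark.SparkConf

    // Read $USER from the environment at submit time; "unknown" is just a
    // fallback for illustration.
    val user = sys.env.getOrElse("USER", "unknown")
    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", s"hdfs:///user/$user/spark/logs")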
Re: KMeans: expensiveness of large vectors
1. I meant in the n (1k) by m (10k) case, we need to broadcast k centers, and hence the total size is m * k. In 1.0, the driver needs to send the current centers to each partition one by one. In the current master, we use torrent to broadcast the centers to workers, which should be much faster.

2. For MLlib algorithms, the number of partitions shouldn't be much larger than the number of CPU cores. Your setting looks good.

3. You can use the hashing trick to limit the number of features, or remove low-frequency and high-frequency words from the dictionary.

Best, Xiangrui

On Mon, Jul 28, 2014 at 12:55 PM, durin m...@simon-schaefer.net wrote:

Hi Xiangrui, thanks for the explanation.

1. You said we have to broadcast m * k centers (with m = number of rows). I thought there were only k centers at any time, which would then have size n * k and would need to be broadcast. Is that a typo, or did I misunderstand something? And the collection of the averages is partition-wise, so more partitions = more overhead, but basically the same number of operations?

2. I have 5 executors with 8 CPU cores and 25G of memory each, and I usually split the input RDD into 80 partitions for a few gigs of input data. Is there a rule of thumb for the number of partitions in relation to the input size?

3. Assuming I wouldn't use numeric data but instead converted text data into a numeric representation using a dictionary and a featurization function: the number of columns would be the number of entries in my dictionary (i.e., the number of distinct words in my case). I'd use a sparse vector representation, of course. But even so, if I have a few hundred thousand entries and therefore columns, the broadcasting overhead will get very large, as the centers are still in a dense representation. Do you know of any way to improve performance then?

Best regards, Simon
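A minimal sketch of the hashing trick from point 3, in plain Scala (not tied to any MLlib API; numFeatures is an illustrative default): map each word to one of numFeatures buckets so the dimensionality is bounded regardless of vocabulary size, with hash collisions as the accepted trade-off.

    def hashingTrick(words: Seq[String], numFeatures: Int = 1 << 18): Map[Int, Double] =
      words
        // Non-negative bucket index for each word.
        .map(w => ((w.hashCode % numFeatures + numFeatures) % numFeatures, 1.0))
        // Sum the counts that land in the same bucket.
        .groupBy(_._1)
        .map { case (idx, vs) => (idx, vs.map(_._2).sum) }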
RE: spark-shell and spark-submit behave differently on the spark-defaults.conf parameter spark.eventLog.dir
Hi Andrew, thanks for confirming the problem. I thought it only happened with my own build. :)

By the way, we have multiple users using spark-shell to explore their datasets, and we are continuously looking into ways to isolate their job histories. In the current situation, we can't really ask them to create their own spark-defaults.conf, since this is set to read-only. A workaround is to set it to a shared folder, e.g., /user/spark/logs with permissions 1777. This isn't really ideal, since other people can see what other jobs are running on the shared cluster. It would be nice to have better security here, so people aren't exposing their algorithms (which are usually embedded in their jobs' names) to other users. Is there (or will there be) a JIRA ticket to track this? Any plan to enhance this part for spark-shell?

Date: Mon, 28 Jul 2014 13:54:56 -0700
Subject: Re: spark-shell and spark-submit behave differently on the spark-defaults.conf parameter spark.eventLog.dir
From: and...@databricks.com
To: user@spark.apache.org

Hi Andrew, it's definitely not bad practice to use spark-shell with the HistoryServer. The issue here is not with spark-shell, but with the way we pass Spark configs to the application. spark-defaults.conf does not currently support embedding environment variables; it interprets everything as a string literal. You will have to manually specify 'test' instead of '$USER' in the path you provide to spark.eventLog.dir.

-Andrew

2014-07-28 12:40 GMT-07:00 Andrew Lee alee...@hotmail.com:

Hi all, not sure if anyone has run into this problem, but it exists in Spark 1.0.0 when you specify the location in conf/spark-defaults.conf as spark.eventLog.dir hdfs:///user/$USER/spark/logs in order to use the $USER env variable. For example, I'm running the command as user 'test'. With spark-submit, the folder is created on the fly and you will see the event logs created on HDFS at /user/test/spark/logs/spark-pi-1405097484152, but with spark-shell, the user 'test' folder is not created, and you will see /user/$USER/spark/logs on HDFS. It looks like spark-shell can't pick up the env variable $USER to apply to the eventLog directory for the running user 'test'. Is this considered a bug, or is it bad practice to use spark-shell with Spark's HistoryServer?
RE: Need help, got java.lang.ExceptionInInitializerError in Yarn-Client/Cluster mode
Hi Jianshi,

My understanding is 'no', based on how Spark is designed, even with your own log4j.properties in Spark's conf folder. In YARN mode, the Application Master runs inside the cluster, and all logs are part of the container logs, which are governed by another log4j.properties file from the Hadoop and YARN environment. Spark can't override that unless it can place its own log4j ahead of YARN's on the classpath. So the only way is to log in to the resource manager and click on the job itself to read the container logs. (Other people, please correct me if my understanding is wrong.)

You may be thinking: why can't I stream the logs to an external service (e.g., Flume, syslogd) with a different appender in log4j? I don't consider this good practice, since:
1. You need two infrastructures to operate the entire cluster.
2. You will need to open up firewall ports between the two services to transfer/stream logs.
3. Unpredictable traffic: the YARN cluster may bring down the logging service/infra (DDoS) when someone accidentally changes the logging level from WARN to INFO or, worse, DEBUG.

I was thinking we could suggest that the community enhance the Spark HistoryServer to capture the last failure exception from the container logs in the last failed stage. I'm not sure if this is a good idea, since it may complicate the event model. I'm also not sure whether the Akka model can support this, or whether some other component in Spark could help capture these exceptions, pass them back to the AM, and store them somewhere for later troubleshooting. I'm not clear how this path is constructed without reading the source code, so I can't give a better answer.

AL

From: jianshi.hu...@gmail.com
Date: Mon, 28 Jul 2014 13:32:05 +0800
Subject: Re: Need help, got java.lang.ExceptionInInitializerError in Yarn-Client/Cluster mode
To: user@spark.apache.org

Hi Andrew, thanks for the reply. I figured out the cause of the issue: some resource files were missing from the JARs. A class initialization depended on the resource files, so it got that exception. I appended the resource files explicitly with the --jars option and it worked fine. The "Caused by..." messages were actually found in the YARN logs. I think it might be useful if I could see them from the console that runs spark-submit. Would that be possible?

Jianshi

On Sat, Jul 26, 2014 at 7:08 AM, Andrew Lee alee...@hotmail.com wrote:

Hi Jianshi, could you provide which HBase version you're using? By the way, a quick sanity check on whether the workers can access HBase: were you able to manually write one record to HBase with the serialize function? Hardcode and test it?

From: jianshi.hu...@gmail.com
Date: Fri, 25 Jul 2014 15:12:18 +0800
Subject: Re: Need help, got java.lang.ExceptionInInitializerError in Yarn-Client/Cluster mode
To: user@spark.apache.org

I nailed it down to a union operation. Here's my code snippet:

val properties: RDD[((String, String, String), Externalizer[KeyValue])] = vertices.map { ve =>
  val (vertices, dsName) = ve
  val rval = GraphConfig.getRval(datasetConf, Constants.VERTICES, dsName)
  val (_, rvalAsc, rvalType) = rval
  println(s"Table name: $dsName, Rval: $rval")
  println(vertices.toDebugString)
  vertices.map { v =>
    val rk = appendHash(boxId(v.id)).getBytes
    val cf = PROP_BYTES
    val cq = boxRval(v.rval, rvalAsc, rvalType).getBytes
    val value = Serializer.serialize(v.properties)
    ((new String(rk), new String(cf), new String(cq)), Externalizer(put(rk, cf, cq, value)))
  }
}.reduce(_.union(_)).sortByKey(numPartitions = 32)

Basically I read data from multiple tables (Seq[RDD[(key, value)]]) and they're transformed into KeyValues to be inserted into HBase, so I need to do a .reduce(_.union(_)) to combine them into one RDD[(key, value)]. I cannot see what's wrong in my code.

Jianshi

On Fri, Jul 25, 2014 at 12:24 PM, Jianshi Huang jianshi.hu...@gmail.com wrote:

I can successfully run my code in local mode using spark-submit (--master local[4]), but I get ExceptionInInitializerError errors in Yarn-client mode. Any hints on what the problem is? Is it a closure serialization problem? How can I debug it? Your answers would be very helpful.

14/07/25 11:48:14 WARN scheduler.TaskSetManager: Loss was due to java.lang.ExceptionInInitializerError
java.lang.ExceptionInInitializerError
	at com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scala:40)
	at com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scala:36)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1016)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
	at
Re: sbt directory missing
Thank you for your reply. I need sbt for packaging my project and then submitting it. Could you tell me how to run a Spark project on the 1.0 AMI without sbt? I don't understand why 1.0 only contains the prebuilt packages. I don't think it makes sense, since sbt is essential. The user has to download sbt or clone the GitHub repo, whereas on the 0.9 AMI, sbt was pre-installed, and a command like

$ sbt/sbt package run

could do the job. Thanks. =)
Re: Getting the number of slaves
Do getExecutorStorageStatus and getExecutorMemoryStatus both return the number of executors plus the driver? E.g., if I submit a job with 10 executors, I get 11 for getExecutorStorageStatus.length and getExecutorMemoryStatus.size.

On Thu, Jul 24, 2014 at 4:53 PM, Nicolas Mai nicolas@gmail.com wrote:

Thanks, this is what I needed :) I should have searched more... Something I noticed, though: after the SparkContext is initialized, I had to wait for a few seconds until sc.getExecutorStorageStatus.length returned the correct number of workers in my cluster (otherwise it returns 1, for the driver)...
Re: Getting the number of slaves
Yes, both of these are derived from the same source, and this source includes the driver. In other words, if you submit a job with 10 executors, you will get back 11 for both statuses.

2014-07-28 15:40 GMT-07:00 Sung Hwan Chung coded...@cs.stanford.edu:

Do getExecutorStorageStatus and getExecutorMemoryStatus both return the number of executors plus the driver? E.g., if I submit a job with 10 executors, I get 11 for getExecutorStorageStatus.length and getExecutorMemoryStatus.size.

On Thu, Jul 24, 2014 at 4:53 PM, Nicolas Mai nicolas@gmail.com wrote:

Thanks, this is what I needed :) I should have searched more... Something I noticed, though: after the SparkContext is initialized, I had to wait for a few seconds until sc.getExecutorStorageStatus.length returned the correct number of workers in my cluster (otherwise it returns 1, for the driver)...
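A one-line consequence of the answer above (a sketch; the driver must be excluded by hand):

    // Both statuses count the driver, so subtract one to get the executors.
    val numExecutors = sc.getExecutorStorageStatus.length - 1
    val numExecutors2 = sc.getExecutorMemoryStatus.size - 1  // same idea via memory status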
Re: Spark streaming vs. spark usage
On Mon, Jul 28, 2014 at 12:53 PM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote:

But when done processing, one would still have to pull out the wrapped object, knowing what it was, and I don't see how to do that.

It's pretty tricky to get the level of type safety you're looking for. I know of two ways:

1. Leave RDD and DStream as they are, but define a typeclass (http://danielwestheide.com/blog/2013/02/06/the-neophytes-guide-to-scala-part-12-type-classes.html) that allows converting them to a common DistributedCollection type. Example: https://gist.github.com/ankurdave/f5d4df4b521ac83b9c7d#file-distributed-collection-via-typeclass-scala

2. Make RDD and DStream inherit from a common DistributedCollection trait, as in your example, but use F-bounded polymorphism (https://twitter.github.io/scala_school/advanced-types.html#fbounded) to express the concrete types. Example: https://gist.github.com/ankurdave/f5d4df4b521ac83b9c7d#file-distributed-collection-via-fbounded-polymorphism-scala

Ankur
http://www.ankurdave.com/
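A minimal standalone sketch of option 2 (simplified, with hypothetical stand-in classes rather than the real RDD/DStream; see Ankur's gists for the full version): the trait takes the concrete subclass as a higher-kinded parameter, so map can return the precise type.

    import scala.language.higherKinds

    trait DistributedCollection[T, Self[X] <: DistributedCollection[X, Self]] {
      def map[U](f: T => U): Self[U]
    }

    class MyRDD[T] extends DistributedCollection[T, MyRDD] {
      def map[U](f: T => U): MyRDD[U] = new MyRDD[U]  // stub body for illustration
    }

    class MyDStream[T] extends DistributedCollection[T, MyDStream] {
      def map[U](f: T => U): MyDStream[U] = new MyDStream[U]  // stub body for illustration
    }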
ssh connection refused
I'm trying to launch Spark on AWS with this command:

./spark-ec2 -k keypair_name -i keypair.pem -s 5 -t c1.xlarge -r us-west-2 --hadoop-major-version=2.4.0 launch spark_cluster

This script is erroring out with this message:

ssh: connect to host hostname port 22: Connection refused
Error executing remote command, retrying after 30 seconds: Command '['ssh', '-o', 'StrictHostKeyChecking=no', '-i', 'keypair.pem', '-t', '-t', u'root@hostname', "\n [ -f ~/.ssh/id_rsa ] ||\n (ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa &&\n cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)\n"]' returned non-zero exit status 255

Strange thing is, I can manually ssh to the master node as root using this command:

ssh root@hostname -i keypair.pem

Does anyone know what is going on here? Any help is appreciated.
Re: how to publish spark inhouse?
All of the scripts we use to publish Spark releases are in the Spark repo itself, so you could follow these as a guideline. The publishing process in Maven is similar to the one in SBT: https://github.com/apache/spark/blob/master/dev/create-release/create-release.sh#L65 On Mon, Jul 28, 2014 at 12:39 PM, Koert Kuipers ko...@tresata.com wrote: ah ok thanks. guess i am gonna read up about maven-release-plugin then! On Mon, Jul 28, 2014 at 3:37 PM, Sean Owen so...@cloudera.com wrote: This is not something you edit yourself. The Maven release plugin manages setting all this. I think virtually everything you're worried about is done for you by this plugin. Maven requires artifacts to set a version and it can't inherit one. I feel like I understood the reason this is necessary at one point. On Mon, Jul 28, 2014 at 8:33 PM, Koert Kuipers ko...@tresata.com wrote: and if i want to change the version, it seems i have to change it in all 23 pom files? mhhh. is it mandatory for these sub-project pom files to repeat that version info? useful? spark$ grep 1.1.0-SNAPSHOT * -r | wc -l 23 On Mon, Jul 28, 2014 at 3:05 PM, Koert Kuipers ko...@tresata.com wrote: hey, we used to publish spark in-house by simply overriding the publishTo setting. but now that the sbt build is integrated with maven i cannot find it anymore. i tried looking into the pom file, but after reading 1144 lines of xml 1) i haven't found anything that looks like publishing 2) i feel somewhat sick too 3) i am considering alternative careers to developing... where am i supposed to look? thanks for your help!
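For the in-house case specifically, here is a sketch (not from this thread) using stock Maven plugins; the repository id and URL are placeholders. The versions plugin bumps the parent and all child POMs in one shot, and the deploy plugin's altDeploymentRepository switch publishes without editing any POM:

# bump the version everywhere (assumes the versions-maven-plugin)
mvn versions:set -DnewVersion=1.1.0-inhouse-SNAPSHOT
mvn versions:commit    # drop the pom.xml.versionsBackup files

# publish to an internal repository (assumes the standard maven-deploy-plugin)
mvn -DskipTests deploy \
  -DaltDeploymentRepository=inhouse::default::https://repo.example.com/snapshots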
evaluating classification accuracy
Hi, In order to evaluate the ML classification accuracy, I am zipping up the predictions and test labels as follows and then comparing the pairs in predictionAndLabel:

val prediction = model.predict(test.map(_.features))
val predictionAndLabel = prediction.zip(test.map(_.label))

However, I am finding that predictionAndLabel.count() has fewer elements than test.count(). For example, my test vector has 43 elements, but predictionAndLabel has only 38 pairs. I have tried other samples and always get fewer elements after zipping. Does zipping the two vectors cause any compression? Or is this because of the distributed nature of the algorithm (I am running it in local mode on a single machine)? In order to get the correct accuracy, I need the above comparison to be done by a single node on the entire test data (my data is quite small). How can I ensure that? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/evaluating-classification-accuracy-tp10822.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
hdfs.BlockMissingException on Iterator.hasNext() in mapPartitionsWithIndex()
Hi, I'm trying to split one large multi-field text file into many single-field text files. My code is like this (somewhat simplified; imports shown for completeness):

import java.io.PrintStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.broadcast.Broadcast;

final Broadcast<ColSchema> bcSchema = sc.broadcast(schema);
final String outputPathName = env.outputPathName;
sc.textFile(env.inputFileName)
  .mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer partitionIndex, Iterator<String> itor) throws Exception {
      ColSchema schema = bcSchema.value();
      FileSystem outputFs = FileSystem.get(new URI(outputPathName), new Configuration());
      PrintStream[] outss = new PrintStream[schema.getColCount()];
      try {
        while (itor.hasNext()) {
          String cols[] = itor.next().split("\t", -1);
          for (int i = 0; i < schema.getColCount(); i++) {
            String value = cols[i];
            if (value.isEmpty())
              continue;
            // lazily open one output file per column, named after the column
            if (outss[i] == null)
              outss[i] = new PrintStream(outputFs.create(new Path(
                  outputPathName + "/" + schema.getColName(i) + ".tsv/part-"
                  + String.format("%05d", partitionIndex))));
            outss[i].println(value);
          }
        }
      } finally {
        for (PrintStream outs : outss)
          if (outs != null)
            outs.close();
      }
      return new ArrayList<String>().iterator(); // dummy.
    }
  }, true)
  .count(); // just to invoke mapPartitionsWithIndex().
bcSchema.unpersist();

Basically, it uses mapPartitionsWithIndex() to write multiple single-field files at once, partition by partition. Eventually the job succeeds. But occasionally while executing, the following exception is thrown and the task fails (the task is automatically retried by Spark and then succeeds). The location is itor.hasNext().

14/07/28 19:10:47 WARN TaskSetManager: Lost TID 154 (task 10.0:142)
14/07/28 19:10:47 WARN TaskSetManager: Loss was due to org.apache.hadoop.hdfs.BlockMissingException
org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-1127695701-10.254.0.11-1405426572227:blk_1073930972_190153 file=/user/test/Data/big.tsv/part-00142
  at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:880)
  at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:560)
  at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:790)
  at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
  at java.io.DataInputStream.read(DataInputStream.java:100)
  at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
  at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
  at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
  at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
  at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
  at com.test.tester.Splitter$17.call(Splitter.java:504)
  at com.test.tester.Splitter$17.call(Splitter.java:495)
  at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:744)
  at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:81)
  at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:81)
  at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:569)
  at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:569)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
  at org.apache.spark.scheduler.Task.run(Task.scala:51)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
  at
RE: streaming sequence files?
Running as Standalone Cluster. From my monitoring console:

Spark Master at spark://101.73.54.149:7077
* URL: spark://101.73.54.149:7077
* Workers: 1
* Cores: 2 Total, 0 Used
* Memory: 2.4 GB Total, 0.0 B Used
* Applications: 0 Running, 24 Completed
* Drivers: 0 Running, 0 Completed
* Status: ALIVE

Workers:
Id: worker-20140723222518-101.73.54.149-37995 | Address: 101.73.54.149:37995 | State: ALIVE | Cores: 2 (0 Used) | Memory: 2.4 GB (0.0 B Used)

From: tathagata.das1...@gmail.com Date: Sat, 26 Jul 2014 20:14:37 -0700 Subject: Re: streaming sequence files? To: user@spark.apache.org CC: u...@spark.incubator.apache.org Which deployment environment are you running the streaming programs in? Standalone? In that case you have to specify the max cores for each application; otherwise all the cluster resources may get consumed by a single application. http://spark.apache.org/docs/latest/spark-standalone.html TD On Thu, Jul 24, 2014 at 4:57 PM, Barnaby bfa...@outlook.com wrote: I have the streaming program writing sequence files. I can find one of the files and load it in the shell using:

scala> val rdd = sc.sequenceFile[String, Int]("tachyon://localhost:19998/files/WordCounts/20140724-213930")
14/07/24 21:47:50 INFO storage.MemoryStore: ensureFreeSpace(32856) called with curMem=0, maxMem=309225062
14/07/24 21:47:50 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.1 KB, free 294.9 MB)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1] at sequenceFile at <console>:12

So I got some type information, which seems good. It took a while to research, but I got the following streaming code to compile and run:

val wordCounts = ssc.fileStream[String, Int, SequenceFileInputFormat[String, Int]](args(0))

It works now and I offer this for reference to anybody else who may be curious about saving sequence files and then streaming them back in. Question: When running both streaming programs at the same time using spark-submit, I noticed that only one app would really run. To get the one app to continue I had to stop the other app. Is there a way to get these running simultaneously? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-sequence-files-tp10557p10620.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
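Regarding the simultaneous-apps question, a minimal sketch of TD's suggestion for this exact 1-worker/2-core setup; the app name and batch interval are placeholders of mine:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("spark://101.73.54.149:7077")
  .setAppName("SequenceFileWriter")
  .set("spark.cores.max", "1") // cap this app so the second app gets the other core

val ssc = new StreamingContext(conf, Seconds(10))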
Re: [Spark 1.0.1][SparkSQL] reduce stage of shuffle is slow。
The "Asked to send map output locations for shuffle 0" step from spark.MapOutputTrackerMasterActor takes too much time. What should I do? What is the correct configuration? The blockManager times out if I use a small number of reduce partitions. http://apache-spark-user-list.1001560.n3.nabble.com/file/n10825/Screen_Shot_2014-07-29_at_10.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SparkSQL-reduce-stage-of-shuffle-is-slow-tp10765p10825.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: ssh connection refused
This may occur when the EC2 instances are not ready and the ssh port is not open yet. Please allow more time by specifying -w 300; the default should be 120. Thanks, Tracy Sent from my iPhone On July 29, 2014, at 8:17 AM, sparking research...@gmail.com wrote: I'm trying to launch Spark with this command on AWS: *./spark-ec2 -k keypair_name -i keypair.pem -s 5 -t c1.xlarge -r us-west-2 --hadoop-major-version=2.4.0 launch spark_cluster* This script is erroring out with this message: *ssh: connect to host hostname port 22: Connection refused Error executing remote command, retrying after 30 seconds*: Command '['ssh', '-o', 'StrictHostKeyChecking=no', '-i', 'keypair.pem', '-t', '-t', u'root@hostname', \n [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa \n cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys)\n]' returned non-zero exit status 255 Strange thing is, I can manually ssh to the master node as root using this command: *ssh root@hostname -i keypair.pem* Does anyone know what is going on here? Any help is appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ssh-connection-refused-tp10818.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
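For example, the original launch command with the longer wait applied (same flags, only -w added):

./spark-ec2 -k keypair_name -i keypair.pem -s 5 -t c1.xlarge -r us-west-2 -w 300 --hadoop-major-version=2.4.0 launch spark_cluster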
The function of ClosureCleaner.clean
Hi, All Before sc.runJob invokes dagScheduler.runJob, the func performed on the RDD is cleaned by ClosureCleaner.clean. Why does Spark have to do this? What's the purpose?
Re: The function of ClosureCleaner.clean
I am not sure about the specific purpose of this function, but Spark needs to remove elements from the closure that may be included by default but are not really needed, so that it can serialize the closure and send it to the executors to operate on the RDD. For example, a function passed to an RDD's map() may reference objects inside the enclosing class; you may want to send across those objects, but not the whole parent class. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Mon, Jul 28, 2014 at 8:28 PM, Wang, Jensen jensen.w...@sap.com wrote: Hi, All Before sc.runJob invokes dagScheduler.runJob, the func performed on the rdd is "cleaned" by ClosureCleaner.clean. Why spark has to do this? What's the purpose?
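To make that concrete, a made-up illustration (class and names are mine) of the capture problem the cleaner addresses: referencing a field inside a closure implicitly captures `this`, i.e. the whole enclosing object.

import org.apache.spark.rdd.RDD

class LogScanner(val keyword: String /*, ... other, possibly huge or unserializable, fields */) {
  def matches(lines: RDD[String]): RDD[String] = {
    // `line => line.contains(keyword)` would capture `this`, all of LogScanner.
    // ClosureCleaner nulls out such unneeded outer references before the closure
    // is serialized; copying to a local val sidesteps the capture entirely.
    val kw = keyword
    lines.filter(line => line.contains(kw))
  }
}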
Re: Need help, got java.lang.ExceptionInInitializerError in Yarn-Client/Cluster mode
I see, Andrew, thanks for the explanation. On Tue, Jul 29, 2014 at 5:29 AM, Andrew Lee alee...@hotmail.com wrote: I was thinking maybe we can suggest that the community enhance the Spark HistoryServer to capture the last failure exception from the container logs in the last failed stage? This would be helpful. I personally like yarn-client mode, as all the running status can be checked directly from the console. -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: KMeans: expensiveness of large vectors
Hi Xiangrui, using the current master meant a huge improvement for my task. Something that did not even finish before (training with 120G of dense data) now completes in a reasonable time. I guess using torrent helps a lot in this case. Best regards, Simon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-expensiveness-of-large-vectors-tp10614p10833.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Reading hdf5 formats with pyspark
Hi, We have set up Spark on an HPC system and are trying to implement a data pipeline and some algorithms on it. The input data is in hdf5 (these are very high resolution brain images) and it can be read via the h5py library in Python. So, my current approach (which seems to be working) is writing a function

def process(filename):
    # logic

and then executing it via

files = [list of filenames]
sc.parallelize(files).foreach(process)

Is this the right approach? -- Mohit When you want success as badly as you want the air, then you will get it. There is no other secret of success. -Socrates
Re: KMeans: expensiveness of large vectors
Great! Thanks for testing the new features! -Xiangrui On Mon, Jul 28, 2014 at 8:58 PM, durin m...@simon-schaefer.net wrote: Hi Xiangrui, using the current master meant a huge improvement for my task. Something that did not even finish before (training with 120G of dense data) now completes in a reasonable time. I guess using torrent helps a lot in this case. Best regards, Simon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-expensiveness-of-large-vectors-tp10614p10833.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
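For anyone on a released 1.0.x rather than master: to my understanding (this is not stated in the thread) the torrent-based broadcast is not yet the default there, but can be selected explicitly. A one-line sketch:

// assumption: Spark 1.0.x, where HTTP broadcast is still the default
val conf = new org.apache.spark.SparkConf()
  .set("spark.broadcast.factory",
       "org.apache.spark.broadcast.TorrentBroadcastFactory")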
Re: evaluating classification accuracy
Are you using 1.0.0? There was a bug, which was fixed in 1.0.1 and master. If you don't want to switch to 1.0.1 or master, try to cache and count test first. -Xiangrui On Mon, Jul 28, 2014 at 6:07 PM, SK skrishna...@gmail.com wrote: Hi, In order to evaluate the ML classification accuracy, I am zipping up the prediction and test labels as follows and then comparing the pairs in predictionAndLabel: val prediction = model.predict(test.map(_.features)) val predictionAndLabel = prediction.zip(test.map(_.label)) However, I am finding that predictionAndLabel.count() has fewer elements than test.count(). For example, my test vector has 43 elements, but predictionAndLabel has only 38 pairs. I have tried other samples and always get fewer elements after zipping. Does zipping the two vectors cause any compression? or is this because of the distributed nature of the algorithm (I am running it in local mode on a single machine). In order to get the correct accuracy, I need the above comparison to be done by a single node on the entire test data (my data is quite small). How can I ensure that? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/evaluating-classification-accuracy-tp10822.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
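A minimal sketch of that workaround, reusing `model` and `test` from the original post; the accuracy line at the end is my addition:

// Materialize `test` once so both sides of the zip see the same fixed partitions.
test.cache()
test.count() // forces evaluation before zipping

val prediction = model.predict(test.map(_.features))
val predictionAndLabel = prediction.zip(test.map(_.label))
val accuracy =
  predictionAndLabel.filter { case (p, l) => p == l }.count().toDouble / test.count()

// Alternatively, predicting record-by-record avoids zip entirely:
val predictionAndLabel2 = test.map(p => (model.predict(p.features), p.label))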
Re: Reading hdf5 formats with pyspark
That looks good to me since there is no Hadoop InputFormat for HDF5. But remember to specify the number of partitions in sc.parallelize to use all the nodes. You can change `process` to `read` which yields records one-by-one. Then sc.parallelize(files, numPartitions).flatMap(read) returns an RDD of records and you can use it as the start of your pipeline. -Xiangrui On Mon, Jul 28, 2014 at 9:05 PM, Mohit Singh mohit1...@gmail.com wrote: Hi, We have setup spark on a HPC system and are trying to implement some data pipeline and algorithms in place. The input data is in hdf5 (these are very high resolution brain images) and it can be read via h5py library in python. So, my current approach (which seems to be working ) is writing a function def process(filename): #logic and then execute via files = [list of filenames] sc.parallelize(files).foreach(process) Is this the right approach?? -- Mohit When you want success as badly as you want the air, then you will get it. There is no other secret of success. -Socrates
HiveContext is creating metastore warehouse locally instead of in hdfs
Hi, Even though hive.metastore.warehouse.dir in hive-site.xml is set to the default /user/hive/warehouse and the permissions are correct in hdfs, HiveContext seems to be creating the metastore warehouse locally instead of in hdfs. After looking into the spark code, I found the following in HiveContext.scala:

/**
 * SQLConf and HiveConf contracts: when the hive session is first initialized, params in
 * HiveConf will get picked up by the SQLConf. Additionally, any properties set by
 * set() or a SET command inside hql() or sql() will be set in the SQLConf *as well as*
 * in the HiveConf.
 */
@transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState])
@transient protected[hive] lazy val sessionState = {
  val ss = new SessionState(hiveconf)
  set(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
  ss
}

It seems as though when a HiveContext is created, it is launched without any configuration, and hive-site.xml is not used to set properties. It looks like I can set properties after creation by using the hql() method, but what I am looking for is for the HiveContext to be initialized according to the configuration in hive-site.xml at initialization time. Any help would be greatly appreciated! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HiveContext-is-creating-metastore-warehouse-locally-instead-of-in-hdfs-tp10838.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
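Two things that may help, offered as assumptions rather than confirmed fixes: as far as I can tell, hive-site.xml is only picked up if it is on the driver's classpath (for example by placing it in Spark's conf/ directory); and, per the doc comment quoted above, properties can be pushed through a SET command after creation. A sketch of the latter, with a placeholder namenode URI:

// workaround sketch: push the warehouse location through hql() after creation
val hive = new org.apache.spark.sql.hive.HiveContext(sc)
hive.hql("SET hive.metastore.warehouse.dir=hdfs://namenode:8020/user/hive/warehouse")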
Joining spark user group
SparkSQL can not use SchemaRDD from Hive
Hi, I got an error message while using Hive and SparkSQL. This is the code snippet I used (in spark-shell, 1.0.0):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val hive = new org.apache.spark.sql.hive.HiveContext(sc)
var sample = hive.hql("select * from sample10") // This creates a SchemaRDD. I have the table 'sample10' in Hive.
var countHive = sample.count() // It works
sqlContext.registerRDDAsTable(sample, "temp")
sqlContext.sql("select * from temp").count() // It gives me an error message

java.lang.RuntimeException: Table Not Found: sample10

I don't know why this happens. Does SparkSQL conflict with Hive? Thanks, Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-can-not-use-SchemaRDD-from-Hive-tp10841.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
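One likely explanation (my reading, not confirmed in this thread): a SchemaRDD is tied to the context that created it, and a separate SQLContext cannot resolve the Hive table that its plan references. A minimal sketch that stays inside the HiveContext throughout:

// keep the SchemaRDD inside the context that created it
val hive = new org.apache.spark.sql.hive.HiveContext(sc)
val sample = hive.hql("select * from sample10")
hive.registerRDDAsTable(sample, "temp")
hive.hql("select * from temp").count() // resolved through the same context's catalog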