Hadoop 2.3 Centralized Cache vs RDD
Hi, Any comments or thoughts on the implications of the newly released feature from Hadoop 2.3 on the centralized cache? How different it is from RDD? Many thanks. Cao
Re: maven for building scala simple program
Hi Ryan, It worked like a charm. Much appreciated. Laeeq. On Wednesday, May 7, 2014 1:30 AM, Ryan Compton compton.r...@gmail.com wrote: I've been using this (you'll need maven 3). project xmlns=http://maven.apache.org/POM/4.0.0; xmlns:xsi=http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation=http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd; modelVersion4.0.0/modelVersion groupIdcom.mycompany.app/groupId artifactIdmy-app/artifactId version1.0-SNAPSHOT/version packagingjar/packaging namemy-app/name urlhttp://maven.apache.org/url properties maven.compiler.source1.6/maven.compiler.source maven.compiler.target1.6/maven.compiler.target encodingUTF-8/encoding scala.version2.10.4/scala.version /properties build pluginManagement plugins plugin groupIdnet.alchim31.maven/groupId artifactIdscala-maven-plugin/artifactId version3.1.5/version /plugin plugin groupIdorg.apache.maven.plugins/groupId artifactIdmaven-compiler-plugin/artifactId version2.0.2/version /plugin /plugins /pluginManagement plugins plugin groupIdnet.alchim31.maven/groupId artifactIdscala-maven-plugin/artifactId executions execution idscala-compile-first/id phaseprocess-resources/phase goals goaladd-source/goal goalcompile/goal /goals /execution execution idscala-test-compile/id phaseprocess-test-resources/phase goals goaltestCompile/goal /goals /execution /executions /plugin !-- Plugin to create a single jar that includes all dependencies -- plugin artifactIdmaven-assembly-plugin/artifactId version2.4/version configuration descriptorRefs descriptorRefjar-with-dependencies/descriptorRef /descriptorRefs /configuration executions execution idmake-assembly/id phasepackage/phase goals goalsingle/goal /goals /execution /executions /plugin /plugins /build dependencies dependency groupIdorg.scala-lang/groupId artifactIdscala-library/artifactId version${scala.version}/version /dependency /dependencies /project On Tue, May 6, 2014 at 4:10 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi all, If anyone is using maven for building scala classes with all dependencies for spark, please provide a sample pom.xml here. I have having trouble using maven for scala simple job though it was working properly for java. I have added scala maven plugin but still getting some issues. Laeeq
cant get tests to pass anymore on master master
i used to be able to get all tests to pass. with java 6 and sbt i get PermGen errors (no matter how high i make the PermGen). so i have given up on that. with java 7 i see 1 error in a bagel test and a few in streaming tests. any ideas? see the error in BagelSuite below. [info] - large number of iterations *** FAILED *** (10 seconds, 105 milliseconds) [info] The code passed to failAfter did not complete within 10 seconds. (BagelSuite.scala:85) [info] org.scalatest.exceptions.TestFailedDueToTimeoutException: [info] at org.scalatest.concurrent.Timeouts$$anonfun$failAfter$1.apply(Timeouts.scala:250) [info] at org.scalatest.concurrent.Timeouts$$anonfun$failAfter$1.apply(Timeouts.scala:250) [info] at org.scalatest.concurrent.Timeouts$class.timeoutAfter(Timeouts.scala:282) [info] at org.scalatest.concurrent.Timeouts$class.failAfter(Timeouts.scala:246) [info] at org.apache.spark.bagel.BagelSuite.failAfter(BagelSuite.scala:32) [info] at org.apache.spark.bagel.BagelSuite$$anonfun$3.apply$mcV$sp(BagelSuite.scala:85) [info] at org.apache.spark.bagel.BagelSuite$$anonfun$3.apply(BagelSuite.scala:85) [info] at org.apache.spark.bagel.BagelSuite$$anonfun$3.apply(BagelSuite.scala:85) [info] at org.scalatest.FunSuite$$anon$1.apply(FunSuite.scala:1265) [info] at org.scalatest.Suite$class.withFixture(Suite.scala:1974) [info] at org.apache.spark.bagel.BagelSuite.withFixture(BagelSuite.scala:32)
Re: Spark workers keep getting disconnected(Keep dying) from the cluster.
Got the same experience over here. 0.9.1 (not from github, from official download page), running hadoop 2.2. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-workers-keep-getting-disconnected-Keep-dying-from-the-cluster-tp5740p5747.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Error starting EC2 cluster
Well... the reason was an out-of-date version of Python (2.6.6) on the machine where I ran the script. If anyone else experiences this issue - just update your Python. On Sun, May 4, 2014 at 7:51 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: I am using Spark 0.9.1. When I'm trying to start a EC2 cluster with the spark-ec2 script, an error occurs and the following message is issued: AttributeError: 'module' object has no attribute 'check_output'. By this time, EC2 instances are up and running but Spark doesn't seem to be installed on them. Any ideas how to fix it? $ ./spark-ec2 -k my_key -i /home/alitouka/my_key.pem -s 1 --region=us-east-1 --instance-type=m3.medium launch test_cluster Setting up security groups... Searching for existing cluster test_cluster... Don't recognize m3.medium, assuming type is pvm Spark AMI: ami-5bb18832 Launching instances... Launched 1 slaves in us-east-1c, regid = r- Launched master in us-east-1c, regid = r- Waiting for instances to start up... Waiting 120 more seconds... Generating cluster's SSH key on master... ssh: connect to host ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com port 22: Connection refused Error executing remote command, retrying after 30 seconds: Command '['ssh', '-o', 'StrictHostKeyChecking=no', '-i', '/home/alitouka/my_key.pem', '-t', '-t', u'r...@ec2-xx-xxx-xxx-xx.compute-1.amazonaws.com', \n [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa \n cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys)\n]' returned non-zero exit status 255 ssh: connect to host ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com port 22: Connection refused Error executing remote command, retrying after 30 seconds: Command '['ssh', '-o', 'StrictHostKeyChecking=no', '-i', '/home/alitouka/my_key.pem', '-t', '-t', u'r...@ec2-xx-xxx-xxx-xx.compute-1.amazonaws.com', \n [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa \n cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys)\n]' returned non-zero exit status 255 ssh: connect to host ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com port 22: Connection refused Error executing remote command, retrying after 30 seconds: Command '['ssh', '-o', 'StrictHostKeyChecking=no', '-i', '/home/alitouka/my_key.pem', '-t', '-t', u'r...@ec2-xx-xxx-xxx-xx.compute-1.amazonaws.com', \n [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa \n cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys)\n]' returned non-zero exit status 255 Warning: Permanently added 'ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com,54.227.205.82' (RSA) to the list of known hosts. Connection to ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com closed. Traceback (most recent call last): File ./spark_ec2.py, line 806, in module main() File ./spark_ec2.py, line 799, in main real_main() File ./spark_ec2.py, line 684, in real_main setup_cluster(conn, master_nodes, slave_nodes, opts, True) File ./spark_ec2.py, line 419, in setup_cluster dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh']) File ./spark_ec2.py, line 624, in ssh_read return subprocess.check_output( AttributeError: 'module' object has no attribute 'check_output'
Re: is Mesos falling out of favor?
By looking at your config, I think there's something wrong with your setup. One of the key elements of Mesos is that you are abstracted from where the execution of your task takes place. The SPARK_EXECUTOR_URI tells Mesos where to find the 'framework' (in Mesos jargon) required to execute a job. (Actually, it tells the spark driver to tell mesos where to download the framework) Your config looks like you are running some mix of Spark Cluster with Mesos. This is an example of a Spark job to run on Mesos: Driver: ADD_JARS=/.../job-jar-with-dependencies.jar SPARK_LOCAL_IP=IP java -cp /.../spark-assembly.jar:/.../job-jar-with-dependencies.jar -Dconfig.file=job-config.conf com.example.jobs.SparkJob Config: job-config.conf contains this info on Mesos: (Note the Mesos URI is constructed from this config # # Mesos configuration # mesos { zookeeper = {zookeeper.ip} executorUri = hdfs://${hdfs.nameNode.host}:${hdfs.nameNode.port}/spark/spark-0.9.0.1-bin.tar.gz master { host = {mesos-ip} port = 5050 } } Probably this can still be improved as it's the result of some trial-error-repeat, but it's working for us. -greetz, Gerard On Wed, May 7, 2014 at 7:43 PM, deric barton.to...@gmail.com wrote: I'm running 1.0.0 branch, finally I've managed to make it work. I'm using a Debian package which is distributed on all slave nodes. So, I've removed `SPARK_EXECUTOR_URI` and it works, spark-env.sh looks like this: export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so export SCALA_HOME=/usr export SCALA_LIBRARY_PATH=/usr/share/java export MASTER=mesos://zk://192.168.1.1:2181/mesos export SPARK_HOME=/usr/share/spark export SPARK_LOCAL_IP=192.168.1.2 export SPARK_PRINT_LAUNCH_COMMAND=1 export CLASSPATH=$CLASSPATH:$SPARK_HOME/lib/ scripts for Debian package are here (I'll try to add some documentation): https://github.com/deric/spark-deb-packaging -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/is-Mesos-falling-out-of-favor-tp5444p5484.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
unsubscribe
Re: Packaging a spark job using maven
Laurent the problem is that the reference.conf that is embedded in akka jars is being overriden by some other conf. This happens when multiple files have the same name. I am using Spark with maven. In order to build the fat jar I use the shade plugin and it works pretty well. The trick here is to use an AppendingTransformer that will merge all the resource.conf into a single one. Try something like that: plugin groupIdorg.apache.maven.plugins/groupId artifactIdmaven-shade-plugin/artifactId version2.1/version executions execution phasepackage/phase goals goalshade/goal /goals configuration minimizeJarfalse/minimizeJar createDependencyReducedPomfalse/createDependencyReducedPom artifactSet includes !-- Include here the dependencies you want to be packed in your fat jar -- includemy.package.etc:*/include /includes /artifactSet filters filter artifact*:*/artifact excludes excludeMETA-INF/*.SF/exclude excludeMETA-INF/*.DSA/exclude excludeMETA-INF/*.RSA/exclude /excludes /filter /filters transformers transformer implementation=org.apache.maven.plugins.shade.resource.AppendingTransformer resourcereference.conf/resource /transformer /transformers /configuration /execution /executions /plugin 2014-05-14 15:37 GMT+02:00 Laurent T laurent.thou...@ldmobile.net: Hi, Thanks François but this didn't change much. I'm not even sure what this reference.conf is. It isn't mentioned in any of spark documentation. Should i have one in my resources ? Thanks Laurent -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Packaging-a-spark-job-using-maven-tp5615p5707.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark workers keep getting disconnected(Keep dying) from the cluster.
Hey, I am facing a weird issue. My spark workers keep dying every now and then and in the master logs i keep on seeing following messages, 14/05/14 10:09:24 WARN Master: Removing worker-20140514080546-x.x.x.x-50737 because we got no heartbeat in 60 seconds 14/05/14 14:18:41 WARN Master: Removing worker-20140514123848-x.x.x.x-50901 because we got no heartbeat in 60 seconds In my cluster, I have one master node and four worker nodes. On the cluster i am trying to run shark and related queries. I tried setting the property, spark.worker.timeout=300 on all workers and master but still it shows, 60 seconds timeout. After that, i keep seeing the following messages as well, 14/05/14 16:59:52 INFO Master: Removing app app-20140514164003-0009 On the worker nodes, in the work folder, i cant seem to find any suspicious messages. Any help as to what is causing all this. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-workers-keep-getting-disconnected-Keep-dying-from-the-cluster-tp5740.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
spark 0.9.1 textFile hdfs unknown host exception
Hi, I have some strange behaviour when using textFile to read some data from HDFS in spark 0.9.1. I get UnknownHost exceptions, where hadoop client tries to resolve the dfs.nameservices and fails. So far: - this has been tested inside the shell - the exact same code works with spark-0.8.1 - the shell is launched with HADOOP_CONF_DIR pointing to our HA conf - if before that some other rdd is created from HDFS and succeeds than, this works also (might be related in the way the default hadoop configuration is being shared?) - if using the new MR API it works sc.newAPIHadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], sc.hadoopConfiguration).map(_._2.toString) Hadoop disitribution: 2.0.0-cdh4.1.2 Spark 0.9.1 - packaged with correct version of hadoop Eugen
Efficient implementation of getting top 10 hashtags in last 5 mins window
I wanted to know how can we efficiently get top 10 hashtags in last 5 mins window. Currently I am using reduceByKeyAndWindow over 5 mins window and then sorting to get top 10 hashtags. But it is taking a lot of time. How can we do it efficiently ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-implementation-of-getting-top-10-hashtags-in-last-5-mins-window-tp5741.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark LIBLINEAR
I've done some comparisons with my own implementation of TRON on Spark. From a distributed computing perspective, it does 2x more local work per iteration than LBFGS, so the parallel isoefficiency is improved slightly. I think the truncated Newton solver holds some potential because there have been some recent work in preconditioners: http://dx.doi.org/10.1016/j.amc.2014.03.006 On Wed, May 14, 2014 at 9:32 AM, Debasish Das debasish.da...@gmail.comwrote: Hi Professor Lin, On our internal datasets, I am getting accuracy at par with glmnet-R for sparse feature selection from liblinear. The default mllib based gradient descent was way off. I did not tune learning rate but I run with varying lambda. Ths feature selection was weak. I used liblinear code. Next I will explore the distributed liblinear. Adding the code on github will definitely help for collaboration. I am experimenting if a bfgs / owlqn based sparse logistic in spark mllib give us accuracy at par with liblinear. If liblinear solver outperforms them (either accuracy/performance) we have to bring tron to mllib and let other algorithms benefit from it as well. We are using Bfgs and Owlqn solvers from breeze opt. Thanks. Deb On May 12, 2014 9:07 PM, DB Tsai dbt...@stanford.edu wrote: It seems that the code isn't managed in github. Can be downloaded from http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/distributed-liblinear/spark/spark-liblinear-1.94.zip It will be easier to track the changes in github. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Mon, May 12, 2014 at 7:53 AM, Xiangrui Meng men...@gmail.com wrote: Hi Chieh-Yen, Great to see the Spark implementation of LIBLINEAR! We will definitely consider adding a wrapper in MLlib to support it. Is the source code on github? Deb, Spark LIBLINEAR uses BSD license, which is compatible with Apache. Best, Xiangrui On Sun, May 11, 2014 at 10:29 AM, Debasish Das debasish.da...@gmail.com wrote: Hello Prof. Lin, Awesome news ! I am curious if you have any benchmarks comparing C++ MPI with Scala Spark liblinear implementations... Is Spark Liblinear apache licensed or there are any specific restrictions on using it ? Except using native blas libraries (which each user has to manage by pulling in their best proprietary BLAS package), all Spark code is Apache licensed. Thanks. Deb On Sun, May 11, 2014 at 3:01 AM, DB Tsai dbt...@stanford.edu wrote: Dear Prof. Lin, Interesting! We had an implementation of L-BFGS in Spark and already merged in the upstream now. We read your paper comparing TRON and OWL-QN for logistic regression with L1 (http://www.csie.ntu.edu.tw/~cjlin/papers/l1.pdf), but it seems that it's not in the distributed setup. Will be very interesting to know the L2 logistic regression benchmark result in Spark with your TRON optimizer and the L-BFGS optimizer against different datasets (sparse, dense, and wide, etc). I'll try your TRON out soon. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Sun, May 11, 2014 at 1:49 AM, Chieh-Yen r01944...@csie.ntu.edu.tw wrote: Dear all, Recently we released a distributed extension of LIBLINEAR at http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/distributed-liblinear/ Currently, TRON for logistic regression and L2-loss SVM is supported. We provided both MPI and Spark implementations. This is very preliminary so your comments are very welcome. Thanks, Chieh-Yen
Re: spark on yarn-standalone, throws StackOverflowError and fails somtimes and succeed for the rest
Could you try `println(result.toDebugString())` right after `val result = ...` and attach the result? -Xiangrui On Fri, May 9, 2014 at 8:20 AM, phoenix bai mingzhi...@gmail.com wrote: after a couple of tests, I find that, if I use: val result = model.predict(prdctpairs) result.map(x = x.user+,+x.product+,+x.rating).saveAsTextFile(output) it always fails with above error and the exception seems iterative. but if I do: val result = model.predict(prdctpairs) result.cach() result.map(x = x.user+,+x.product+,+x.rating).saveAsTextFile(output) it succeeds. could anyone help explain why the cach() is necessary? thanks On Fri, May 9, 2014 at 6:45 PM, phoenix bai mingzhi...@gmail.com wrote: Hi all, My spark code is running on yarn-standalone. the last three lines of the code as below, val result = model.predict(prdctpairs) result.map(x = x.user+,+x.product+,+x.rating).saveAsTextFile(output) sc.stop() the same code, sometimes be able to run successfully and could give out the right result, while from time to time, it throws StackOverflowError and fail. and I don`t have a clue how I should debug. below is the error, (the start and end portion to be exact): 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17] MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 44 to sp...@rxx43.mc10.site.net:43885 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17] MapOutputTrackerMaster: Size of output statuses for shuffle 44 is 148 bytes 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-35] MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 45 to sp...@rxx43.mc10.site.net:43885 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-35] MapOutputTrackerMaster: Size of output statuses for shuffle 45 is 453 bytes 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-20] MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 44 to sp...@rxx43.mc10.site.net:56767 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29] MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 45 to sp...@rxx43.mc10.site.net:56767 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29] MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 44 to sp...@rxx43.mc10.site.net:49879 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29] MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 45 to sp...@rxx43.mc10.site.net:49879 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17] TaskSetManager: Starting task 946.0:17 as TID 146 on executor 6: rx15.mc10.site.net (PROCESS_LOCAL) 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17] TaskSetManager: Serialized task 946.0:17 as 6414 bytes in 0 ms 14-05-09 17:55:51 WARN [Result resolver thread-0] TaskSetManager: Lost TID 133 (task 946.0:4) 14-05-09 17:55:51 WARN [Result resolver thread-0] TaskSetManager: Loss was due to java.lang.StackOverflowError java.lang.StackOverflowError at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631) at java.lang.ClassLoader.defineClass(ClassLoader.java:615) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141) at java.net.URLClassLoader.defineClass(URLClassLoader.java:283) at java.net.URLClassLoader.access$000(URLClassLoader.java:58) at java.net.URLClassLoader$1.run(URLClassLoader.java:197) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:190) at java.lang.ClassLoader.loadClass(ClassLoader.java:306) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301) at java.lang.ClassLoader.loadClass(ClassLoader.java:247) at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631) at java.lang.ClassLoader.defineClass(ClassLoader.java:615) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at
count()-ing gz files gives java.io.IOException: incorrect header check
I’m trying to do a simple count() on a large number of GZipped files in S3. My job is failing with the following message: 14/05/15 19:12:37 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException java.io.IOException: incorrect header check at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method) at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:221) at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:82) at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:76) at java.io.InputStream.read(InputStream.java:101) snipped I traced this down to HADOOP-5281https://issues.apache.org/jira/browse/HADOOP-5281, but I’m not sure if 1) it’s the same issue, or 2) how to go about resolving it. I gather I need to update some Hadoop jar? Any tips on where to look/what to do? I’m running Spark on an EC2 cluster created by spark-ec2 with no special options used. Nick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/count-ing-gz-files-gives-java-io-IOException-incorrect-header-check-tp5768.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Stable Hadoop version supported ?
Although you need to compile it differently for different versions of HDFS / Hadoop, as far as I know Spark continues to work with Hadoop 1.x (and probably older 0.20.x as a result -- your experience is an existence proof.) And it works with the newest Hadoop 2.4.x, again with the appropriate build settings. I think the default answer is to upgrade all the way to the newest Hadoop / HDFS unless you have a reason you can't. On Wed, May 14, 2014 at 8:17 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Currently I've HDFS with version hadoop0.20.2-cdh3u6 on Spark 0.9.1. I want to upgrade to Spark 1.0.0 soon and would also like to upgrade my HDFS version as well. What's the recommended version of HDFS to use with Spark 1.0.0? I don't know much about YARN but I would just like to use the Spark standalone cluster mode. Thanks -Soumya
Calling external classes added by sc.addJar needs to be through reflection
Finally find a way out of the ClassLoader maze! It took me some times to understand how it works; I think it worths to document it in a separated thread. We're trying to add external utility.jar which contains CSVRecordParser, and we added the jar to executors through sc.addJar APIs. If the instance of CSVRecordParser is created without reflection, it raises *ClassNotFound Exception*. data.mapPartitions(lines = { val csvParser = new CSVRecordParser((delimiter.charAt(0)) lines.foreach(line = { val lineElems = csvParser.parseLine(line) }) ... ... ) If the instance of CSVRecordParser is created through reflection, it works. data.mapPartitions(lines = { val loader = Thread.currentThread.getContextClassLoader val CSVRecordParser = loader.loadClass(com.alpine.hadoop.ext.CSVRecordParser) val csvParser = CSVRecordParser.getConstructor(Character.TYPE) .newInstance(delimiter.charAt(0).asInstanceOf[Character]) val parseLine = CSVRecordParser .getDeclaredMethod(parseLine, classOf[String]) lines.foreach(line = { val lineElems = parseLine.invoke(csvParser, line).asInstanceOf[Array[String]] }) ... ... ) This is identical to this question, http://stackoverflow.com/questions/7452411/thread-currentthread-setcontextclassloader-without-using-reflection It's not intuitive for users to load external classes through reflection, but couple available solutions including 1) messing around systemClassLoader by calling systemClassLoader.addURI through reflection or 2) forking another JVM to add jars into classpath before bootstrap loader are very tricky. Any thought on fixing it properly? @Xiangrui, netlib-java jniloader is loaded from netlib-java through reflection, so this problem will not be seen. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai
Re: Schema view of HadoopRDD
Is there any Spark plugin/add-on that facilitate the query to a JSON content? Best, Flavio On Thu, May 15, 2014 at 6:53 PM, Michael Armbrust mich...@databricks.comwrote: Here is a link with more info: http://people.apache.org/~pwendell/catalyst-docs/sql-programming-guide.html On Wed, May 7, 2014 at 10:09 PM, Debasish Das debasish.da...@gmail.comwrote: Hi, For each line that we read as textLine from HDFS, we have a schema..if there is an API that takes the schema as List[Symbol] and maps each token to the Symbol it will be helpful... Does RDDs provide a schema view of the dataset on HDFS ? Thanks. Deb
Re: Equivalent of collect() on DStream
Doesnt DStream.foreach() suffice? anyDStream.foreach { rdd = // do something with rdd } On Wed, May 14, 2014 at 9:33 PM, Stephen Boesch java...@gmail.com wrote: Looking further it appears the functionality I am seeking is in the following *private[spark] * class ForEachdStream (version 0.8.1 , yes we are presently using an older release..) private[streaming] class ForEachDStream[T: ClassManifest] ( parent: DStream[T], *foreachFunc: (RDD[T], Time) = Unit* ) extends DStream[Unit](parent.ssc) { I would like to have access to this structure - particularly the ability to define an foreachFunc that gets applied to each RDD within the DStream. Is there a means to do so? 2014-05-14 21:25 GMT-07:00 Stephen Boesch java...@gmail.com: Given that collect() does not exist on DStream apparently my mental model of Streaming RDD (DStream) needs correction/refinement. So what is the means to convert DStream data into a JVM in-memory representation. All of the methods on DStream i.e. filter, map, transform, reduce, etc generate other DStream's, and not an in memory data structure.
unsubscribe
unsubscribe
Re: How to run the SVM and LogisticRegression
If you check out the master branch, there are some examples that can be used as templates under examples/src/main/scala/org/apache/spark/examples/mllib Best, Xiangrui On Wed, May 14, 2014 at 1:36 PM, yxzhao yxz...@ualr.edu wrote: Hello, I found the classfication algorithms SVM and LogisticRegression implemented in the following directory. And how to run them? What is the commnad line should be? Thanks. spark-0.9.0-incubating/mllib/src/main/scala/org/apache/spark/mllib/classification -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-run-the-SVM-and-LogisticRegression-tp5720.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Standalone client failing with docker deployed cluster
Hi, I'm running the spark server with a single worker on a laptop using the docker images. The spark shell examples run fine with this setup. However, a standalone java client that tries to run wordcount on a local files (1 MB in size), the execution fails with the following error on the stdout of the worker: 14/05/15 10:31:21 INFO Slf4jLogger: Slf4jLogger started 14/05/15 10:31:21 INFO Remoting: Starting remoting 14/05/15 10:31:22 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@worker1:55924] 14/05/15 10:31:22 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@worker1:55924] 14/05/15 10:31:22 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@R9FX97h.local:56720/user/CoarseGrainedScheduler 14/05/15 10:31:22 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@worker1:50040/user/Worker 14/05/15 10:31:22 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://spark@R9FX97h.local:56720]. Address is now gated for 6 ms, all messages to this address will be delivered to dead letters. 14/05/15 10:31:22 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@worker1:55924] - [akka.tcp://spark@R9FX97h.local:56720] disassociated! Shutting down. I noticed the following messages on the worker console when I attached through docker: 14/05/15 11:24:33 INFO Worker: Asked to launch executor app-20140515112408-0005/7 for billingLogProcessor 14/05/15 11:24:33 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@worker1:50040] - [akka.tcp://sparkExecutor@worker1:42437]: Error [Association failed with [akka.tcp://sparkExecutor@worker1:42437]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@worker1:42437] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: worker1/172.17.0.4:42437 ] 14/05/15 11:24:33 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@worker1:50040] - [akka.tcp://sparkExecutor@worker1:42437]: Error [Association failed with [akka.tcp://sparkExecutor@worker1:42437]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@worker1:42437] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: worker1/172.17.0.4:42437 ] 14/05/15 11:24:33 INFO ExecutorRunner: Launch command: /usr/lib/jvm/java-7-openjdk-amd64/bin/java -cp :/opt/spark-0.9.0/conf:/opt/spark-0.9.0/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@R9FX97h.local:46986/user/CoarseGrainedScheduler 7 worker1 1 akka.tcp://sparkWorker@worker1:50040/user/Worker app-20140515112408-0005 14/05/15 11:24:35 INFO Worker: Executor app-20140515112408-0005/7 finished with state FAILED message Command exited with code 1 exitStatus 1 14/05/15 11:24:35 INFO LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40172.17.0.4%3A33648-135#310170905] was not delivered. [34] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 14/05/15 11:24:35 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@worker1:50040] - [akka.tcp://sparkExecutor@worker1:56594]: Error [Association failed with [akka.tcp://sparkExecutor@worker1:56594]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@worker1:56594] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: worker1/172.17.0.4:56594 ] 14/05/15 11:24:35 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@worker1:50040] - [akka.tcp://sparkExecutor@worker1:56594]: Error [Association failed with [akka.tcp://sparkExecutor@worker1:56594]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@worker1:56594] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: worker1/172.17.0.4:56594 ] The significant code snippets from the standalone java client are as follows: JavaSparkContext ctx = new JavaSparkContext(masterAddr, log_processor, sparkHome, jarFileLoc); JavaRDDString rawLog = ctx.textFile(/tmp/some.log); ListTuple2String, Long topRecords = rawLog.map(fieldSplitter).map(fieldExtractor).top(5, tupleComparator); However, running the sample code provided on github (amplab docker page) over the spark shell went through fine with the following stdout message: 14/05/15 10:39:41 INFO Slf4jLogger: Slf4jLogger started 14/05/15 10:39:42 INFO Remoting: Starting remoting 14/05/15 10:39:42
unsubscribe
unsubscribe
advice on maintaining a production spark cluster?
Hey folks, I'm wondering what strategies other folks are using for maintaining and monitoring the stability of stand-alone spark clusters. Our master very regularly loses workers, and they (as expected) never rejoin the cluster. This is the same behavior I've seen using akka cluster (if that's what spark is using in stand-alone mode) -- are there configuration options we could be setting to make the cluster more robust? We have a custom script which monitors the number of workers (through the web interface) and restarts the cluster when necessary, as well as resolving other issues we face (like spark shells left open permanently claiming resources), and it works, but it's no where close to a great solution. What are other folks doing? Is this something that other folks observe as well? I suspect that the loss of workers is tied to jobs that run out of memory on the client side or our use of very large broadcast variables, but I don't have an isolated test case. I'm open to general answers here: for example, perhaps we should simply be using mesos or yarn instead of stand-alone mode. --j
Workers unable to find class, even when in the SparkConf JAR list
I'm using spark-ec2 to run some Spark code. When I set master to local, then it runs fine. However, when I set master to $MASTER, the workers immediately fail, with java.lang.NoClassDefFoundError for the classes. I've used sbt-assembly to make a jar with the classes, confirmed using jar tvf that the classes are there, and set SparkConf to distribute the classes. The Spark Web UI indeed shows the assembly jar to be added to the classpath: http://172.x.x.x47441/jars/myjar-assembly-1.0.jar It seems that, despite the fact that myjar-assembly contains the class, and is being added to the cluster, it's not reaching the workers. How do I fix this? (Do I need to manually copy the jar file? If so, to which dir? I thought that the point of the SparkConf add jars was to do this automatically)
Re: Express VMs - good idea?
Hi Marco, Hive itself is not working in the CDH5.0 VM (due to FNFE's on the third party jars). While you did not mention using Shark, you may keep that in mind. I will try out spark-only commands late today and report what I find. 2014-05-14 5:00 GMT-07:00 Marco Shaw marco.s...@gmail.com: Hi, I've wanted to play with Spark. I wanted to fast track things and just use one of the vendor's express VMs. I've tried Cloudera CDH 5.0 and Hortonworks HDP 2.1. I've not written down all of my issues, but for certain, when I try to run spark-shell it doesn't work. Cloudera seems to crash, and both complain when I try to use SparkContext in a simple Scala command. So, just a basic question on whether anyone has had success getting these express VMs to work properly with Spark *out of the box* (HDP does required you install Spark manually). I know Cloudera recommends 8GB of RAM, but I've been running it with 4GB. Could it be that 4GB is just not enough, and causing issues or have others had success using these Hadoop 2.x pre-built VMs with Spark 0.9.x? Marco
Re: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext
Serializing the main object isn't going to help here - it's SparkContext it's complaining about. The problem is that the context is, according to the code you sent, computeDwt has a signature of: class DWTSample ... { def computDWT (sc: SparkContext, data: ArrayBuffer[(Int, Double)]): List[Double] } do you need the SparkContext within that function? That function is executing out on your workers; they shouldn't be trying to send work directly to other workers anyway, or using RDDs or other spark contexts, they should just be working with the data. If you can eliminate the SparkContext parameter there, you should be fine. Also, I don't know how expensive DWTSample is to produce, or if you need a separate instance of each record; if you need one for each record, as is indicated by the code you sent, it doesn't actually have to be serializable - you're creating it out on the worker nodes, not sending it to them from the client node. If you don't need a unique instance per record, then you can either use the serializable nature to just create one, and use that one for each record, or if you would prefer it not to be serializable, you can create one per partition and use that one on each record in the partition: kk = series.mapPartitions(iter = { val sampler = new DWTsample() iter.map(i = sampler.computeDwt(i._2)) }) (assuming you eliminated the sc parameter, of course) Hope this helps! On Mon, May 12, 2014 at 2:27 AM, yh18190 yh18...@gmail.com wrote: Hi, I am facing above exception when I am trying to apply a method(ComputeDwt) on RDD[(Int,ArrayBuffer[(Int,Double)])] input. I am even using extends Serialization option to serialize objects in spark.Here is the code snippet. Could anyone suggest me what could be the problem and what should be done to overcome this issue.??? input:series:RDD[(Int,ArrayBuffer[(Int,Double)])] DWTsample extends Serialization is a class having computeDwt function. sc: sparkContext val kk:RDD[(Int,List[Double])]=series.map(t=(t._1,new DWTsample().computeDwt(sc,t._2))) Error: org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441) at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Job-failed-java-io-NotSerializableException-org-apache-spark-SparkContext-tp5585.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Software Engineer Analytics Engineering Team@ Box Mountain View, CA -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
Re: Efficient implementation of getting top 10 hashtags in last 5 mins window
Hi nilmish, One option for you is to consider moving to a different algorithm. The SpaceSaver/StreamSummary method will get you approximate results in exchange for smaller data structure size. It has an implementation in Twitter's Algebird library, if you're using Scala: https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/SpaceSaver.scala and has a more general write up here: http://boundary.com/blog/2013/05/14/approximate-heavy-hitters-the-spacesaving-algorithm/ I believe it will let you avoid an expensive sort of all the hundreds of thousands of hashtags you can see in a day. Best, --Brian -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-implementation-of-getting-top-10-hashtags-in-last-5-mins-window-tp5741p5845.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: filling missing values in a sequence
Hello Mohit, I don't think there's a direct way of bleeding elements across partitions. But you could write it yourself relatively succinctly: A) Sort the RDD B) Look at the sorted RDD's partitions with the .mapParititionsWithIndex( ) method. Map each partition to its partition ID, and its maximum element. Collect the (partID, maxElements) in the driver. C) Broadcast the collection of (partID, part's max element) tuples D) Look again at the sorted RDD's partitions with mapPartitionsWithIndex( ). For each partition K: D1) Find the immediately-preceding partition K -1 , and its associated maximum value. Use that to decide how many values are missing between the last element of part K-1 and the first element of part K. D2) Step through part K's elements and find the rest of the missing elements in that part This approach sidesteps worries you might have over the hack of using .filter to remove the first element (how do you want to handle ties, for instance?), as well as the possible fragility of zipping. --Brian -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/filling-missing-values-in-a-sequence-tp5708p5846.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark with Drill
Hi, I am trying to understand and and seeing Drill as one of the upcoming interesting tool outside. Can somebody clarify where Drill is going to position in Hadoop ecosystem compare with Spark and Shark? Is it going to be used as alternative to any one of the Spark/Shark or Storm? Or Drill can integrate with them in this stack layer. Also seeing MapR (major contributor of Drill) has going to packaging Spark in their recent announcement. Thanks, Ravi
How to pass config variables to workers
What is a good way to pass config variables to workers? I've tried setting them in environment variables via spark-env.sh, but, as far as I can tell, the environment variables set there don't appear in workers' environments. If I want to be able to configure all workers, what's a good way to do it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-config-variables-to-workers-tp5780.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: JavaNetworkWordCount
It would look ugly.. as explicit datatypes need to be mentioned.. you are better off using parallelize instead. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, May 16, 2014 at 6:11 PM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, TD has given me this piece of code: “sparkContext.makeRDD(1 to 100, 100).collect(), I am using a java code of NetworkWordcount, How could I use this piece in this code in java? Thanks Informativa sulla Privacy: http://www.unibs.it/node/8155
Re: cant get tests to pass anymore on master master
i tried on a few different machines, including a server, all same ubuntu and same java, and got same errors. i also tried modifying the timeouts in the unit tests and it did not help. ok i will try blowing away local maven repo and do clean. On Thu, May 15, 2014 at 12:49 PM, Sean Owen so...@cloudera.com wrote: Since the error concerns a timeout -- is the machine slowish? What about blowing away everything in your local maven repo, do a clean, etc. to rule out environment issues? I'm on OS X here FWIW. On Thu, May 15, 2014 at 5:24 PM, Koert Kuipers ko...@tresata.com wrote: yeah sure. it is ubuntu 12.04 with jdk1.7.0_40 what else is relevant that i can provide? On Thu, May 15, 2014 at 12:17 PM, Sean Owen so...@cloudera.com wrote: FWIW I see no failures. Maybe you can say more about your environment, etc. On Wed, May 7, 2014 at 10:01 PM, Koert Kuipers ko...@tresata.com wrote: i used to be able to get all tests to pass. with java 6 and sbt i get PermGen errors (no matter how high i make the PermGen). so i have given up on that. with java 7 i see 1 error in a bagel test and a few in streaming tests. any ideas? see the error in BagelSuite below. [info] - large number of iterations *** FAILED *** (10 seconds, 105 milliseconds) [info] The code passed to failAfter did not complete within 10 seconds. (BagelSuite.scala:85) [info] org.scalatest.exceptions.TestFailedDueToTimeoutException: [info] at org.scalatest.concurrent.Timeouts$$anonfun$failAfter$1.apply(Timeouts.scala:250) [info] at org.scalatest.concurrent.Timeouts$$anonfun$failAfter$1.apply(Timeouts.scala:250) [info] at org.scalatest.concurrent.Timeouts$class.timeoutAfter(Timeouts.scala:282) [info] at org.scalatest.concurrent.Timeouts$class.failAfter(Timeouts.scala:246) [info] at org.apache.spark.bagel.BagelSuite.failAfter(BagelSuite.scala:32) [info] at org.apache.spark.bagel.BagelSuite$$anonfun$3.apply$mcV$sp(BagelSuite.scala:85) [info] at org.apache.spark.bagel.BagelSuite$$anonfun$3.apply(BagelSuite.scala:85) [info] at org.apache.spark.bagel.BagelSuite$$anonfun$3.apply(BagelSuite.scala:85) [info] at org.scalatest.FunSuite$$anon$1.apply(FunSuite.scala:1265) [info] at org.scalatest.Suite$class.withFixture(Suite.scala:1974) [info] at org.apache.spark.bagel.BagelSuite.withFixture(BagelSuite.scala:32)
Re: Understanding epsilon in KMeans
It is running k-means many times, independently, from different random starting points in order to pick the best clustering. Convergence ends one run, not all of them. Yes epsilon should be the same as convergence threshold elsewhere. You can set epsilon if you instantiate KMeans directly. Maybe it would be nice to overload train() to be able to set that too, but I imagine the point of the static convenience methods is to encapsulate the most usual subsets of parameters. On Wed, May 14, 2014 at 1:50 PM, Stuti Awasthi stutiawas...@hcl.com wrote: Hi All, I wanted to understand the functionality of epsilon in KMeans in Spark MLlib. As per documentation : distance threshold within which we've consider centers to have converged.If all centers move less than this Euclidean distance, we stop iterating one run. Now I have assumed that if centers are moving less than epsilon value then Clustering Stops but then what does it mean by “we stop iterating one run”.. Now suppose I have given maxIterations=10 and epsilon = 0.1 and assume that centers are afteronly 2 iteration, the epsilon condition is met i.e. now centers are moving only less than 0.1.. Now what happens ?? The whole 10 iterations are completed OR the Clustering stops ?? My 2nd query is in Mahout, there is a configuration param : “Convergence Threshold (cd)” which states : “in an iteration, the centroids don’t move more than this distance, no further iterations are done and clustering stops.” So is epsilon and cd similar ?? 3rd query : How to pass epsilon as a configurable param. KMeans.train() does not provide the way but in code I can see “setEpsilon” as method. SO if I want to pass the parameter as epsilon=0.1 , how may I do that.. Pardon my ignorance Thanks Stuti Awasthi ::DISCLAIMER:: The contents of this e-mail and any attachment(s) are confidential and intended for the named recipient(s) only. E-mail transmission is not guaranteed to be secure or error-free as information could be intercepted, corrupted, lost, destroyed, arrive late or incomplete, or may contain viruses in transmission. The e mail and its contents (with or without referred errors) shall therefore not attach any liability on the originator or HCL or its affiliates. Views or opinions, if any, presented in this email are solely those of the author and may not necessarily reflect the views or opinions of HCL or its affiliates. Any form of reproduction, dissemination, copying, disclosure, modification, distribution and / or publication of this message without the prior written consent of authorized representative of HCL is strictly prohibited. If you have received this email in error please delete it and notify the sender immediately. Before opening any email and/or attachments, please check them for viruses and other defects.
Re: Reading from .bz2 files with Spark
Hi Andrew, Could you try varying the minPartitions parameter? For example: val r = sc.textFile(/user/aa/myfile.bz2, 4).count val r = sc.textFile(/user/aa/myfile.bz2, 8).count Best, Xiangrui On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote: Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes the problem you described, but it does contain several fixes to bzip2 format. -Xiangrui On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote: Hi all, Is anyone reading and writing to .bz2 files stored in HDFS from Spark with success? I'm finding the following results on a recent commit (756c96 from 24hr ago) and CDH 4.4.0: Works: val r = sc.textFile(/user/aa/myfile.bz2).count Doesn't work: val r = sc.textFile(/user/aa/myfile.bz2).map((s:String) = s+| ).count Specifically, I'm getting an exception coming out of the bzip2 libraries (see below stacktraces), which is unusual because I'm able to read from that file without an issue using the same libraries via Pig. It was originally created from Pig as well. Digging a little deeper I found this line in the .bz2 decompressor's javadoc for CBZip2InputStream: Instances of this class are not threadsafe. [source] My current working theory is that Spark has a much higher level of parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds exceptions much more frequently (as in can't finish a run over a little 2M row file) vs hardly at all in other libraries. The only other reference I could find to the issue was in presto-users, but the recommendation to leave .bz2 for .lzo doesn't help if I actually do want the higher compression levels of .bz2. Would love to hear if I have some kind of configuration issue or if there's a bug in .bz2 that's fixed in later versions of CDH, or generally any other thoughts on the issue. Thanks! Andrew Below are examples of some exceptions I'm getting: 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException java.lang.ArrayIndexOutOfBoundsException: 65535 at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397) at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43) java.lang.ArrayIndexOutOfBoundsException: 90 at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397) at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at
Re: Express VMs - good idea?
Hey Marco, I tried the CDH5 VM today and it works fine -- but note that you need to start the Spark service after the VM boots. Just go to CM and choose Start from the dropdown next to Spark. spark-shell works fine then. On Wed, May 14, 2014 at 1:00 PM, Marco Shaw marco.s...@gmail.com wrote: Hi, I've wanted to play with Spark. I wanted to fast track things and just use one of the vendor's express VMs. I've tried Cloudera CDH 5.0 and Hortonworks HDP 2.1. I've not written down all of my issues, but for certain, when I try to run spark-shell it doesn't work. Cloudera seems to crash, and both complain when I try to use SparkContext in a simple Scala command. So, just a basic question on whether anyone has had success getting these express VMs to work properly with Spark *out of the box* (HDP does required you install Spark manually). I know Cloudera recommends 8GB of RAM, but I've been running it with 4GB. Could it be that 4GB is just not enough, and causing issues or have others had success using these Hadoop 2.x pre-built VMs with Spark 0.9.x? Marco
Re: java serialization errors with spark.files.userClassPathFirst=true
after removing all class paramater of class Path from my code, i tried again. different but related eror when i set spark.files.userClassPathFirst=true now i dont even use FileInputFormat directly. HadoopRDD does... 14/05/16 12:17:17 ERROR Executor: Exception in task ID 45 java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:792) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at org.apache.spark.executor.ChildExecutorURLClassLoader$userClassLoader$.findClass(ExecutorURLClassLoader.scala:42) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.executor.ChildExecutorURLClassLoader.findClass(ExecutorURLClassLoader.scala:51) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:57) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515) at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1481) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1331) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) On Thu, May 15, 2014 at 3:03 PM, Koert Kuipers ko...@tresata.com wrote: when i set spark.files.userClassPathFirst=true, i get java serialization errors in my tasks, see below. when i set userClassPathFirst back to its default of false, the serialization errors are gone. my spark.serializer is KryoSerializer. the class org.apache.hadoop.fs.Path is in the spark assembly jar, but not in my task jars (the ones i added to the SparkConf). so looks like the ClosureSerializer is having trouble with this class once the ChildExecutorURLClassLoader is used? thats me just guessing. Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:5 failed 4 times, most recent failure: Exception failure in TID 31 on host node05.tresata.com: java.lang.NoClassDefFoundError: org/apache/hadoop/fs/Path java.lang.Class.getDeclaredConstructors0(Native Method) java.lang.Class.privateGetDeclaredConstructors(Class.java:2398) java.lang.Class.getDeclaredConstructors(Class.java:1838) java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1697) java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:50) java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:203) java.security.AccessController.doPrivileged(Native Method) java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:200) java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:556)
spark-submit / S3
Hi I see from the docs for 1.0.0 that the new spark-submit mechanism seems to support specifying the jar with hdfs:// or http:// Does this support S3? (It doesn't seem to as I have tried it on EC2 but doesn't seem to work): ./bin/spark-submit --master local[2] --class myclass s3n://bucket/myapp.jar args
Counting things only once
I want to use accumulators to keep counts of things like invalid lines found and such, for reporting purposes. Similar to Hadoop counters. This may seem simple, but my case is a bit more complicated. The code which is creating an RDD from a transform is separated from the code which performs the operation on that RDD - or operations (I can't make any assumption as to how many operations will be done on this RDD). There are two issues: (1) I want to retrieve the accumulator value only after it has been computed, and (2) I don't wan to count the same thing twice if the RDD is recomputed. Here's a simple example, converting strings to integers. Any records which can't be parsed as an integer are dropped, but I want to count how many times that happens: def numbers(val input: RDD[String]) : RDD[Int] = { val invalidRecords = sc.accumulator(0) input.flatMap { record = try { Seq(record.toInt) } catch { case NumberFormatException = invalidRecords += 1; Seq() } } } I need some way to know when the result RDD has been computed so I can get the accumulator value and reset it. Or perhaps it would be better to say I need a way to ensure the accumulator value is computed exactly once for a given RDD. Anyone know a way to do this? Or anything I might look into? Or is this something that just isn't supported in Spark? -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io http://www.velos.io
Re: Hadoop 2.3 Centralized Cache vs RDD
http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html We do not currently cache blocks which are under construction, corrupt, or otherwise incomplete. Have you tried with a file with more than 1 block? And dfs.namenode.path.based.cache.refresh.interval.ms might be too large? You might want to ask a broader mailing list. This is not related to Spark. Bertrand On Fri, May 16, 2014 at 2:56 AM, hequn cheng chenghe...@gmail.com wrote: I tried centralized cache step by step following the apache hadoop oficial website, but it seems centralized cache doesn't work. see : http://stackoverflow.com/questions/22293358/centralized-cache-failed-in-hadoop-2-3 . Can anyone succeed? 2014-05-15 5:30 GMT+08:00 William Kang weliam.cl...@gmail.com: Hi, Any comments or thoughts on the implications of the newly released feature from Hadoop 2.3 on the centralized cache? How different it is from RDD? Many thanks. Cao
Re: accessing partition i+1 from mapper of partition i
I don't think there's a direct way of bleeding elements across partitions. But you could write it yourself relatively succinctly: A) Sort the RDD B) Look at the sorted RDD's partitions with the .mapParititionsWithIndex( ) method. Map each partition to its partition ID, and its maximum element. Collect the (partID, maxElements) in the driver. C) Broadcast the collection of (partID, part's max element) tuples D) Look again at the sorted RDD's partitions with mapPartitionsWithIndex( ). For each partition *K:* D1) Find the immediately-preceding partition* K -1 , *and its associated maximum value. Use that to decide how many values are missing between the last element of part *K-1 *and the first element of part *K*. D2) Step through part *K*'s elements and find the rest of the missing elements in that part This approach sidesteps worries you might have over the hack of using .filter to remove the first element, as well as the zipping. --Brian On Tue, May 13, 2014 at 9:31 PM, Mohit Jaggi mohitja...@gmail.com wrote: Hi, I am trying to find a way to fill in missing values in an RDD. The RDD is a sorted sequence. For example, (1, 2, 3, 5, 8, 11, ...) I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11) One way to do this is to slide and zip rdd1 = sc.parallelize(List(1, 2, 3, 5, 8, 11, ...)) x = rdd1.first rdd2 = rdd1 filter (_ != x) rdd3 = rdd2 zip rdd1 rdd4 = rdd3 flatmap { (x, y) = generate missing elements between x and y } Another method which I think is more efficient is to use mapParititions() on rdd1 to be able to iterate on elements of rdd1 in each partition. However, that leaves the boundaries of the partitions to be unfilled. *Is there a way within the function passed to mapPartitions, to read the first element in the next partition?* The latter approach also appears to work for a general sliding window calculation on the RDD. The former technique requires a lot of sliding and zipping and I believe it is not efficient. If only I could read the next partition...I have tried passing a pointer to rdd1 to the function passed to mapPartitions but the rdd1 pointer turns out to be NULL, I guess because Spark cannot deal with a mapper calling another mapper (since it happens on a worker not the driver) Mohit.
Re: How to read a multipart s3 file?
On Wed, May 7, 2014 at 4:44 PM, Aaron Davidson ilike...@gmail.com wrote: Spark can only run as many tasks as there are partitions, so if you don't have enough partitions, your cluster will be underutilized. This is a very important point. kamatsuoka, how many partitions does your RDD have when you try to save it? You can check this with myrdd._jrdd.splits().size() in PySpark. If it’s less than the number of cores in your cluster, try repartition()-ing the RDD as Aaron suggested. Nick
Variables outside of mapPartitions scope
I am working on some code which uses mapPartitions. Its working great, except when I attempt to use a variable within the function passed to mapPartitions which references something outside of the scope (for example, a variable declared immediately before the mapPartitions call). When this happens, I get a task not serializable error. I wanted to reference a variable which had been broadcasted, and ready to use within that closure. Seeing that, I attempted another solution, to store the broadcasted variable within an object (singleton class, thing). It serialized fine, but when I ran it on a cluster, any reference to it got a null pointer exception, my presumption is that the workers were not getting their objects updated for some reason, despite setting it as a broadcasted variable. My guess is that the workers get the serialized function, but spark doesn't know to serialize the object, including the things it reference. Thus the copied reference becomes invalid. What would be a good way to solve my problem? Is there a way to reference a broadcast variable by name rather through a variable? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Variables-outside-of-mapPartitions-scope-tp5517.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Doubts regarding Shark
Hello I have few questions regarding shark. 1) I have a table of 60 GB and i have total memory of 50 GB but when i try to cache the table it get cached successfully. How shark caches the table there was not enough memory to get the table in memory. And how cache eviction policies (FIFO and LRU) works while caching the table. While creating tables I am using cache type property as MEMORY (storage level: memory and disk) 2) Sometime while running queries I get JavaOutOfMemory Exception but all tables are cached successfully. Can you tell me the cases or some example due to which that error can come. Regards Vinay Bajaj
Re: KryoSerializer Exception
UP, doesn't anyone know something about it? ^^ 2014-05-06 12:05 GMT+02:00 Andrea Esposito and1...@gmail.com: Hi there, sorry if i'm posting a lot lately. i'm trying to add the KryoSerializer but i receive this exception: 2014 - 05 - 06 11: 45: 23 WARN TaskSetManager: 62 - Loss was due to java.io.EOFException java.io.EOFException at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala: 105) at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala: 165) at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala: 56) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java: 57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java: 43) at java.lang.reflect.Method.invoke(Method.java: 606) I set the serializer as: System.setProperty(spark.serializer, org.apache.spark.serializer.KryoSerializer) System.setProperty(spark.kryo.registrator, test.TestKryoRegistrator) With or without register my custom registrator it throws the exception. Seems something related to broadcast.. but isn't Kryo already ok out of the box just setting it as default serializer?
Re: Distribute jar dependencies via sc.AddJar(fileName)
Hi guys, I think it maybe a bug in Spark. I wrote some code to demonstrate the bug. Example 1) This is how Spark adds jars. Basically, add jars to cutomURLClassLoader. https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java It doesn't work for two reasons. a) We don't pass the customURLClassLoader to task, so it's only available in the Executor.scala. b) Even we do so, we need to get the class by loader.loadClass(Class Name).newInstance(), and get the Method by getDeclaredMethod to run it. Example 2) It works by getting the class using loadClass API, and then get and run the Method by getDeclaredMethod. Since we don't know which classes users will use, it's not a solution. https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java Example 3) Add jars to systemClassLoader and have them accessible in JVM. Users can use the classes directly. https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java I'm now porting example 3) to Spark, and will let you know if it works. Thanks. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Thu, May 15, 2014 at 12:03 PM, DB Tsai dbt...@stanford.edu wrote: Hi Xiangrui, We're still using Spark 0.9 branch, and our job is submitted by ./bin/spark-class org.apache.spark.deploy.yarn.Client \ --jar YOUR_APP_JAR_FILE \ --class APP_MAIN_CLASS \ --args APP_MAIN_ARGUMENTS \ --num-workers NUMBER_OF_WORKER_MACHINES \ --master-class ApplicationMaster_CLASS --master-memory MEMORY_FOR_MASTER \ --worker-memory MEMORY_PER_WORKER \ --addJars any_local_files_used_in_SparkContext.addJar Based on my understanding of the code in yarn-standalone mode, the jar distributing from local machine to application master is through distributed cache (using hadoop yarn-client api). From application master to executors, it's through http server. I maybe wrong, but if you look at the code in SparkContext addJar method, you can see the jar is added to http server in yarn-standalone mode. if (SparkHadoopUtil.get.isYarnMode() master == yarn-standalone) { // In order for this to work in yarn standalone mode the user must specify the // --addjars option to the client to upload the file into the distributed cache // of the AM to make it show up in the current working directory. val fileName = new Path(uri.getPath).getName() try { env.httpFileServer.addJar(new File(fileName)) } catch { Those jars will be fetched in Executor from http server and added to classloader of Executor class, see private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) { synchronized { // Fetch missing dependencies for ((name, timestamp) - newFiles if currentFiles.getOrElse(name, -1L) timestamp) { logInfo(Fetching + name + with timestamp + timestamp) Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf) currentFiles(name) = timestamp } for ((name, timestamp) - newJars if currentJars.getOrElse(name, -1L) timestamp) { logInfo(Fetching + name + with timestamp + timestamp) Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf) currentJars(name) = timestamp // Add it to our class loader val localName = name.split(/).last val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL if (!urlClassLoader.getURLs.contains(url)) { urlClassLoader.addURL(url) } } The problem seems to be that jars are added to the classloader of Executor classes, and they are not accessible in Task.scala. I verified this by trying to load our custom classes in Executor.scala, and it works. But if I tried to load those classes in Task.scala, I'll get classNotFound exception. Thanks. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng men...@gmail.com wrote: In SparkContext#addJar, for yarn-standalone mode, the workers should get the jars from local distributed cache instead of fetching them from the http server. Could you send the command you used to submit the job? -Xiangrui On Wed, May 14, 2014 at 1:26 AM, DB Tsai dbt...@stanford.edu wrote: Hi Xiangrui, I actually used `yarn-standalone`, sorry for misleading. I did debugging in the last couple days, and everything up to updateDependency in executor.scala works. I also checked the file size and md5sum in the executors, and they are the same as the one in driver. Gonna do more testing
writing my own RDD
in writing my own RDD i ran into a few issues with respect to stuff being private in spark. in compute i would like to return an iterator that respects task killing (as HadoopRDD does), but the mechanics for that are inside the private InterruptibleIterator. also the exception i am supposed to throw (TaskKilledException) is private to spark.
Re: How to run the SVM and LogisticRegression
There are examples to run them in BinaryClassification.scala in org.apache.spark.examples... On Wed, May 14, 2014 at 1:36 PM, yxzhao yxz...@ualr.edu wrote: Hello, I found the classfication algorithms SVM and LogisticRegression implemented in the following directory. And how to run them? What is the commnad line should be? Thanks. spark-0.9.0-incubating/mllib/src/main/scala/org/apache/spark/mllib/classification -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-run-the-SVM-and-LogisticRegression-tp5720.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Passing runtime config to workers?
What is a good way to pass config variables to workers? I've tried setting them in environment variables via spark-env.sh, but, as far as I can tell, the environment variables set there don't appear in workers' environments. If I want to be able to configure all workers, what's a good way to do it? For example, I want to tell all workers: USE_ALGO_A or USE_ALGO_B - but I don't want to recompile.
Re: Distribute jar dependencies via sc.AddJar(fileName)
I've experienced the same bug, which I had to workaround manually. I posted the details here: http://stackoverflow.com/questions/23687081/spark-workers-unable-to-find-jar-on-ec2-cluster On 5/15/14, DB Tsai dbt...@stanford.edu wrote: Hi guys, I think it maybe a bug in Spark. I wrote some code to demonstrate the bug. Example 1) This is how Spark adds jars. Basically, add jars to cutomURLClassLoader. https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java It doesn't work for two reasons. a) We don't pass the customURLClassLoader to task, so it's only available in the Executor.scala. b) Even we do so, we need to get the class by loader.loadClass(Class Name).newInstance(), and get the Method by getDeclaredMethod to run it. Example 2) It works by getting the class using loadClass API, and then get and run the Method by getDeclaredMethod. Since we don't know which classes users will use, it's not a solution. https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java Example 3) Add jars to systemClassLoader and have them accessible in JVM. Users can use the classes directly. https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java I'm now porting example 3) to Spark, and will let you know if it works. Thanks. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Thu, May 15, 2014 at 12:03 PM, DB Tsai dbt...@stanford.edu wrote: Hi Xiangrui, We're still using Spark 0.9 branch, and our job is submitted by ./bin/spark-class org.apache.spark.deploy.yarn.Client \ --jar YOUR_APP_JAR_FILE \ --class APP_MAIN_CLASS \ --args APP_MAIN_ARGUMENTS \ --num-workers NUMBER_OF_WORKER_MACHINES \ --master-class ApplicationMaster_CLASS --master-memory MEMORY_FOR_MASTER \ --worker-memory MEMORY_PER_WORKER \ --addJars any_local_files_used_in_SparkContext.addJar Based on my understanding of the code in yarn-standalone mode, the jar distributing from local machine to application master is through distributed cache (using hadoop yarn-client api). From application master to executors, it's through http server. I maybe wrong, but if you look at the code in SparkContext addJar method, you can see the jar is added to http server in yarn-standalone mode. if (SparkHadoopUtil.get.isYarnMode() master == yarn-standalone) { // In order for this to work in yarn standalone mode the user must specify the // --addjars option to the client to upload the file into the distributed cache // of the AM to make it show up in the current working directory. val fileName = new Path(uri.getPath).getName() try { env.httpFileServer.addJar(new File(fileName)) } catch { Those jars will be fetched in Executor from http server and added to classloader of Executor class, see private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) { synchronized { // Fetch missing dependencies for ((name, timestamp) - newFiles if currentFiles.getOrElse(name, -1L) timestamp) { logInfo(Fetching + name + with timestamp + timestamp) Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf) currentFiles(name) = timestamp } for ((name, timestamp) - newJars if currentJars.getOrElse(name, -1L) timestamp) { logInfo(Fetching + name + with timestamp + timestamp) Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf) currentJars(name) = timestamp // Add it to our class loader val localName = name.split(/).last val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL if (!urlClassLoader.getURLs.contains(url)) { urlClassLoader.addURL(url) } } The problem seems to be that jars are added to the classloader of Executor classes, and they are not accessible in Task.scala. I verified this by trying to load our custom classes in Executor.scala, and it works. But if I tried to load those classes in Task.scala, I'll get classNotFound exception. Thanks. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng men...@gmail.com wrote: In SparkContext#addJar, for yarn-standalone mode, the workers should get the jars from local distributed cache instead of fetching them from the http server. Could you send the command you used to submit the job? -Xiangrui On Wed, May 14, 2014 at 1:26 AM, DB Tsai dbt...@stanford.edu wrote: Hi Xiangrui, I actually used
What is the difference between a Spark Worker and a Spark Slave?
What is the difference between a Spark Worker and a Spark Slave?
Nested method in a class: Task not serializable?
Hi! I understand the usual Task not serializable issue that arises when accessing a field or a method that is out of scope of a closure. To fix it, I usually define a local copy of these fields/methods, which avoids the need to serialize the whole class: class MyClass(val myField: Any) { def run() = { val f = sc.textFile(hdfs://xxx.xxx.xxx.xxx/file.csv) val myField = this.myField println(f.map( _ + myField ).count) } } === Now, if I define a nested function in the run method, it cannot be serialized: class MyClass() { def run() = { val f = sc.textFile(hdfs://xxx.xxx.xxx.xxx/file.csv) def mapFn(line: String) = line.split(;) val myField = this.myField println(f.map( mapFn( _ ) ).count) } } I don't understand since I thought mapFn would be in scope... Even stranger, if I define mapFn to be a val instead of a def, then it works: class MyClass() { def run() = { val f = sc.textFile(hdfs://xxx.xxx.xxx.xxx/file.csv) val mapFn = (line: String) = line.split(;) println(f.map( mapFn( _ ) ).count) } } Is this related to the way Scala represents nested functions? What's the recommended way to deal with this issue ? Thanks for your help, Pierre -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Nested-method-in-a-class-Task-not-serializable-tp5869.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Understanding epsilon in KMeans
Stuti, I'm answering your questions in order: 1. From MLLib https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L159 *,* you can see that clustering stops when we have reached*maxIterations* or there are no more*activeRuns*. KMeans is executed *runs* times in parallel, and the best clustering found over all *runs* is returned. For each run, the algorithm will stop if:The number of iteration reaches *maxIterations*, orEvery cluster center moved less than*epsilon *in the last iteration. 2. I can't find the source code for Mahout that refer to the Convergence Threshold but I suspect the threshold and MLLib's *epsilon*are the same concepts. There is no concept of parallel runs in Mahout. Ref: https://mahout.apache.org/users/clustering/k-means-clustering.html 3. To set MLLib's KMeans to have *epsilon *of 0.1 and then train the model, you can do the following: new KMeans().setK(k).setMaxIterations( maxIterations).setRuns(runs).setInitializationMode(initializationMode) *.setEpsilon(0.1)*.run(data) Enjoy, Long Pham Software Engineer at Adatao, Inc. longp...@adatao.com On May 15, 2014 7:29 PM, Stuti Awasthi stutiawas...@hcl.com wrote: Hi All, Any ideas on this ?? Thanks Stuti Awasthi *From:* Stuti Awasthi *Sent:* Wednesday, May 14, 2014 6:20 PM *To:* user@spark.apache.org *Subject:* Understanding epsilon in KMeans Hi All, I wanted to understand the functionality of epsilon in KMeans in Spark MLlib. As per documentation : distance threshold within which we've consider centers to have converged.If all centers move less than this *Euclidean* distance, we stop iterating one run. Now I have assumed that if centers are moving less than epsilon value then Clustering Stops but then what does it mean by “we stop iterating one run”.. Now suppose I have given maxIterations=10 and epsilon = 0.1 and assume that centers are afteronly 2 iteration, the epsilon condition is met i.e. now centers are moving only less than 0.1.. Now what happens ?? The whole 10 iterations are completed OR the Clustering stops ?? My 2nd query is in Mahout, there is a configuration param : “Convergence Threshold (cd)” which states : “in an iteration, the centroids don’t move more than this distance, no further iterations are done and clustering stops.” So is epsilon and cd similar ?? 3rd query : How to pass epsilon as a configurable param. KMeans.train() does not provide the way but in code I can see “setEpsilon” as method. SO if I want to pass the parameter as epsilon=0.1 , how may I do that.. Pardon my ignorance Thanks Stuti Awasthi ::DISCLAIMER:: The contents of this e-mail and any attachment(s) are confidential and intended for the named recipient(s) only. E-mail transmission is not guaranteed to be secure or error-free as information could be intercepted, corrupted, lost, destroyed, arrive late or incomplete, or may contain viruses in transmission. The e mail and its contents (with or without referred errors) shall therefore not attach any liability on the originator or HCL or its affiliates. Views or opinions, if any, presented in this email are solely those of the author and may not necessarily reflect the views or opinions of HCL or its affiliates. Any form of reproduction, dissemination, copying, disclosure, modification, distribution and / or publication of this message without the prior written consent of authorized representative of HCL is strictly prohibited. If you have received this email in error please delete it and notify the sender immediately. Before opening any email and/or attachments, please check them for viruses and other defects.
Re: cant get tests to pass anymore on master master
yeah sure. it is ubuntu 12.04 with jdk1.7.0_40 what else is relevant that i can provide? On Thu, May 15, 2014 at 12:17 PM, Sean Owen so...@cloudera.com wrote: FWIW I see no failures. Maybe you can say more about your environment, etc. On Wed, May 7, 2014 at 10:01 PM, Koert Kuipers ko...@tresata.com wrote: i used to be able to get all tests to pass. with java 6 and sbt i get PermGen errors (no matter how high i make the PermGen). so i have given up on that. with java 7 i see 1 error in a bagel test and a few in streaming tests. any ideas? see the error in BagelSuite below. [info] - large number of iterations *** FAILED *** (10 seconds, 105 milliseconds) [info] The code passed to failAfter did not complete within 10 seconds. (BagelSuite.scala:85) [info] org.scalatest.exceptions.TestFailedDueToTimeoutException: [info] at org.scalatest.concurrent.Timeouts$$anonfun$failAfter$1.apply(Timeouts.scala:250) [info] at org.scalatest.concurrent.Timeouts$$anonfun$failAfter$1.apply(Timeouts.scala:250) [info] at org.scalatest.concurrent.Timeouts$class.timeoutAfter(Timeouts.scala:282) [info] at org.scalatest.concurrent.Timeouts$class.failAfter(Timeouts.scala:246) [info] at org.apache.spark.bagel.BagelSuite.failAfter(BagelSuite.scala:32) [info] at org.apache.spark.bagel.BagelSuite$$anonfun$3.apply$mcV$sp(BagelSuite.scala:85) [info] at org.apache.spark.bagel.BagelSuite$$anonfun$3.apply(BagelSuite.scala:85) [info] at org.apache.spark.bagel.BagelSuite$$anonfun$3.apply(BagelSuite.scala:85) [info] at org.scalatest.FunSuite$$anon$1.apply(FunSuite.scala:1265) [info] at org.scalatest.Suite$class.withFixture(Suite.scala:1974) [info] at org.apache.spark.bagel.BagelSuite.withFixture(BagelSuite.scala:32)
unsubscribe
Re: Hadoop 2.3 Centralized Cache vs RDD
I tried centralized cache step by step following the apache hadoop oficial website, but it seems centralized cache doesn't work. see : http://stackoverflow.com/questions/22293358/centralized-cache-failed-in-hadoop-2-3 . Can anyone succeed? 2014-05-15 5:30 GMT+08:00 William Kang weliam.cl...@gmail.com: Hi, Any comments or thoughts on the implications of the newly released feature from Hadoop 2.3 on the centralized cache? How different it is from RDD? Many thanks. Cao
Re: java serialization errors with spark.files.userClassPathFirst=true
well, i modified ChildExecutorURLClassLoader to also delegate to parentClassloader if NoClassDefFoundError is thrown... now i get yet another error. i am clearly missing something with these classloaders. such nasty stuff... giving up for now. just going to have to not use spark.files.userClassPathFirst=true for now, until i have more time to look at this. 14/05/16 13:58:59 ERROR Executor: Exception in task ID 3 java.lang.ClassCastException: cannot assign instance of scala.None$ to field org.apache.spark.rdd.RDD.checkpointData of type scala.Option in instance of MyRDD at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1995) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:60) On Fri, May 16, 2014 at 1:46 PM, Koert Kuipers ko...@tresata.com wrote: after removing all class paramater of class Path from my code, i tried again. different but related eror when i set spark.files.userClassPathFirst=true now i dont even use FileInputFormat directly. HadoopRDD does... 14/05/16 12:17:17 ERROR Executor: Exception in task ID 45 java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:792) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at org.apache.spark.executor.ChildExecutorURLClassLoader$userClassLoader$.findClass(ExecutorURLClassLoader.scala:42) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.executor.ChildExecutorURLClassLoader.findClass(ExecutorURLClassLoader.scala:51) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:57) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515) at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1481) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1331) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
Re: different in spark on yarn mode and standalone mode
And I thought I sent it to the right list! Here you go again - Question below : On May 14, 2014, at 3:06 PM, Vipul Pandey vipan...@gmail.com wrote: So here's a followup question : What's the preferred mode? We have a new cluster coming up with petabytes of data and we intend to take Spark to production. We are trying to figure out what mode would be safe and stable for production like environment. pros and cons? anyone? Any reasons why one would chose Standalone over YARN? Thanks, Vipul On May 4, 2014, at 5:56 PM, Liu, Raymond raymond@intel.com wrote: In the core, they are not quite different In standalone mode, you have spark master and spark worker who allocate driver and executors for your spark app. While in Yarn mode, Yarn resource manager and node manager do this work. When the driver and executors have been launched, the rest part of resource scheduling go through the same process, say between driver and executor through akka actor. Best Regards, Raymond Liu -Original Message- From: Sophia [mailto:sln-1...@163.com] Hey you guys, What is the different in spark on yarn mode and standalone mode about resource schedule? Wish you happy everyday. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/different-in-spark-on-yarn-mode-and-standalone-mode-tp5300.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Real world
http://spark-summit.org ? Bertrand On Thu, May 8, 2014 at 2:05 AM, Ian Ferreira ianferre...@hotmail.comwrote: Folks, I keep getting questioned on real world experience of Spark as in mission critical production deployments. Does anyone have some war stories to share or know of resources to review? Cheers - Ian
Re: Distribute jar dependencies via sc.AddJar(fileName)
The jars are actually there (and in classpath), but you need to load through reflection. I've another thread giving the workaround. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, May 16, 2014 at 1:37 PM, Robert James srobertja...@gmail.comwrote: I've experienced the same bug, which I had to workaround manually. I posted the details here: http://stackoverflow.com/questions/23687081/spark-workers-unable-to-find-jar-on-ec2-cluster On 5/15/14, DB Tsai dbt...@stanford.edu wrote: Hi guys, I think it maybe a bug in Spark. I wrote some code to demonstrate the bug. Example 1) This is how Spark adds jars. Basically, add jars to cutomURLClassLoader. https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java It doesn't work for two reasons. a) We don't pass the customURLClassLoader to task, so it's only available in the Executor.scala. b) Even we do so, we need to get the class by loader.loadClass(Class Name).newInstance(), and get the Method by getDeclaredMethod to run it. Example 2) It works by getting the class using loadClass API, and then get and run the Method by getDeclaredMethod. Since we don't know which classes users will use, it's not a solution. https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java Example 3) Add jars to systemClassLoader and have them accessible in JVM. Users can use the classes directly. https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java I'm now porting example 3) to Spark, and will let you know if it works. Thanks. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Thu, May 15, 2014 at 12:03 PM, DB Tsai dbt...@stanford.edu wrote: Hi Xiangrui, We're still using Spark 0.9 branch, and our job is submitted by ./bin/spark-class org.apache.spark.deploy.yarn.Client \ --jar YOUR_APP_JAR_FILE \ --class APP_MAIN_CLASS \ --args APP_MAIN_ARGUMENTS \ --num-workers NUMBER_OF_WORKER_MACHINES \ --master-class ApplicationMaster_CLASS --master-memory MEMORY_FOR_MASTER \ --worker-memory MEMORY_PER_WORKER \ --addJars any_local_files_used_in_SparkContext.addJar Based on my understanding of the code in yarn-standalone mode, the jar distributing from local machine to application master is through distributed cache (using hadoop yarn-client api). From application master to executors, it's through http server. I maybe wrong, but if you look at the code in SparkContext addJar method, you can see the jar is added to http server in yarn-standalone mode. if (SparkHadoopUtil.get.isYarnMode() master == yarn-standalone) { // In order for this to work in yarn standalone mode the user must specify the // --addjars option to the client to upload the file into the distributed cache // of the AM to make it show up in the current working directory. val fileName = new Path(uri.getPath).getName() try { env.httpFileServer.addJar(new File(fileName)) } catch { Those jars will be fetched in Executor from http server and added to classloader of Executor class, see private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) { synchronized { // Fetch missing dependencies for ((name, timestamp) - newFiles if currentFiles.getOrElse(name, -1L) timestamp) { logInfo(Fetching + name + with timestamp + timestamp) Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf) currentFiles(name) = timestamp } for ((name, timestamp) - newJars if currentJars.getOrElse(name, -1L) timestamp) { logInfo(Fetching + name + with timestamp + timestamp) Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf) currentJars(name) = timestamp // Add it to our class loader val localName = name.split(/).last val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL if (!urlClassLoader.getURLs.contains(url)) { urlClassLoader.addURL(url) } } The problem seems to be that jars are added to the classloader of Executor classes, and they are not accessible in Task.scala. I verified this by trying to load our custom classes in Executor.scala, and it works. But if I tried to load those classes in Task.scala, I'll get classNotFound exception. Thanks. Sincerely, DB Tsai
Re: Spark unit testing best practices
Thanks for the answers! On a concrete example, here is what I did to test my (wrong :) ) hypothesis before writing my email: class SomethingNotSerializable { def process(a: Int): Int = 2 *a } object NonSerializableClosure extends App { val sc = new spark.SparkContext( local, SerTest, /home/xandrew/spark-0.9.0-incubating, Seq(target/scala-2.10/sparktests_2.10-0.1-SNAPSHOT.jar)) val sns = new SomethingNotSerializable println(sc.parallelize(Seq(1,2,3)) .map(sns.process(_)) .reduce(_ + _)) } This program prints 12 correctly. If I change local to point to my spark master the code fails on the worker with a NullPointerException in the line .map(sns.process(_)). But I have to say that my original assumption that this is a serialization issue was wrong, as adding extends Serializable to my class does _not_ solve the problem in non-local mode. This seems to be something more convoluted, the sns reference in my closure is probably not stored by value, instead I guess it's a by name reference to NonSerializableClosure.sns. I'm a bit surprised why this results in a NullPointerException instead of some error when trying to run the constructor of this object on the worker. Maybe something to do with the magic of App. Anyways, while this is indeed an example of an error that doesn't manifest in local mode, I guess it turns out to be convoluted enough that we won't worry about it for now, use local in tests, and I'll ask again if we see some actual prod vs unittest problems. On using local-cluster, this does sound like exactly what I had in mind. But it doesn't seem to work for application developers. It seems to assume you are running within a spark build (it fails while looking for the file bin/compute-classpath.sh). So maybe that's a reason it's not documented... Cheers, Andras On Wed, May 14, 2014 at 7:58 PM, Mark Hamstra m...@clearstorydata.comwrote: Local mode does serDe, so it should expose serialization problems. On Wed, May 14, 2014 at 10:53 AM, Philip Ogren philip.og...@oracle.comwrote: Have you actually found this to be true? I have found Spark local mode to be quite good about blowing up if there is something non-serializable and so my unit tests have been great for detecting this. I have never seen something that worked in local mode that didn't work on the cluster because of different serialization requirements between the two. Perhaps it is different when using Kryo On 05/14/2014 04:34 AM, Andras Nemeth wrote: E.g. if I accidentally use a closure which has something non-serializable in it, then my test will happily succeed in local mode but go down in flames on a real cluster.
What does Spark cache() actually do?
Hi there, I was wondering if some one could explain me how the cache() function works in Spark in these phases: (1) If I have a huge file, say 1TB, which cannot be entirely stored in Memory. What will happen if I try to create a RDD of this huge file and cache? (2) If it works in Spark, it can definitely store part of the data. Which part of the data will be stored in memory, especially, do the new data evict the old data out of memory just like what cache works? (3) What would happen if I try to load one RDD and cache, and then another and cache too, and so on so forth? Will the new RDDs evict the old RDDs cached in memory? Thanks very much. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-does-Spark-cache-actually-do-tp5778.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: slf4j and log4j loop
Please ignore. This was sent last week not sure why it arrived so late. -Original Message- From: amoc [mailto:amoc...@verticalscope.com] Sent: May-09-14 10:13 AM To: u...@spark.incubator.apache.org Subject: Re: slf4j and log4j loop Hi Patrick/Sean, Sorry to resurrect this thread, but after upgrading to Spark 9.1 I still get this error on runtime. ..trying to run some tests here. Has this actually been integrated int Spark 9.1? Thanks again -A -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/slf4j-and-log4j-loop-tp2699p5524.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: different in spark on yarn mode and standalone mode
Thanks for responding, Sandy. YARN for sure is a more mature way of working on shared resources. I was not sure about how stable Spark on YARN is and if anyone is using it in production. I have been using Standalone mode in our dev cluster but multi-tenancy and resource allocation wise it's difficult to call it production ready yet. (I'm not sure if 1.0 has significant changes or not as I haven't kept up lately) What I get from your response below is that for production like environment YARN will be a better choice as, for our case, we don't care too much about saving a few seconds in startup time. Stability will definitely be a concern but Im assuming that Spark on Yarn is not terrible either and will mature over the period of time, in which case we don't have to compromise on other important factors (like resource sharing and prioritization) btw, can I see information on what RDDs are cached and their size etc. on YARN? like I see in the standalone mode UI? ~Vipul On May 15, 2014, at 5:24 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Vipul, Some advantages of using YARN: * YARN allows you to dynamically share and centrally configure the same pool of cluster resources between all frameworks that run on YARN. You can throw your entire cluster at a MapReduce job, then use some of it on an Impala query and the rest on Spark application, without any changes in configuration. * You can take advantage of all the features of YARN schedulers for categorizing, isolating, and prioritizing workloads. * YARN provides CPU-isolation between processes with CGroups. Spark standalone mode requires each application to run an executor on every node in the cluster - with YARN, you choose the number of executors to use. * YARN is the only cluster manager for Spark that supports security and Kerberized clusters. Some advantages of using standalone: * It has been around for longer, so it is likely a little more stable. * Many report faster startup times for apps. -Sandy On Wed, May 14, 2014 at 3:06 PM, Vipul Pandey vipan...@gmail.com wrote: So here's a followup question : What's the preferred mode? We have a new cluster coming up with petabytes of data and we intend to take Spark to production. We are trying to figure out what mode would be safe and stable for production like environment. pros and cons? anyone? Any reasons why one would chose Standalone over YARN? Thanks, Vipul On May 4, 2014, at 5:56 PM, Liu, Raymond raymond@intel.com wrote: In the core, they are not quite different In standalone mode, you have spark master and spark worker who allocate driver and executors for your spark app. While in Yarn mode, Yarn resource manager and node manager do this work. When the driver and executors have been launched, the rest part of resource scheduling go through the same process, say between driver and executor through akka actor. Best Regards, Raymond Liu -Original Message- From: Sophia [mailto:sln-1...@163.com] Hey you guys, What is the different in spark on yarn mode and standalone mode about resource schedule? Wish you happy everyday. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/different-in-spark-on-yarn-mode-and-standalone-mode-tp5300.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Schema view of HadoopRDD
I guess what you are trying to do is get a columnar projection on your data, sparksql maybe a good option for you (especially if your data is sparse good for columnar projection). If you are looking to work with simple key value then you are better off using Hbase input reader in hadoopIO get a pairRDD. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, May 8, 2014 at 10:51 AM, Debasish Das debasish.da...@gmail.comwrote: Hi, For each line that we read as textLine from HDFS, we have a schema..if there is an API that takes the schema as List[Symbol] and maps each token to the Symbol it will be helpful... One solution is to keep data on hdfs as avro/protobuf serialized objects but not sure if that works on HBase input...we are testing HDFS right now but finally we will read from a persistent store like hbase...so basically the immutableBytes need to be converted to a schema view as well incase we don't want to write the whole row as a protobuf... Does RDDs provide a schema view of the dataset on HDFS / HBase ? Thanks. Deb
Re: Schema view of HadoopRDD
Here is a link with more info: http://people.apache.org/~pwendell/catalyst-docs/sql-programming-guide.html On Wed, May 7, 2014 at 10:09 PM, Debasish Das debasish.da...@gmail.comwrote: Hi, For each line that we read as textLine from HDFS, we have a schema..if there is an API that takes the schema as List[Symbol] and maps each token to the Symbol it will be helpful... Does RDDs provide a schema view of the dataset on HDFS ? Thanks. Deb
Re: filling missing values in a sequence
Not sure if this is feasible, but this literally does what I think you are describing: sc.parallelize(rdd1.first to rdd1.last) On Tue, May 13, 2014 at 4:56 PM, Mohit Jaggi mohitja...@gmail.com wrote: Hi, I am trying to find a way to fill in missing values in an RDD. The RDD is a sorted sequence. For example, (1, 2, 3, 5, 8, 11, ...) I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11) One way to do this is to slide and zip rdd1 = sc.parallelize(List(1, 2, 3, 5, 8, 11, ...)) x = rdd1.first rdd2 = rdd1 filter (_ != x) rdd3 = rdd2 zip rdd1 rdd4 = rdd3 flatmap { (x, y) = generate missing elements between x and y } Another method which I think is more efficient is to use mapParititions() on rdd1 to be able to iterate on elements of rdd1 in each partition. However, that leaves the boundaries of the partitions to be unfilled. Is there a way within the function passed to mapPartitions, to read the first element in the next partition? The latter approach also appears to work for a general sliding window calculation on the RDD. The former technique requires a lot of sliding and zipping and I believe it is not efficient. If only I could read the next partition...I have tried passing a pointer to rdd1 to the function passed to mapPartitions but the rdd1 pointer turns out to be NULL, I guess because Spark cannot deal with a mapper calling another mapper (since it happens on a worker not the driver) Mohit.
Advanced log processing
Hi, I have some complex behavior i'd like to be advised on as i'm really new to Spark. I'm reading some log files that contains various events. There are two types of events: parents and children. A child event can only have one parent and a parent can have multiple children. Currently i'm mapping my lines to get a Tuple2(parentID, Tuple2(Parent, ListChild)) and then reducing by key to combine all children into one list and associate them with their parent. .reduceByKey(new Function2Tuple2lt;Parent, Listlt;Child, Tuple2Parent, Listlt;Child, Tuple2Parent, Listlt;Child(){...}). It works fine on static data. But in production, i will have to process only part of the log files, for instance, everyday at midnight i'll process the last day of logs. So i'm facing the problem that a Parent may arrive one day and children on the next day. Right after reducing, i'm having Tuples with no parent and i'd like, only for those, to go check the previous log files to find the parent in a efficient way. My first idea would be to branch data using a filter and it's opposite. I'll then read previous files one by one until i've found all parents or i've reached a predefined limit. I would finally merge back everything to finalize my job. The problem is, i'm not even sure how i can do that. The filter part should be easy but how am i gonna scan files one by one using spark ? I hope someone can guide me through this. FYI, there will be gigs of data to process. Thanks Laurent -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Advanced-log-processing-tp5743.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: is Mesos falling out of favor?
Paco, that's a great video reference, thanks. To be fair to our friends at Yahoo, who have done a tremendous amount to help advance the cause of the BDAS stack, it's not FUD coming from them, certainly not in any organized or intentional manner. In vacuo we prefer Mesos ourselves, but also can't ignore the fact that in the larger market, many enterprise technology stack decisions are made based on their existing vendor support relationships. And in view of Mesos, super happy to see Mesosphere growing! Sent while mobile. Pls excuse typos etc. That's FUD. Tracking the Mesos and Spark use cases, there are very large production deployments of these together. Some are rather private but others are being surfaced. IMHO, one of the most amazing case studies is from Christina Delimitrou http://youtu.be/YpmElyi94AA For a tutorial, use the following but upgrade it to latest production for Spark. There was a related O'Reilly webcast and Strata tutorial as well: http://mesosphere.io/learn/run-spark-on-mesos/ FWIW, I teach Intro to Spark with sections on CM4, YARN, Mesos, etc. Based on lots of student experiences, Mesos is clearly the shortest path to deploying a Spark cluster if you want to leverage the robustness, multi-tenancy for mixed workloads, less ops overhead, etc., that show up repeatedly in the use case analyses. My opinion only and not that of any of my clients: Don't believe the FUD from YHOO unless you really want to be stuck in 2009. On Wed, May 7, 2014 at 8:30 AM, deric barton.to...@gmail.com wrote: I'm also using right now SPARK_EXECUTOR_URI, though I would prefer distributing Spark as a binary package. For running examples with `./bin/run-example ...` it works fine, however tasks from spark-shell are getting lost. Error: Could not find or load main class org.apache.spark.executor.MesosExecutorBackend which looks more like problem with sbin/spark-executor and missing paths to jar. Anyone encountered this error before? I guess Yahoo invested quite a lot of effort into YARN and Spark integration (moreover when Mahout is migrating to Spark there's much more interest in Hadoop and Spark integration). If there would be some Mesos company working on Spark - Mesos integration it could be at least on the same level. I don't see any other reason why would be YARN better than Mesos, personally I like the latter, however I haven't checked YARN for a while, maybe they've made a significant progress. I think Mesos is more universal and flexible than YARN. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/is-Mesos-falling-out-of-favor-tp5444p5481.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Using String Dataset for Logistic Regression
Pravesh, Correct, the logistic regression engine is set up to perform classification tasks that take feature vectors (arrays of real-valued numbers) that are given a class label, and learning a linear combination of those features that divide the classes. As the above commenters have mentioned, there's lots of different ways to turn string data into feature vectors. For instance, if you're classifying documents between, say, spam or valid email, you may want to start with a bag-of-words model (http://en.wikipedia.org/wiki/Bag-of-words_model ) or the rescaled variant TF-IDF ( http://en.wikipedia.org/wiki/Tf%E2%80%93idf ). You'd turn a single document into a single, high-dimensional, sparse vector whose element j encodes the number of appearance term j. Maybe you want to try the experiment by featurizing on bigrams, trigrams, etc... Or if you're just trying to tell english language tweets from non-english language tweets, in which case the bag of words might be overkill: you could instead try featurizing on just the counts of each pair of consecutive characters. E.g., the first element counts aa appearances, then the second ab, then zy then zz. Those will be smaller feature vectors, capturing less information, but it's probably sufficient for the simpler task, and you'll be able to fit the model with less data than trying to fit a whole-word-based model. Different applications are going to need more or less context from your strings -- whole words? n-grams? just characters? treat them as ENUMs as in the days of week example? -- so it might not make sense for Spark to come with a direct way to turn a string attribute into a vector for use in logistic regression. You'll have to settle on the featurization approach that's right for your domain before you try training the logistic regression classifier on your labelled feature vectors. Best, -Brian -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-String-Dataset-for-Logistic-Regression-tp5523p5882.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Reading from .bz2 files with Spark
We never saw your exception when reading bzip2 files with spark. But when we wrongly compiled spark against older version of hadoop (was default in spark), we ended up with sequential reading of bzip2 file, not taking advantage of block splits to work in parallel. Once we compiled spark with SPARK_HADOOP_VERSION=2.2.0, files were read in parallel, as expected with a recent hadoop. http://spark.apache.org/docs/0.9.1/#a-note-about-hadoop-versions Make sure Spark is compiled against Hadoop v2 André On 2014-05-13 18:08, Xiangrui Meng wrote: Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes the problem you described, but it does contain several fixes to bzip2 format. -Xiangrui On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote: Hi all, Is anyone reading and writing to .bz2 files stored in HDFS from Spark with success? I'm finding the following results on a recent commit (756c96 from 24hr ago) and CDH 4.4.0: Works: val r = sc.textFile(/user/aa/myfile.bz2).count Doesn't work: val r = sc.textFile(/user/aa/myfile.bz2).map((s:String) = s+| ).count Specifically, I'm getting an exception coming out of the bzip2 libraries (see below stacktraces), which is unusual because I'm able to read from that file without an issue using the same libraries via Pig. It was originally created from Pig as well. Digging a little deeper I found this line in the .bz2 decompressor's javadoc for CBZip2InputStream: Instances of this class are not threadsafe. [source] My current working theory is that Spark has a much higher level of parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds exceptions much more frequently (as in can't finish a run over a little 2M row file) vs hardly at all in other libraries. The only other reference I could find to the issue was in presto-users, but the recommendation to leave .bz2 for .lzo doesn't help if I actually do want the higher compression levels of .bz2. Would love to hear if I have some kind of configuration issue or if there's a bug in .bz2 that's fixed in later versions of CDH, or generally any other thoughts on the issue. Thanks! Andrew Below are examples of some exceptions I'm getting: 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException java.lang.ArrayIndexOutOfBoundsException: 65535 at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397) at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43) java.lang.ArrayIndexOutOfBoundsException: 90 at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397) at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35) at
Re: SparkContext startup time out
How did you deal with this problem, I have met with it these days.God bless me. Best regard, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-startup-time-out-tp1753p5738.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: problem with hdfs access in spark job
Hi Marcin, On Wed, May 14, 2014 at 7:22 AM, Marcin Cylke marcin.cy...@ext.allegro.pl wrote: - This looks like some problems with HA - but I've checked namenodes during the job was running, and there was no switch between master and slave namenode. 14/05/14 15:25:44 ERROR security.UserGroupInformation: PriviledgedActionException as:hc_client_reco_dev (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby 14/05/14 15:25:44 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby 14/05/14 15:25:44 ERROR security.UserGroupInformation: PriviledgedActionException as:hc_client_reco_dev (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby These are actually not worrisome; that's just the HDFS client doing its own thing to support HA. It probably picked the wrong NN to try first, and got the NN in standby exception, which it logs. Then it tries the other NN and things just work as expected. Business as usual. Not sure about the other exceptions you mention. I've seen the second one before, but it didn't seem to affect my jobs - maybe some race during cleanup. -- Marcelo
Re: Spark unit testing best practices
+1, at least with current code just watch the log printed by DAGScheduler… -- Nan Zhu On Wednesday, May 14, 2014 at 1:58 PM, Mark Hamstra wrote: serDe
Re: Reading from .bz2 files with Spark
Hi Xiangrui, // FYI I'm getting your emails late due to the Apache mailing list outage I'm using CDH4.4.0, which I think uses the MapReduce v2 API. The .jars are named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar I'm also glad you were able to reproduce! Please paste a link to the Hadoop bug you file so I can follow along. Thanks! Andrew On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote: Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes the problem you described, but it does contain several fixes to bzip2 format. -Xiangrui On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote: Hi all, Is anyone reading and writing to .bz2 files stored in HDFS from Spark with success? I'm finding the following results on a recent commit (756c96 from 24hr ago) and CDH 4.4.0: Works: val r = sc.textFile(/user/aa/myfile.bz2).count Doesn't work: val r = sc.textFile(/user/aa/myfile.bz2).map((s:String) = s+| ).count Specifically, I'm getting an exception coming out of the bzip2 libraries (see below stacktraces), which is unusual because I'm able to read from that file without an issue using the same libraries via Pig. It was originally created from Pig as well. Digging a little deeper I found this line in the .bz2 decompressor's javadoc for CBZip2InputStream: Instances of this class are not threadsafe. [source] My current working theory is that Spark has a much higher level of parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds exceptions much more frequently (as in can't finish a run over a little 2M row file) vs hardly at all in other libraries. The only other reference I could find to the issue was in presto-users, but the recommendation to leave .bz2 for .lzo doesn't help if I actually do want the higher compression levels of .bz2. Would love to hear if I have some kind of configuration issue or if there's a bug in .bz2 that's fixed in later versions of CDH, or generally any other thoughts on the issue. Thanks! Andrew Below are examples of some exceptions I'm getting: 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException java.lang.ArrayIndexOutOfBoundsException: 65535 at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397) at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43) java.lang.ArrayIndexOutOfBoundsException: 90 at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397) at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at
Re: different in spark on yarn mode and standalone mode
Hi Vipul, Some advantages of using YARN: * YARN allows you to dynamically share and centrally configure the same pool of cluster resources between all frameworks that run on YARN. You can throw your entire cluster at a MapReduce job, then use some of it on an Impala query and the rest on Spark application, without any changes in configuration. * You can take advantage of all the features of YARN schedulers for categorizing, isolating, and prioritizing workloads. * YARN provides CPU-isolation between processes with CGroups. Spark standalone mode requires each application to run an executor on every node in the cluster - with YARN, you choose the number of executors to use. * YARN is the only cluster manager for Spark that supports security and Kerberized clusters. Some advantages of using standalone: * It has been around for longer, so it is likely a little more stable. * Many report faster startup times for apps. -Sandy On Wed, May 14, 2014 at 3:06 PM, Vipul Pandey vipan...@gmail.com wrote: So here's a followup question : What's the preferred mode? We have a new cluster coming up with petabytes of data and we intend to take Spark to production. We are trying to figure out what mode would be safe and stable for production like environment. pros and cons? anyone? Any reasons why one would chose Standalone over YARN? Thanks, Vipul On May 4, 2014, at 5:56 PM, Liu, Raymond raymond@intel.com wrote: In the core, they are not quite different In standalone mode, you have spark master and spark worker who allocate driver and executors for your spark app. While in Yarn mode, Yarn resource manager and node manager do this work. When the driver and executors have been launched, the rest part of resource scheduling go through the same process, say between driver and executor through akka actor. Best Regards, Raymond Liu -Original Message- From: Sophia [mailto:sln-1...@163.com] Hey you guys, What is the different in spark on yarn mode and standalone mode about resource schedule? Wish you happy everyday. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/different-in-spark-on-yarn-mode-and-standalone-mode-tp5300.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Reading from .bz2 files with Spark
Hi Andrew, I verified that this is due to thread safety. I changed SPARK_WORKER_CORES to 1 in spark-env.sh, so there is only 1 thread per worker. Then I can load the file without any problem with different values of minPartitions. I will submit a JIRA to both Spark and Hadoop. Best, Xiangrui On Thu, May 15, 2014 at 3:48 PM, Xiangrui Meng men...@gmail.com wrote: Hi Andrew, Could you try varying the minPartitions parameter? For example: val r = sc.textFile(/user/aa/myfile.bz2, 4).count val r = sc.textFile(/user/aa/myfile.bz2, 8).count Best, Xiangrui On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote: Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes the problem you described, but it does contain several fixes to bzip2 format. -Xiangrui On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote: Hi all, Is anyone reading and writing to .bz2 files stored in HDFS from Spark with success? I'm finding the following results on a recent commit (756c96 from 24hr ago) and CDH 4.4.0: Works: val r = sc.textFile(/user/aa/myfile.bz2).count Doesn't work: val r = sc.textFile(/user/aa/myfile.bz2).map((s:String) = s+| ).count Specifically, I'm getting an exception coming out of the bzip2 libraries (see below stacktraces), which is unusual because I'm able to read from that file without an issue using the same libraries via Pig. It was originally created from Pig as well. Digging a little deeper I found this line in the .bz2 decompressor's javadoc for CBZip2InputStream: Instances of this class are not threadsafe. [source] My current working theory is that Spark has a much higher level of parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds exceptions much more frequently (as in can't finish a run over a little 2M row file) vs hardly at all in other libraries. The only other reference I could find to the issue was in presto-users, but the recommendation to leave .bz2 for .lzo doesn't help if I actually do want the higher compression levels of .bz2. Would love to hear if I have some kind of configuration issue or if there's a bug in .bz2 that's fixed in later versions of CDH, or generally any other thoughts on the issue. Thanks! Andrew Below are examples of some exceptions I'm getting: 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException java.lang.ArrayIndexOutOfBoundsException: 65535 at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397) at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43) java.lang.ArrayIndexOutOfBoundsException: 90 at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397) at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181) at
Proper way to create standalone app with custom Spark version
(Sorry if you have already seen this message - it seems like there were some issues delivering messages to the list yesterday) We can create standalone Spark application by simply adding spark-core_2.x to build.sbt/pom.xml and connecting it to Spark master. We can also build custom version of Spark (e.g. compiled against Hadoop 2.x) from source and deploy it to cluster manually. But what is a proper way to use _custom version_ of Spark in _standalone application_? I'm currently trying to deploy custom version to local Maven repository and add it to SBT project. Another option is to add Spark as local jar to every project. But both of these ways look overcomplicated and in general wrong. So what is the implied way to do it? Thanks, Andrei
Re: Distribute jar dependencies via sc.AddJar(fileName)
Hi Xiangrui, We're still using Spark 0.9 branch, and our job is submitted by ./bin/spark-class org.apache.spark.deploy.yarn.Client \ --jar YOUR_APP_JAR_FILE \ --class APP_MAIN_CLASS \ --args APP_MAIN_ARGUMENTS \ --num-workers NUMBER_OF_WORKER_MACHINES \ --master-class ApplicationMaster_CLASS --master-memory MEMORY_FOR_MASTER \ --worker-memory MEMORY_PER_WORKER \ --addJars any_local_files_used_in_SparkContext.addJar Based on my understanding of the code in yarn-standalone mode, the jar distributing from local machine to application master is through distributed cache (using hadoop yarn-client api). From application master to executors, it's through http server. I maybe wrong, but if you look at the code in SparkContext addJar method, you can see the jar is added to http server in yarn-standalone mode. if (SparkHadoopUtil.get.isYarnMode() master == yarn-standalone) { // In order for this to work in yarn standalone mode the user must specify the // --addjars option to the client to upload the file into the distributed cache // of the AM to make it show up in the current working directory. val fileName = new Path(uri.getPath).getName() try { env.httpFileServer.addJar(new File(fileName)) } catch { Those jars will be fetched in Executor from http server and added to classloader of Executor class, see private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) { synchronized { // Fetch missing dependencies for ((name, timestamp) - newFiles if currentFiles.getOrElse(name, -1L) timestamp) { logInfo(Fetching + name + with timestamp + timestamp) Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf) currentFiles(name) = timestamp } for ((name, timestamp) - newJars if currentJars.getOrElse(name, -1L) timestamp) { logInfo(Fetching + name + with timestamp + timestamp) Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf) currentJars(name) = timestamp // Add it to our class loader val localName = name.split(/).last val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL if (!urlClassLoader.getURLs.contains(url)) { urlClassLoader.addURL(url) } } The problem seems to be that jars are added to the classloader of Executor classes, and they are not accessible in Task.scala. I verified this by trying to load our custom classes in Executor.scala, and it works. But if I tried to load those classes in Task.scala, I'll get classNotFound exception. Thanks. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng men...@gmail.com wrote: In SparkContext#addJar, for yarn-standalone mode, the workers should get the jars from local distributed cache instead of fetching them from the http server. Could you send the command you used to submit the job? -Xiangrui On Wed, May 14, 2014 at 1:26 AM, DB Tsai dbt...@stanford.edu wrote: Hi Xiangrui, I actually used `yarn-standalone`, sorry for misleading. I did debugging in the last couple days, and everything up to updateDependency in executor.scala works. I also checked the file size and md5sum in the executors, and they are the same as the one in driver. Gonna do more testing tomorrow. Thanks. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Tue, May 13, 2014 at 11:41 PM, Xiangrui Meng men...@gmail.com wrote: I don't know whether this would fix the problem. In v0.9, you need `yarn-standalone` instead of `yarn-cluster`. See https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08 On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng men...@gmail.com wrote: Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui On Mon, May 12, 2014 at 11:14 AM, DB Tsai dbt...@stanford.edu wrote: We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar dependencies in command line with --addJars option. However, those external jars are only available in the driver (application running in hadoop), and not available in the executors (workers). After doing some research, we realize that we've to push those jars to executors in driver via sc.AddJar(fileName). Although in the driver's log (see the following), the jar is successfully added in the http server in the driver, and I confirm that it's downloadable from any machine in the network, I still get `java.lang.NoClassDefFoundError`
Re: How to pass config variables to workers
I found that the easiest way was to pass variables in the Spark configuration object. The only catch is that all of your properties keys must being with spark. in order for Spark to propagate the values. So, for example, in the driver: SparkConf conf = new SparkConf(); conf.set(spark.myapp.myproperty, propertyValue); JavaSparkContext context = new JavaSparkContext(conf); I realize that this is most likely a hack, but it works and is easy (at least for me) to follow from a programming standpoint compared to setting environment variables outside of the program. Regards, Theodore Wong - -- Theodore Wong lt;t...@tmwong.orggt; www.tmwong.org -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-config-variables-to-workers-tp5780p5880.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Dead lock running multiple Spark jobs on Mesos
Andrew, thanks for your response. When using the coarse mode, the jobs run fine. My problem is the fine-grained mode. Here the parallel jobs nearly always end in a dead lock. It seems to have something to do with resource allocation, as Mesos shows neither used nor idle CPU resources in this state. I do not understand what this means. Any ideas how to analysis this problem are welcome. Martin Am 13.05.2014 08:48, schrieb Andrew Ash: Are you setting a core limit with spark.cores.max? If you don't, in coarse mode each Spark job uses all available cores on Mesos and doesn't let them go until the job is terminated. At which point the other job can access the cores. https://spark.apache.org/docs/latest/running-on-mesos.html -- Mesos Run Modes section The quick fix should be to set spark.cores.max to half of your cluster's cores to support running two jobs concurrently. Alternatively, switching to fine-grained mode would help here too at the expense of higher latency on startup. On Mon, May 12, 2014 at 12:37 PM, Martin Weindel martin.wein...@gmail.com mailto:martin.wein...@gmail.com wrote: I'm using a current Spark 1.0.0-SNAPSHOT for Hadoop 2.2.0 on Mesos 0.17.0. If I run a single Spark Job, the job runs fine on Mesos. Running multiple Spark Jobs also works, if I'm using the coarse-grained mode (spark.mesos.coarse = true). But if I run two Spark Jobs in parallel using the fine-grained mode, the jobs seem to block each other after a few seconds. And the Mesos UI reports no idle but also no used CPUs in this state. As soon as I kill one job, the other continues normally. See below for some log output. Looks to me as if something strange happens with the CPU resources. Can anybody give me a hint about the cause? The jobs read some HDFS files, but have no other communication to external processes. Or any other suggestions how to analyze this problem? Thanks, Martin - Here is the relevant log output of the driver of job1: INFO 17:53:09,247 Missing parents for Stage 2: List() INFO 17:53:09,250 Submitting Stage 2 (MapPartitionsRDD[9] at mapPartitions at HighTemperatureSpansPerLogfile.java:92), which is now runnable INFO 17:53:09,269 Submitting 1 missing tasks from Stage 2 (MapPartitionsRDD[9] at mapPartitions at HighTemperatureSpansPerLogfile.java:92) INFO 17:53:09,269 Adding task set 2.0 with 1 tasks *** at this point the job was killed *** log output of driver of job2: INFO 17:53:04,874 Missing parents for Stage 6: List() INFO 17:53:04,875 Submitting Stage 6 (MappedRDD[23] at values at ComputeLogFileTimespan.java:71), which is now runnable INFO 17:53:04,881 Submitting 1 missing tasks from Stage 6 (MappedRDD[23] at values at ComputeLogFileTimespan.java:71) INFO 17:53:04,882 Adding task set 6.0 with 1 tasks *** at this point the job 1 was killed *** INFO 18:01:39,307 Starting task 6.0:0 as TID 7 on executor 20140501-141732-308511242-5050-2657-1:myclusternode (PROCESS_LOCAL) INFO 18:01:39,307 Serialized task 6.0:0 as 3052 bytes in 0 ms INFO 18:01:39,328 Asked to send map output locations for shuffle 2 to spark@ mailto:sp...@ustst018-cep-node1.usu.usu.grp:40542myclusternode:40542 mailto:sp...@ustst018-cep-node1.usu.usu.grp:40542 INFO 18:01:39,328 Size of output statuses for shuffle 2 is 178 bytes
help me: Out of memory when spark streaming
hi, All I encountered OOM when streaming. I send data to spark streaming through Zeromq at a speed of 600 records per second, but the spark streaming only handle 10 records per 5 seconds( set it in streaming program) my two workers have 4 cores CPU and 1G RAM. These workers always occur Out Of Memory after moments. I tried to adjust JVM GC arguments to speed up GC process. Actually, it made a little bit change of performance, but workers finally occur OOM. Is there any way to resolve it? it would be appreciated if anyone can help me to get it fixed ! Thanks, Francis.Hu
Debugging Spark AWS S3
I have Spark code which runs beautifully when MASTER=local. When I run it with MASTER set to a spark ec2 cluster, the workers seem to run, but the results, which are supposed to be put to AWS S3, don't appear on S3. I'm at a loss for how to debug this. I don't see any S3 exceptions anywhere. Can anyone guide me on how I might debug this?
Re: Proper way to create standalone app with custom Spark version
Install your custom spark jar to your local maven or ivy repo. Use this custom jar in your pom/sbt file. On May 15, 2014, at 3:28 AM, Andrei faithlessfri...@gmail.com wrote: (Sorry if you have already seen this message - it seems like there were some issues delivering messages to the list yesterday) We can create standalone Spark application by simply adding spark-core_2.x to build.sbt/pom.xml and connecting it to Spark master. We can also build custom version of Spark (e.g. compiled against Hadoop 2.x) from source and deploy it to cluster manually. But what is a proper way to use _custom version_ of Spark in _standalone application_? I'm currently trying to deploy custom version to local Maven repository and add it to SBT project. Another option is to add Spark as local jar to every project. But both of these ways look overcomplicated and in general wrong. So what is the implied way to do it? Thanks, Andrei
Re: How to pass config variables to workers
Not a hack, this is documented here: http://spark.apache.org/docs/0.9.1/configuration.html, and is in fact the proper way of setting per-application Spark configurations. Additionally, you can specify default Spark configurations so you don't need to manually set it for all applications. If you are running Spark 0.9 or before, then you could set them through the environment variable SPARK_JAVA_OPTS in conf/spark-env.sh. As of Spark 1.0, however, this mechanism is deprecated. The new way of setting default Spark configurations is through conf/spark-defaults.conf in the following format spark.config.one value spark.config.two value2 More details are documented here: http://people.apache.org/~pwendell/spark-1.0.0-rc7-docs/configuration.html. 2014-05-16 15:16 GMT-07:00 Theodore Wong t...@tmwong.org: I found that the easiest way was to pass variables in the Spark configuration object. The only catch is that all of your properties keys must being with spark. in order for Spark to propagate the values. So, for example, in the driver: SparkConf conf = new SparkConf(); conf.set(spark.myapp.myproperty, propertyValue); JavaSparkContext context = new JavaSparkContext(conf); I realize that this is most likely a hack, but it works and is easy (at least for me) to follow from a programming standpoint compared to setting environment variables outside of the program. Regards, Theodore Wong - -- Theodore Wong lt;t...@tmwong.orggt; www.tmwong.org -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-config-variables-to-workers-tp5780p5880.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: java serialization errors with spark.files.userClassPathFirst=true
i do not think the current solution will work. i tried writing a version of ChildExecutorURLClassLoader that does have a proper parent and has a modified loadClass to reverse the order of parent and child in finding classes, and that seems to work, but now classes like SparkEnv are loaded by the child and somehow this means the companion objects are reset or something like that because i get NPEs. On Fri, May 16, 2014 at 3:54 PM, Koert Kuipers ko...@tresata.com wrote: ok i think the issue is visibility: a classloader can see all classes loaded by its parent classloader. but userClassLoader does not have a parent classloader, so its not able to see any classes that parentLoader is responsible for. in my case userClassLoader is trying to get AvroInputFormat which probably somewhere statically references FileInputFormat, which is invisible to userClassLoader. On Fri, May 16, 2014 at 3:32 PM, Koert Kuipers ko...@tresata.com wrote: ok i put lots of logging statements in the ChildExecutorURLClassLoader. this is what i see: * the urls for userClassLoader are correct and includes only my one jar. * for one class that only exists in my jar i see it gets loaded correctly using userClassLoader * for a class that exists in both my jar and spark kernel it tries to use userClassLoader and ends up with a NoClassDefFoundError. the class is org.apache.avro.mapred.AvroInputFormat and the NoClassDefFoundError is for org.apache.hadoop.mapred.FileInputFormat (which the parentClassLoader is responsible for since it is not in my jar). i currently catch this NoClassDefFoundError and call parentClassLoader.loadClass but thats clearly not a solution since it loads the wrong version. On Fri, May 16, 2014 at 2:25 PM, Koert Kuipers ko...@tresata.com wrote: well, i modified ChildExecutorURLClassLoader to also delegate to parentClassloader if NoClassDefFoundError is thrown... now i get yet another error. i am clearly missing something with these classloaders. such nasty stuff... giving up for now. just going to have to not use spark.files.userClassPathFirst=true for now, until i have more time to look at this. 14/05/16 13:58:59 ERROR Executor: Exception in task ID 3 java.lang.ClassCastException: cannot assign instance of scala.None$ to field org.apache.spark.rdd.RDD.checkpointData of type scala.Option in instance of MyRDD at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1995) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:60) On Fri, May 16, 2014 at 1:46 PM, Koert Kuipers ko...@tresata.comwrote: after removing all class paramater of class Path from my code, i tried again. different but related eror when i set spark.files.userClassPathFirst=true now i dont even use FileInputFormat directly. HadoopRDD does... 14/05/16 12:17:17 ERROR Executor: Exception in task ID 45 java.lang.NoClassDefFoundError:
Re: Debugging Spark AWS S3
Did you check the executor stderr logs? On 5/16/14, 2:37 PM, Robert James srobertja...@gmail.com wrote: I have Spark code which runs beautifully when MASTER=local. When I run it with MASTER set to a spark ec2 cluster, the workers seem to run, but the results, which are supposed to be put to AWS S3, don't appear on S3. I'm at a loss for how to debug this. I don't see any S3 exceptions anywhere. Can anyone guide me on how I might debug this?
Re: Is there any problem on the spark mailing list?
Same here. I've posted a bunch of questions in the last few days and they don't show up here and I'm also not getting email to my (gmail.com) account. I came here to post directly on the mailing list but saw this thread instead. At least, I'm not alone. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-any-problem-on-the-spark-mailing-list-tp5509p5526.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Historical Data as Stream
File is just a steam with a fixed length. Usually streams don't end but in this case it would. On the other hand if you real your file as a steam may not be able to use the entire data in the file for your analysis. Spark (give enough memory) can process large amounts of data quickly. On May 15, 2014, at 9:52 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, I have data in a file. Can I read it as Stream in spark? I know it seems odd to read file as stream but it has practical applications in real life if I can read it as stream. It there any other tools which can give this file as stream to Spark or I have to make batches manually which is not what I want. Its a coloumn of a million values. Regards, Laeeq
Re: Understanding epsilon in KMeans
Hi Stuti, I think you're right. The epsilon parameter is indeed used as a threshold for deciding when KMeans has converged. If you look at line 201 of mllib's KMeans.scala: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L201 you can see that if any center moves more than epsilon units away from its prior position, in an L2-norm sense, then the algorithm has NOT converged. `changed` is set to true, and the outer `while` loop repeats. Your intuition was correct; you can use .setEpsilon to control this threshold value for deciding if any center has moved far enough to be considered a non-converged iteration of the algorithm. Best, --Brian On Wed, May 14, 2014 at 8:35 PM, Stuti Awasthi stutiawas...@hcl.com wrote: Hi All, Any ideas on this ?? Thanks Stuti Awasthi *From:* Stuti Awasthi *Sent:* Wednesday, May 14, 2014 6:20 PM *To:* user@spark.apache.org *Subject:* Understanding epsilon in KMeans Hi All, I wanted to understand the functionality of epsilon in KMeans in Spark MLlib. As per documentation : distance threshold within which we've consider centers to have converged.If all centers move less than this *Euclidean* distance, we stop iterating one run. Now I have assumed that if centers are moving less than epsilon value then Clustering Stops but then what does it mean by “we stop iterating one run”.. Now suppose I have given maxIterations=10 and epsilon = 0.1 and assume that centers are afteronly 2 iteration, the epsilon condition is met i.e. now centers are moving only less than 0.1.. Now what happens ?? The whole 10 iterations are completed OR the Clustering stops ?? My 2nd query is in Mahout, there is a configuration param : “Convergence Threshold (cd)” which states : “in an iteration, the centroids don’t move more than this distance, no further iterations are done and clustering stops.” So is epsilon and cd similar ?? 3rd query : How to pass epsilon as a configurable param. KMeans.train() does not provide the way but in code I can see “setEpsilon” as method. SO if I want to pass the parameter as epsilon=0.1 , how may I do that.. Pardon my ignorance Thanks Stuti Awasthi ::DISCLAIMER:: The contents of this e-mail and any attachment(s) are confidential and intended for the named recipient(s) only. E-mail transmission is not guaranteed to be secure or error-free as information could be intercepted, corrupted, lost, destroyed, arrive late or incomplete, or may contain viruses in transmission. The e mail and its contents (with or without referred errors) shall therefore not attach any liability on the originator or HCL or its affiliates. Views or opinions, if any, presented in this email are solely those of the author and may not necessarily reflect the views or opinions of HCL or its affiliates. Any form of reproduction, dissemination, copying, disclosure, modification, distribution and / or publication of this message without the prior written consent of authorized representative of HCL is strictly prohibited. If you have received this email in error please delete it and notify the sender immediately. Before opening any email and/or attachments, please check them for viruses and other defects.
Re: SparkContext startup time out
How did you deal with this problem finally?I also met with it. Best regards, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-startup-time-out-tp1753p5739.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Reading from .bz2 files with Spark
Hi Andrew, This is the JIRA I created: https://issues.apache.org/jira/browse/MAPREDUCE-5893 . Hopefully someone wants to work on it. Best, Xiangrui On Fri, May 16, 2014 at 6:47 PM, Xiangrui Meng men...@gmail.com wrote: Hi Andre, I could reproduce the bug with Hadoop 2.2.0. Some older version of Hadoop do not support splittable compression, so you ended up with sequential reads. It is easy to reproduce the bug with the following setup: 1) Workers are configured with multiple cores. 2) BZip2 files are big enough or minPartitions is large enough when you load the file via sc.textFile(), so that one worker has more than one tasks. Best, Xiangrui On Fri, May 16, 2014 at 4:06 PM, Andrew Ash and...@andrewash.com wrote: Hi Xiangrui, // FYI I'm getting your emails late due to the Apache mailing list outage I'm using CDH4.4.0, which I think uses the MapReduce v2 API. The .jars are named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar I'm also glad you were able to reproduce! Please paste a link to the Hadoop bug you file so I can follow along. Thanks! Andrew On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote: Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes the problem you described, but it does contain several fixes to bzip2 format. -Xiangrui On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote: Hi all, Is anyone reading and writing to .bz2 files stored in HDFS from Spark with success? I'm finding the following results on a recent commit (756c96 from 24hr ago) and CDH 4.4.0: Works: val r = sc.textFile(/user/aa/myfile.bz2).count Doesn't work: val r = sc.textFile(/user/aa/myfile.bz2).map((s:String) = s+| ).count Specifically, I'm getting an exception coming out of the bzip2 libraries (see below stacktraces), which is unusual because I'm able to read from that file without an issue using the same libraries via Pig. It was originally created from Pig as well. Digging a little deeper I found this line in the .bz2 decompressor's javadoc for CBZip2InputStream: Instances of this class are not threadsafe. [source] My current working theory is that Spark has a much higher level of parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds exceptions much more frequently (as in can't finish a run over a little 2M row file) vs hardly at all in other libraries. The only other reference I could find to the issue was in presto-users, but the recommendation to leave .bz2 for .lzo doesn't help if I actually do want the higher compression levels of .bz2. Would love to hear if I have some kind of configuration issue or if there's a bug in .bz2 that's fixed in later versions of CDH, or generally any other thoughts on the issue. Thanks! Andrew Below are examples of some exceptions I'm getting: 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException java.lang.ArrayIndexOutOfBoundsException: 65535 at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397) at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43) java.lang.ArrayIndexOutOfBoundsException: 90 at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397) at
Re: Schema view of HadoopRDD
so you can use a input output format read it whichever way you write... You can additionally provide variables in hadoop configuration to configure. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, May 8, 2014 at 10:39 AM, Debasish Das debasish.da...@gmail.comwrote: Hi, For each line that we read as textLine from HDFS, we have a schema..if there is an API that takes the schema as List[Symbol] and maps each token to the Symbol it will be helpful... Does RDDs provide a schema view of the dataset on HDFS ? Thanks. Deb
Re: Reading from .bz2 files with Spark
Hi Andre, I could reproduce the bug with Hadoop 2.2.0. Some older version of Hadoop do not support splittable compression, so you ended up with sequential reads. It is easy to reproduce the bug with the following setup: 1) Workers are configured with multiple cores. 2) BZip2 files are big enough or minPartitions is large enough when you load the file via sc.textFile(), so that one worker has more than one tasks. Best, Xiangrui On Fri, May 16, 2014 at 4:06 PM, Andrew Ash and...@andrewash.com wrote: Hi Xiangrui, // FYI I'm getting your emails late due to the Apache mailing list outage I'm using CDH4.4.0, which I think uses the MapReduce v2 API. The .jars are named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar I'm also glad you were able to reproduce! Please paste a link to the Hadoop bug you file so I can follow along. Thanks! Andrew On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote: Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes the problem you described, but it does contain several fixes to bzip2 format. -Xiangrui On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote: Hi all, Is anyone reading and writing to .bz2 files stored in HDFS from Spark with success? I'm finding the following results on a recent commit (756c96 from 24hr ago) and CDH 4.4.0: Works: val r = sc.textFile(/user/aa/myfile.bz2).count Doesn't work: val r = sc.textFile(/user/aa/myfile.bz2).map((s:String) = s+| ).count Specifically, I'm getting an exception coming out of the bzip2 libraries (see below stacktraces), which is unusual because I'm able to read from that file without an issue using the same libraries via Pig. It was originally created from Pig as well. Digging a little deeper I found this line in the .bz2 decompressor's javadoc for CBZip2InputStream: Instances of this class are not threadsafe. [source] My current working theory is that Spark has a much higher level of parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds exceptions much more frequently (as in can't finish a run over a little 2M row file) vs hardly at all in other libraries. The only other reference I could find to the issue was in presto-users, but the recommendation to leave .bz2 for .lzo doesn't help if I actually do want the higher compression levels of .bz2. Would love to hear if I have some kind of configuration issue or if there's a bug in .bz2 that's fixed in later versions of CDH, or generally any other thoughts on the issue. Thanks! Andrew Below are examples of some exceptions I'm getting: 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException java.lang.ArrayIndexOutOfBoundsException: 65535 at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397) at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43) java.lang.ArrayIndexOutOfBoundsException: 90 at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397) at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
Re: Express VMs - good idea?
Frankly if you can give enough CPU performance to VM it should be good... but for development setting up locally is better 1. debuggable in IDE 2. Faster 3. samples like run-example etc Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, May 14, 2014 at 5:30 PM, Marco Shaw marco.s...@gmail.com wrote: Hi, I've wanted to play with Spark. I wanted to fast track things and just use one of the vendor's express VMs. I've tried Cloudera CDH 5.0 and Hortonworks HDP 2.1. I've not written down all of my issues, but for certain, when I try to run spark-shell it doesn't work. Cloudera seems to crash, and both complain when I try to use SparkContext in a simple Scala command. So, just a basic question on whether anyone has had success getting these express VMs to work properly with Spark *out of the box* (HDP does required you install Spark manually). I know Cloudera recommends 8GB of RAM, but I've been running it with 4GB. Could it be that 4GB is just not enough, and causing issues or have others had success using these Hadoop 2.x pre-built VMs with Spark 0.9.x? Marco
Re: What is the difference between a Spark Worker and a Spark Slave?
They are different terminology for the same thing and should be interchangeable. On Fri, May 16, 2014 at 2:02 PM, Robert James srobertja...@gmail.comwrote: What is the difference between a Spark Worker and a Spark Slave?
Re: How to pass config variables to workers
Sorry, yes, you are right, the documentation does indeed explain that setting spark.* options is the way to pass Spark configuration options to workers. Additionally, we've use the same mechanism to pass application-specific configuration options to workers; the hack part is naming our application-specific options spark.myapp.*, which relies on the Spark library just copying around spark.* options without checking to see whether the option names are valid Spark options. Regards, Theodore - -- Theodore Wong lt;t...@tmwong.orggt; www.tmwong.org -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-config-variables-to-workers-tp5780p5916.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Historical Data as Stream
Hi, I have data in a file. Can I read it as Stream in spark? I know it seems odd to read file as stream but it has practical applications in real life if I can read it as stream. It there any other tools which can give this file as stream to Spark or I have to make batches manually which is not what I want. Its a coloumn of a million values. Regards, Laeeq
Re: run spark0.9.1 on yarn with hadoop CDH4
Hi Sophia, Unfortunately, Spark doesn't work against YARN in CDH4. The YARN APIs changed quite a bit before finally being stabilized in Hadoop 2.2 and CDH5. Spark on YARN supports Hadoop 0.23.* and Hadoop 2.2+ / CDH5.0+, but does not support CDH4, which is somewhere in between. -Sandy On Fri, May 9, 2014 at 12:13 AM, Arpit Tak arpi...@sigmoidanalytics.comwrote: Also try this out , we have already done this .. It will help you.. http://docs.sigmoidanalytics.com/index.php/Setup_hadoop_2.0.0-cdh4.2.0_and_spark_0.9.0_on_ubuntu_12.04 On Tue, May 6, 2014 at 10:17 PM, Andrew Lee alee...@hotmail.com wrote: Please check JAVA_HOME. Usually it should point to /usr/java/default on CentOS/Linux. or FYI: http://stackoverflow.com/questions/1117398/java-home-directory Date: Tue, 6 May 2014 00:23:02 -0700 From: sln-1...@163.com To: u...@spark.incubator.apache.org Subject: run spark0.9.1 on yarn with hadoop CDH4 Hi all, I have make HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the hadoop cluster. The command to launch the YARN Client which I run is like this: # SPARK_JAR=./~/spark-0.9.1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar ./bin/spark-class org.apache.spark.deploy.yarn.Client\--jar examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.1.jar\--class org.apache.spark.examples.SparkPi\--args yarn-standalone \--num-workers 3 \--master-memory 2g \--worker-memory 2g \--worker-cores 1 ./bin/spark-class: line 152: /usr/lib/jvm/java-7-sun/bin/java: No such file or directory ./bin/spark-class: line 152: exec: /usr/lib/jvm/java-7-sun/bin/java: cannot execute: No such file or directory How to make it runs well? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/run-spark0-9-1-on-yarn-with-hadoop-CDH4-tp5426.html Sent from the Apache Spark User List mailing list archive at Nabble.com.