Re: Spark-Ec2 launch failed on starting httpd spark 141
Looks like it is this PR: https://github.com/mesos/spark-ec2/pull/133

On Tue, Aug 25, 2015 at 9:52 AM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: Yeah, that's a known issue and we have a PR out to fix it. Shivaram

On Tue, Aug 25, 2015 at 7:39 AM, Garry Chen g...@cornell.edu wrote: Hi All, I am trying to launch a spark cluster on ec2 with spark 1.4.1 version. The script finished but I am getting an error at the end, as follows. What should I do to correct this issue? Thank you very much for your input.

Starting httpd: httpd: Syntax error on line 199 of /etc/httpd/conf/httpd.conf: Cannot load modules/libphp-5.5.so into server: /etc/httpd/modules/libphp-5.5.so: cannot open shared object file: No such file or directory

Garry
Re: Adding/subtracting org.apache.spark.mllib.linalg.Vector in Scala?
However I do think it's easier than it seems to write the implicits; it doesn't involve new classes or anything. Yes it's pretty much just what you wrote. There is a class Vector in Spark. This declaration can be in an object; you don't implement your own class. (Also you can use toBreeze to get Breeze vectors.)

The implicit conversion with the implicit def happens for the first vector in the sum, but not the second vector (see below). At this point I give up, because I spent way too much time. I am so disappointed. So many times I heard Spark makes simple things easy and complicated things possible. Well, here is the simplest thing you can imagine in linear algebra, but heck, it is not easy or intuitive. It was easier to run a DeepLearning algo (from another library) than add two vectors. If anybody has a workaround other than implementing your own add/subtract/scalarMultiply, PLEASE let me know.

Here is the code and error from a (freshly started) spark-shell:

scala> import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
import breeze.linalg.{DenseVector=>BDV, SparseVector=>BSV, Vector=>BV}

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> val v1 = Vectors.dense(1.0, 2.0, 3.0)
v1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0]

scala> import org.apache.spark.mllib.linalg.{Vector => SparkVector}
import org.apache.spark.mllib.linalg.{Vector=>SparkVector}

scala> object MyUtils {
     |   implicit def toBreeze(v: SparkVector) = BV(v.toArray)
     | }
warning: there were 1 feature warning(s); re-run with -feature for details
defined module MyUtils

scala> import MyUtils._
import MyUtils._

scala> v1: BV[Double]
res2: breeze.linalg.Vector[Double] = DenseVector(1.0, 2.0, 3.0)

scala> v1 + v1
<console>:30: error: could not find implicit value for parameter op: breeze.linalg.operators.OpAdd.Impl2[breeze.linalg.Vector[Double],org.apache.spark.mllib.linalg.Vector,That]
              v1 + v1
                 ^
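For reference, one hedged way to make this work end to end: instead of relying on an implicit conversion of only one operand (which is why OpAdd cannot be resolved above), enrich the MLlib vector itself with the operators. This is only a minimal sketch, assuming the Breeze version bundled with Spark 1.4.x; it goes through a dense copy, so sparse vectors get densified.

import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.mllib.linalg.{Vector => SparkVector, Vectors}

object VectorOps {
  private def toBreeze(v: SparkVector): BDV[Double] = new BDV(v.toArray)
  private def fromBreeze(bv: BDV[Double]): SparkVector = Vectors.dense(bv.toArray)

  // Enrich SparkVector with +, - and scalar * so both operands can stay MLlib vectors.
  implicit class RichSparkVector(v: SparkVector) {
    def +(other: SparkVector): SparkVector = fromBreeze(toBreeze(v) + toBreeze(other))
    def -(other: SparkVector): SparkVector = fromBreeze(toBreeze(v) - toBreeze(other))
    def *(a: Double): SparkVector = fromBreeze(toBreeze(v) * a)
  }
}

// usage in spark-shell:
//   import VectorOps._
//   val v1 = Vectors.dense(1.0, 2.0, 3.0)
//   v1 + v1      // [2.0,4.0,6.0]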
Re: Exclude slf4j-log4j12 from the classpath via spark-submit
This worked for me locally:

spark-1.4.1-bin-hadoop2.4/bin/spark-submit --conf spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar:/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar --conf spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar:/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar --verbose --class runner.SparkRunner target/simspark-0.1-SNAPSHOT.jar

Now I am going to try it out on our mesos cluster. I assumed spark.executor.extraClassPath takes a comma-separated list of jars the way --jars does, but it should be ":"-separated like a regular classpath. Thanks for your help! -Utkarsh

On Mon, Aug 24, 2015 at 5:05 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote: I get the same error even when I set the SPARK_CLASSPATH:

export SPARK_CLASSPATH=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.1.jar:/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar

And I run the job like this: /spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner target/simspark-0.1-SNAPSHOT-jar-with-dependencies.jar

I am not able to find the code in spark which adds these jars before the spark classes in the classpath. Or maybe it's a bug. Any suggestions on workarounds? Thanks, -Utkarsh

On Mon, Aug 24, 2015 at 4:32 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote: I assumed that's the case because of the error I got and the documentation which says: "Extra classpath entries to append to the classpath of the driver." This is where I stand now:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.4.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>

And no exclusions from my logging lib. And I submit this task:

spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner --conf spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar --conf spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar --conf spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar --conf spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar target/simspark-0.1-SNAPSHOT-jar-with-dependencies.jar

And I get the same error:

Caused by: java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory cannot be cast to ch.qos.logback.classic.LoggerContext
	at com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68)
	at com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28)
	at com.opentable.logging.Log.<clinit>(Log.java:31)
	... 16 more

Thanks, -Utkarsh

On Mon, Aug 24, 2015 at 4:11 PM, Marcelo Vanzin van...@cloudera.com wrote: On Mon, Aug 24, 2015 at 3:58 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote: That didn't work since the extraClassPath flag was still appending the jars at the end, so it's still picking the slf4j jar provided by spark. Out of curiosity, how did you verify this? The extraClassPath options are supposed to prepend entries to the classpath, and the code seems to be doing that. If it's not really doing that in some case, it's a bug that needs to be fixed. 
Another option is those is setting the SPARK_CLASSPATH env variable, which is deprecated, but might come in handy in case there is actually a bug in handling those options. -- Marcelo -- Thanks, -Utkarsh -- Thanks, -Utkarsh -- Thanks, -Utkarsh
Re: Exclude slf4j-log4j12 from the classpath via spark-submit
On Tue, Aug 25, 2015 at 10:48 AM, Utkarsh Sengar utkarsh2...@gmail.com wrote: Now I am going to try it out on our mesos cluster. I assumed spark.executor.extraClassPath takes csv as jars the way --jars takes it but it should be : separated like a regular classpath jar. Ah, yes, those options are just raw classpath strings. Also, they don't cause jars to be copied to the cluster. You'll need the jar to be available at the same location on all cluster machines. -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SparkR: exported functions
Hi, I've just started playing about with SparkR (Spark 1.4.1), and noticed that a number of the functions haven't been exported. For example, the textFile function https://github.com/apache/spark/blob/master/R/pkg/R/context.R isn't exported, i.e. the function isn't in the NAMESPACE file. This is obviously due to the '@export' tag missing in the roxygen2 directives. Is this intentional? Thanks, Colin
Re: Local Spark talking to remote HDFS?
I wouldn't try to play with forwarding and tunnelling; it's always hard to work out what ports get used everywhere, and the services like hostname==URL in paths. Can't you just set up an entry in the windows /etc/hosts file? It's what I do (on Unix) to talk to VMs.

On 25 Aug 2015, at 04:49, Dino Fancellu d...@felstar.com wrote: Tried adding 50010, 50020 and 50090. Still no difference. I can't imagine I'm the only person on the planet wanting to do this. Anyway, thanks for trying to help. Dino.

On 25 August 2015 at 08:22, Roberto Congiu roberto.con...@gmail.com wrote: Port 8020 is not the only port you need tunnelled for HDFS to work. If you only list the contents of a directory, port 8020 is enough... for instance, using something like

val p = new org.apache.hadoop.fs.Path("hdfs://localhost:8020/")
val fs = p.getFileSystem(sc.hadoopConfiguration)
fs.listStatus(p)

you should see the file list. But when accessing a file you need to actually get its blocks, so the client has to connect to the data node. The error 'could not obtain block' means it can't get that block from the DataNode. Refer to http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-1.2.1/bk_reference/content/reference_chap2_1.html to see the complete list of ports that also need to be tunnelled.

2015-08-24 13:10 GMT-07:00 Dino Fancellu d...@felstar.com: Changing the ip to the guest IP address just never connects. The VM has port tunnelling, and it passes through all the main ports, 8020 included, to the host VM. You can tell that it was talking to the guest VM before, simply because of what it said when a file was not found. The error is:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-452094660-10.0.2.15-1437494483194:blk_1073742905_2098 file=/tmp/people.txt

but I have no idea what it means by that. It certainly can find the file and knows it exists.

On 24 August 2015 at 20:43, Roberto Congiu roberto.con...@gmail.com wrote: When you launch your HDP guest VM, most likely it gets launched with NAT and an address on a private network (192.168.x.x), so on your windows host you should use that address (you can find it out using ifconfig on the guest OS). I usually add an entry to my /etc/hosts for VMs that I use often. If you use vagrant, there's also a vagrant module that can do that automatically. Also, I am not sure how the default HDP VM is set up, that is, if it only binds HDFS to 127.0.0.1 or to all addresses. You can check that with netstat -a. R.

2015-08-24 11:46 GMT-07:00 Dino Fancellu d...@felstar.com: I have a file in HDFS inside my HortonWorks HDP 2.3_1 VirtualBox VM. If I go into the guest spark-shell and refer to the file thus, it works fine:

val words = sc.textFile("hdfs:///tmp/people.txt")
words.count

However if I try to access it from a local Spark app on my Windows host, it doesn't work:

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val words = sc.textFile("hdfs://localhost:8020/tmp/people.txt")
words.count

This emits the BlockMissingException quoted earlier in this thread. The port 8020 is open, and if I choose the wrong file name, it will tell me so. My pom has:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>1.4.1</version>
  <scope>provided</scope>
</dependency>

Am I doing something wrong? Thanks.
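A hedged illustration of the hosts-file suggestion above (hostname, IP and path are only examples): add the VM's hostname to the Windows hosts file, point the URI at that hostname instead of localhost, and ask the HDFS client to reach datanodes by hostname rather than by the private IP the namenode reports. dfs.client.use.datanode.hostname is a standard HDFS client setting; whether it is needed depends on how the VM's ports are exposed.

import org.apache.spark.{SparkConf, SparkContext}

// assuming an entry like "192.168.56.101  sandbox.hortonworks.com" in the hosts file
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)

// connect to datanodes by hostname so the hosts-file entry is used for block reads too
sc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")

val words = sc.textFile("hdfs://sandbox.hortonworks.com:8020/tmp/people.txt")
println(words.count())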
Re: Adding/subtracting org.apache.spark.mllib.linalg.Vector in Scala?
What about declaring a few simple implicit conversions between the MLlib and Breeze Vector classes? if you import them then you should be able to write a lot of the source code just as you imagine it, as if the Breeze methods were available on the Vector object in MLlib. The problem is that *I don't know how* to write those implicit defs in Scala in a good way, and that's why I'm asking the user list for a better solution. (see below for another hack) My understanding is that I can define a new class that would extend Vector and have the implicit def conversion (as in the Scala manual, see below). Since I got burned by memory issues when using my own classes in this very way (what's the overhead of creating a new class every time I want to add two Vectors? I don't know - I'm a lowly data scientist), I'm scared to do it by myself. Since you might have many Spark users with my background (some programming, but not expert) - making everyone implement their own addVector function might cause many hours of frustration that might be so much better spent on coding. Adding +,- and scalar * can be done by a Spark contributor in under one hour (under what I spent just writing these emails), while it would take me a day (and multiply this by so many users like me), compounded by uncertainty of how to proceed - do I use ml instead of mllib because columns of a dataframe can be added while mllib can't? do I use breeze? do i use apache.commons? do I write my own (how long will it take me)? do I abandon Scala and go with pyspark because I don't have such problems in numpy? The slippery slope exists, but if you implement p-norm of a vector and sqdist between two vectors, you should also implement simpler operations too. There is a clear difference between functionality for adding two vectors and taking a determinant, for example. If I remember correctly, +,-,*,/ were implemented in a previous version of Spark in a now deprecated class, now expunged from the documentation. Many thanks, Kristina PS: is this what you meant by adding simple implicit def? should it be a class or object? These are kinds of questions I grapple with and why I'm asking for example of a solution // this is really a pseudo-code, I know BreezeVector and SparkVector are not real class names class MyVector extends SparkVector { implicit def toBreeze(v:SparkVector):BreezeVector = BreezeVector(v.toArray) implicit def fromBreeze( bv:BreezeVector ):SparkVector = Vectors.dense( bv.toArray ) } On Tue, Aug 25, 2015 at 11:11 AM, Sean Owen so...@cloudera.com wrote: Yes, you're right that it's quite on purpose to leave this API to Breeze, in the main. As you can see the Spark objects have already sprouted a few basic operations anyway; there's a slippery slope problem here. Why not addition, why not dot products, why not determinants, etc. What about declaring a few simple implicit conversions between the MLlib and Breeze Vector classes? if you import them then you should be able to write a lot of the source code just as you imagine it, as if the Breeze methods were available on the Vector object in MLlib. On Tue, Aug 25, 2015 at 3:35 PM, Kristina Rogale Plazonic kpl...@gmail.com wrote: Well, yes, the hack below works (that's all I have time for), but is not satisfactory - it is not safe, and is verbose and very cumbersome to use, does not separately deal with SparseVector case and is not complete either. My question is, out of hundreds of users on this list, someone must have come up with a better solution - please? 
import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.{Vector => SparkVector}

def toBreeze(v: SparkVector) = BV(v.toArray)
def fromBreeze(bv: BV[Double]) = Vectors.dense(bv.toArray)

def add(v1: SparkVector, v2: SparkVector) = fromBreeze(toBreeze(v1) + toBreeze(v2))
def subtract(v1: SparkVector, v2: SparkVector) = fromBreeze(toBreeze(v1) - toBreeze(v2))
def scalarMultiply(a: Double, v: SparkVector) = fromBreeze(a * toBreeze(v))

On Tue, Aug 25, 2015 at 9:41 AM, Sonal Goyal sonalgoy...@gmail.com wrote: From what I have understood, you probably need to convert your vector to breeze and do your operations there. Check stackoverflow.com/questions/28232829/addition-of-two-rddmllib-linalg-vectors On Aug 25, 2015 7:06 PM, Kristina Rogale Plazonic kpl...@gmail.com wrote: Hi all, I'm still not clear what is the best (or, ANY) way to add/subtract two org.apache.spark.mllib.Vector objects in Scala. Ok, I understand there was a conscious Spark decision not to support linear algebra operations in Scala and leave it to the user to choose a linear algebra library. But, for any newcomer from R or Python, where you don't think twice about adding two vectors, it is such a productivity shot in the foot to have to write your own + operation. I
CHAID Decision Trees
Hi, I wish to know if MLlib supports CHAID regression and classification trees. If yes, how can I build them in spark? Thanks, Jatin
Re: build spark 1.4.1 with JDK 1.6
Well, this is very strange. My only change is to add -X to make-distribution and it succeeds: % git diff (spark/spark) *diff --git a/make-distribution.sh b/make-distribution.sh* *index a2b0c43..351fac2 100755* *--- a/make-distribution.sh* *+++ b/make-distribution.sh* @@ -183,7 +183,7 @@ export MAVEN_OPTS=-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m # Store the command as an array because $MVN variable might have spaces in it. # Normal quoting tricks don't work. # See: http://mywiki.wooledge.org/BashFAQ/050 -BUILD_COMMAND=($MVN clean package -DskipTests $@) +BUILD_COMMAND=($MVN -X clean package -DskipTests $@) # Actually build the jar echo -e \nBuilding with... export JAVA_HOME=/Library/Java/Home % which javac /usr/bin/javac % javac -version javac 1.7.0_79 On Mon, Aug 24, 2015 at 11:30 PM, Sean Owen so...@cloudera.com wrote: -cdh-user This suggests that Maven is still using Java 6. I think this is indeed controlled by JAVA_HOME. Use 'mvn -X ...' to see a lot more about what is being used and why. I still suspect JAVA_HOME is not visible to the Maven process. Or maybe you have JRE 7 installed but not JDK 7 and it's somehow still finding the Java 6 javac. On Tue, Aug 25, 2015 at 3:45 AM, Eric Friedman eric.d.fried...@gmail.com wrote: I'm trying to build Spark 1.4 with Java 7 and despite having that as my JAVA_HOME, I get [INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) @ spark-launcher_2.10 --- [INFO] Using zinc server for incremental compilation [info] Compiling 8 Java sources to /Users/eric/spark/spark/launcher/target/scala-2.10/classes... [error] javac: invalid source release: 1.7 [error] Usage: javac options source files [error] use -help for a list of possible options [error] Compile failed at Aug 24, 2015 7:44:40 PM [0.020s] [INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM ... SUCCESS [ 3.109 s] [INFO] Spark Project Launcher . FAILURE [ 4.493 s] On Fri, Aug 21, 2015 at 9:43 AM, Marcelo Vanzin van...@cloudera.com wrote: That was only true until Spark 1.3. Spark 1.4 can be built with JDK7 and pyspark will still work. On Fri, Aug 21, 2015 at 8:29 AM, Chen Song chen.song...@gmail.com wrote: Thanks Sean. So how PySpark is supported. I thought PySpark needs jdk 1.6. Chen On Fri, Aug 21, 2015 at 11:16 AM, Sean Owen so...@cloudera.com wrote: Spark 1.4 requires Java 7. On Fri, Aug 21, 2015, 3:12 PM Chen Song chen.song...@gmail.com wrote: I tried to build Spark 1.4.1 on cdh 5.4.0. Because we need to support PySpark, I used JDK 1.6. I got the following error, [INFO] --- scala-maven-plugin:3.2.0:testCompile (scala-test-compile-first) @ spark-streaming_2.10 --- java.lang.UnsupportedClassVersionError: org/apache/hadoop/io/LongWritable : Unsupported major.minor version 51.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637) at java.lang.ClassLoader.defineClass(ClassLoader.java:621) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141) I know that is due to the hadoop jar for cdh5.4.0 is built with JDK 7. Anyone has done this before? Thanks, -- Chen Song -- Chen Song -- --- You received this message because you are subscribed to the Google Groups CDH Users group. To unsubscribe from this group and stop receiving emails from it, send an email to cdh-user+unsubscr...@cloudera.org. For more options, visit https://groups.google.com/a/cloudera.org/d/optout. 
-- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [survey] [spark-ec2] What do you like/dislike about spark-ec2?
Final chance to fill out the survey! http://goo.gl/forms/erct2s6KRR I'm gonna close it to new responses tonight and send out a summary of the results. Nick On Thu, Aug 20, 2015 at 2:08 PM Nicholas Chammas nicholas.cham...@gmail.com wrote: I'm planning to close the survey to further responses early next week. If you haven't chimed in yet, the link to the survey is here: http://goo.gl/forms/erct2s6KRR We already have some great responses, which you can view. I'll share a summary after the survey is closed. Cheers! Nick On Mon, Aug 17, 2015 at 11:09 AM Nicholas Chammas nicholas.cham...@gmail.com wrote: Howdy folks! I’m interested in hearing about what people think of spark-ec2 http://spark.apache.org/docs/latest/ec2-scripts.html outside of the formal JIRA process. Your answers will all be anonymous and public. If the embedded form below doesn’t work for you, you can use this link to get the same survey: http://goo.gl/forms/erct2s6KRR Cheers! Nick
Re: Adding/subtracting org.apache.spark.mllib.linalg.Vector in Scala?
Hmm. I have a lot of code on the local linear algebra operations using Spark's Matrix and Vector representations done for https://issues.apache.org/jira/browse/SPARK-6442. I can make a Spark package with that code if people are interested. Best, Burak On Tue, Aug 25, 2015 at 10:54 AM, Kristina Rogale Plazonic kpl...@gmail.com wrote: However I do think it's easier than it seems to write the implicits; it doesn't involve new classes or anything. Yes it's pretty much just what you wrote. There is a class Vector in Spark. This declaration can be in an object; you don't implement your own class. (Also you can use toBreeze to get Breeze vectors.) The implicit conversion with the implicit def happens for the first vector in the sum, but not the second vector (see below). At this point I give up, because I spent way too much time. I am so disappointed. So many times I heard Spark makes simple things easy and complicated things possible. Well, here is the simplest thing you can imagine in linear algebra, but heck, it is not easy or intuitive. It was easier to run a DeepLearning algo (from another library) than add two vectors. If anybody has a workaround other than implementing your own add/substract/scalarMultiply, PLEASE let me know. Here is the code and error from (freshly started) spark-shell: scala import breeze.linalg.{DenseVector = BDV, SparseVector = BSV, Vector = BV} import breeze.linalg.{DenseVector=BDV, SparseVector=BSV, Vector=BV} scala import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.Vectors scala val v1 = Vectors.dense(1.0, 2.0, 3.0) v1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0] scala import org.apache.spark.mllib.linalg.{Vector =SparkVector} import org.apache.spark.mllib.linalg.{Vector=SparkVector} scala object MyUtils { | implicit def toBreeze(v:SparkVector) = BV(v.toArray) | } warning: there were 1 feature warning(s); re-run with -feature for details defined module MyUtils scala import MyUtils._ import MyUtils._ scala v1:BV[Double] res2: breeze.linalg.Vector[Double] = DenseVector(1.0, 2.0, 3.0) scala v1 + v1 console:30: error: could not find implicit value for parameter op: breeze.linalg.operators.OpAdd.Impl2[breeze.linalg.Vector[Double],org.apache.spark.mllib.linalg.Vector,That] v1 + v1 ^
Spark Streaming Checkpointing Restarts with 0 Event Batches
Hello, I'm using direct spark streaming (from kafka) with checkpointing, and everything works well until a restart. When I shut down (^C) the first streaming job, wait 1 minute, then re-submit, there is somehow a series of 0 event batches that get queued (corresponding to the 1 minute when the job was down). Eventually, the batches would resume processing, and I would see that each batch has roughly 2000 events. I see that at the beginning of the second launch, the checkpoint dirs are found and loaded, according to console output. Is this expected behavior? It seems like I might've configured something incorrectly, since I would expect with checkpointing that the streaming job would resume from checkpoint and continue processing from there (without seeing 0 event batches corresponding to when the job was down). Also, if I were to wait 10 minutes or so before re-launching, there would be so many 0 event batches that the job would hang. Is this merely something to be waited out, or should I set up some restart behavior/make a config change to discard checkpointing if the elapsed time has been too long? Thanks! http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
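For reference, a minimal sketch of the usual recovery pattern with the direct Kafka stream (broker, topic, paths and batch interval below are illustrative, not taken from the original post). The key point is that the whole streaming graph is built inside the creating function passed to StreamingContext.getOrCreate, so a restart reloads offsets and pending batches from the checkpoint instead of re-defining the job.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val checkpointDir = "hdfs:///tmp/checkpoints/my-streaming-job"   // hypothetical path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("direct-kafka-example")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)

  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // hypothetical broker
  val topics = Set("events")                                      // hypothetical topic
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics)

  stream.foreachRDD { rdd => println(s"batch size: ${rdd.count()}") }
  ssc
}

// On restart this reloads the saved graph and offsets, so processing resumes
// where it left off (possibly as a few large catch-up batches).
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()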
Re: Adding/subtracting org.apache.spark.mllib.linalg.Vector in Scala?
YES PLEASE! :))) On Tue, Aug 25, 2015 at 1:57 PM, Burak Yavuz brk...@gmail.com wrote: Hmm. I have a lot of code on the local linear algebra operations using Spark's Matrix and Vector representations done for https://issues.apache.org/jira/browse/SPARK-6442. I can make a Spark package with that code if people are interested. Best, Burak On Tue, Aug 25, 2015 at 10:54 AM, Kristina Rogale Plazonic kpl...@gmail.com wrote: However I do think it's easier than it seems to write the implicits; it doesn't involve new classes or anything. Yes it's pretty much just what you wrote. There is a class Vector in Spark. This declaration can be in an object; you don't implement your own class. (Also you can use toBreeze to get Breeze vectors.) The implicit conversion with the implicit def happens for the first vector in the sum, but not the second vector (see below). At this point I give up, because I spent way too much time. I am so disappointed. So many times I heard Spark makes simple things easy and complicated things possible. Well, here is the simplest thing you can imagine in linear algebra, but heck, it is not easy or intuitive. It was easier to run a DeepLearning algo (from another library) than add two vectors. If anybody has a workaround other than implementing your own add/substract/scalarMultiply, PLEASE let me know. Here is the code and error from (freshly started) spark-shell: scala import breeze.linalg.{DenseVector = BDV, SparseVector = BSV, Vector = BV} import breeze.linalg.{DenseVector=BDV, SparseVector=BSV, Vector=BV} scala import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.Vectors scala val v1 = Vectors.dense(1.0, 2.0, 3.0) v1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0] scala import org.apache.spark.mllib.linalg.{Vector =SparkVector} import org.apache.spark.mllib.linalg.{Vector=SparkVector} scala object MyUtils { | implicit def toBreeze(v:SparkVector) = BV(v.toArray) | } warning: there were 1 feature warning(s); re-run with -feature for details defined module MyUtils scala import MyUtils._ import MyUtils._ scala v1:BV[Double] res2: breeze.linalg.Vector[Double] = DenseVector(1.0, 2.0, 3.0) scala v1 + v1 console:30: error: could not find implicit value for parameter op: breeze.linalg.operators.OpAdd.Impl2[breeze.linalg.Vector[Double],org.apache.spark.mllib.linalg.Vector,That] v1 + v1 ^
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
Does the first batch after restart contain all the messages received while the job was down? On Tue, Aug 25, 2015 at 12:53 PM, suchenzang suchenz...@gmail.com wrote: Hello, I'm using direct spark streaming (from kafka) with checkpointing, and everything works well until a restart. When I shut down (^C) the first streaming job, wait 1 minute, then re-submit, there is somehow a series of 0 event batches that get queued (corresponding to the 1 minute when the job was down). Eventually, the batches would resume processing, and I would see that each batch has roughly 2000 events. I see that at the beginning of the second launch, the checkpoint dirs are found and loaded, according to console output. Is this expected behavior? It seems like I might've configured something incorrectly, since I would expect with checkpointing that the streaming job would resume from checkpoint and continue processing from there (without seeing 0 event batches corresponding to when the job was down). Also, if I were to wait 10 minutes or so before re-launching, there would be so many 0 event batches that the job would hang. Is this merely something to be waited out, or should I set up some restart behavior/make a config change to discard checkpointing if the elapsed time has been too long? Thanks! http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to unit test HiveContext without OutOfMemoryError (using sbt)
Hello, I am using sbt and created a unit test where I create a `HiveContext` and execute some query and then return. Each time I run the unit test the JVM will increase its memory usage until I get the error:

Internal error when running tests: java.lang.OutOfMemoryError: PermGen space
Exception in thread Thread-2 java.io.EOFException

As a work-around, I can fork a new JVM each time I run the unit test; however, that seems like a bad solution as it takes a while to run the unit test. By the way, I tried importing the TestHiveContext: - import org.apache.spark.sql.hive.test.TestHiveContext However, it suffers from the same memory issue. Has anyone else suffered from the same problem? Note that I am running these unit tests on my mac. Cheers, Mike.
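One hedged way to keep the forked-JVM approach workable (only a sketch of common sbt settings, not something from the original thread): fork the test JVM once per `sbt test` run rather than once per test, give it more PermGen, and reuse a single HiveContext across suites. If tests are not forked, the PermGen flags have to go to sbt's own JVM instead, e.g. via SBT_OPTS.

// build.sbt -- assumes sbt 0.13 and a JDK that still has PermGen (Java 7)
fork in Test := true

javaOptions in Test ++= Seq(
  "-Xmx2g",
  "-XX:MaxPermSize=512m",              // HiveContext loads a lot of classes
  "-XX:+CMSClassUnloadingEnabled"
)

// run suites sequentially so they can share one HiveContext / SparkContext
parallelExecution in Test := false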
DataFrame Parquet Writer doesn't keep schema
Hi all, when I read parquet files with required fields aka nullable=false they are read correctly. Then I save them (df.write.parquet) and read again all my fields are saved and read as optional, aka nullable=true. Which means I suddenly have files with incompatible schemas. This happens on 1.3.0-1.4.1 and even on 1.5.1-rc1. Should I set some write option to keep nullability? Is there a specific reason why nullability is always overriden to true? Many thanks, Peter
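As far as I know there is no documented write option to keep nullable = false; one workaround is to re-impose the original schema after reading the files back. A minimal sketch, assuming Spark 1.4.x, a spark-shell `sqlContext`, and illustrative paths:

import org.apache.spark.sql.types.StructType

val df = sqlContext.read.parquet("/data/in")        // fields still nullable = false here
val originalSchema: StructType = df.schema

df.write.parquet("/tmp/out")

// on re-read every field comes back nullable = true, so reapply the old schema
val reread = sqlContext.read.parquet("/tmp/out")
val withStrictSchema = sqlContext.createDataFrame(reread.rdd, originalSchema)
println(withStrictSchema.schema.treeString)         // nullable flags restored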
Re: Adding/subtracting org.apache.spark.mllib.linalg.Vector in Scala?
Yes, you're right that it's quite on purpose to leave this API to Breeze, in the main. As you can see the Spark objects have already sprouted a few basic operations anyway; there's a slippery slope problem here. Why not addition, why not dot products, why not determinants, etc. What about declaring a few simple implicit conversions between the MLlib and Breeze Vector classes? if you import them then you should be able to write a lot of the source code just as you imagine it, as if the Breeze methods were available on the Vector object in MLlib. On Tue, Aug 25, 2015 at 3:35 PM, Kristina Rogale Plazonic kpl...@gmail.com wrote: Well, yes, the hack below works (that's all I have time for), but is not satisfactory - it is not safe, and is verbose and very cumbersome to use, does not separately deal with SparseVector case and is not complete either. My question is, out of hundreds of users on this list, someone must have come up with a better solution - please? import breeze.linalg.{DenseVector = BDV, SparseVector = BSV, Vector = BV} import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.{Vector =SparkVector} def toBreeze(v:SparkVector) = BV(v.toArray) def fromBreeze(bv:BV[Double]) = Vectors.dense(bv.toArray) def add(v1:SparkVector, v2:SparkVector) = fromBreeze( toBreeze(v1) + toBreeze(v2)) def subtract(v1:SparkVector, v2:SparkVector) = fromBreeze( toBreeze(v1) - toBreeze(v2)) def scalarMultiply(a:Double, v:SparkVector) = fromBreeze( a*toBreeze(v1) ) On Tue, Aug 25, 2015 at 9:41 AM, Sonal Goyal sonalgoy...@gmail.com wrote: From what I have understood, you probably need to convert your vector to breeze and do your operations there. Check stackoverflow.com/questions/28232829/addition-of-two-rddmllib-linalg-vectors On Aug 25, 2015 7:06 PM, Kristina Rogale Plazonic kpl...@gmail.com wrote: Hi all, I'm still not clear what is the best (or, ANY) way to add/subtract two org.apache.spark.mllib.Vector objects in Scala. Ok, I understand there was a conscious Spark decision not to support linear algebra operations in Scala and leave it to the user to choose a linear algebra library. But, for any newcomer from R or Python, where you don't think twice about adding two vectors, it is such a productivity shot in the foot to have to write your own + operation. I mean, there is support in Spark for p-norm of Vectors, for sqdist between two Vectors, but not for +/-? As I said, I'm a newcomer to linear algebra in Scala and am not familiar with Breeze or apache.commons - I am willing to learn, but would really benefit from guidance from more experienced users. I am also not used to optimizing low-level code and am sure that any hack I do will be just horrible. So, please, could somebody point me to a blog post, documentation, or just patches for this really basic functionality. What do you do to get around it? Am I the only one to have a problem? (And, would it really be so onerous to add +/- to Spark? After all, even org.apache.spark.sql.Column class does have +,-,*,/ ) My stupid little use case is to generate some toy data for Kmeans, and I need to translate a Gaussian blob to another center (for streaming and nonstreaming KMeans both). Many thanks! (I am REALLY embarassed to ask such a simple question...) Kristina - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark-Ec2 launch failed on starting httpd spark 141
Corrected a typo in the subject of your email. What you cited seems to be from worker node startup. Was there any other error you saw? Please list the command you used. Cheers

On Tue, Aug 25, 2015 at 7:39 AM, Garry Chen g...@cornell.edu wrote: Hi All, I am trying to launch a spark cluster on ec2 with spark 1.4.1 version. The script finished but I am getting an error at the end, as follows. What should I do to correct this issue? Thank you very much for your input. Starting httpd: httpd: Syntax error on line 199 of /etc/httpd/conf/httpd.conf: Cannot load modules/libphp-5.5.so into server: /etc/httpd/modules/libphp-5.5.so: cannot open shared object file: No such file or directory Garry
Spark RDD join with CassandraRDD
Hi All, I have the following scenario: There exists a booking table in cassandra, which holds fields like bookingid, passengerName, contact, etc. Now in my spark streaming application there is one class Booking which acts as a container and holds all the field details:

class Booking {
  val bookingid = ...
  val passengerName = ...
  val contact = ...
  ...
}

When a new booking message comes in, I populate the fields in the class, which gives me RDDs of type RDD[Booking]. I then save this RDD to the cassandra table Booking with rdd.saveToCassandra. Let's say if I query the booking table I would get a CassandraRDD[CassandraRow]. If I want to join RDD[Booking] with this CassandraRDD, how is it possible, as these are two different RDDs? Would converting CassandraRDD to RDD[CassandraRow] make things work? Thanks, Padma Ch
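A CassandraRDD is already an RDD[CassandraRow], so no conversion is needed; the usual approach is to bring both sides to a common key and use a plain RDD join. A minimal sketch, assuming the spark-cassandra-connector is on the classpath; keyspace, table, column names and sample data below are illustrative:

import com.datastax.spark.connector._
import org.apache.spark.rdd.RDD

case class Booking(bookingid: String, passengerName: String, contact: String)

// RDD[Booking] as built in the streaming job (sample data here, just for illustration)
val bookings: RDD[Booking] =
  sc.parallelize(Seq(Booking("b1", "Alice", "111"), Booking("b2", "Bob", "222")))

// CassandraRDD[CassandraRow]
val fromCassandra = sc.cassandraTable("my_keyspace", "booking")

// key both sides by booking id, then join like any two RDDs
val joined = bookings.keyBy(_.bookingid)
  .join(fromCassandra.keyBy(_.getString("bookingid")))
// joined: RDD[(String, (Booking, CassandraRow))]

Newer connector versions also offer joinWithCassandraTable on an RDD, which pushes the lookup down to Cassandra instead of scanning the whole table.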
Spark-Ec2 lunch failed on starting httpd spark 141
Hi All, I am trying to launch a spark cluster on ec2 with spark 1.4.1 version. The script finished but I am getting an error at the end, as follows. What should I do to correct this issue? Thank you very much for your input. Starting httpd: httpd: Syntax error on line 199 of /etc/httpd/conf/httpd.conf: Cannot load modules/libphp-5.5.so into server: /etc/httpd/modules/libphp-5.5.so: cannot open shared object file: No such file or directory Garry
Re: How to access Spark UI through AWS
I'm not sure why the UI appears broken like that either and haven't investigated it myself yet, but if you instead go to the YARN ResourceManager UI (port 8088 if you are using emr-4.x; port 9026 for 3.x, I believe), then you should be able to click on the ApplicationMaster link (or the History link for completed applications) to get to the Spark UI from there. The ApplicationMaster link will use the YARN Proxy Service (port 20888 on emr-4.x; not sure about 3.x) to proxy through the Spark application's UI, regardless of what port it's running on. For completed applications, the History link will send you directly to the Spark History Server UI on port 18080. Hope that helps! ~ Jonathan On 8/24/15, 10:51 PM, Justin Pihony justin.pih...@gmail.com wrote: I am using the steps from this article https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923 to get spark up and running on EMR through yarn. Once up and running I ssh in and cd to the spark bin and run spark-shell --master yarn. Once this spins up I can see that the UI is started at the internal ip of 4040. If I hit the public dns at 4040 with dynamic port tunneling and foxyproxy then I get a crude UI (css seems broken), however the proxy continuously redirects me to the main page, so I cannot drill into anything. So, I tried static tunneling, but can't seem to get through. So, how can I access the spark UI when running a spark shell in AWS yarn? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-Spark-UI -through-AWS-tp24436.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark (1.2.0) submit fails with exception saying log directory already exists
Here is the error:

yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Log directory hdfs://Sandbox/user/spark/applicationHistory/application_1438113296105_0302 already exists!)

I am using Cloudera 5.3.2 with Spark 1.2.0. Any help is appreciated. Thanks, Jay
[SQL/Hive] Trouble with refreshTable
I'm having trouble with refreshTable, I suspect because I'm using it incorrectly. I am doing the following:

1. Create a DF from a parquet path with wildcards, e.g. /foo/bar/*.parquet
2. Use registerTempTable to register my dataframe
3. A new file is dropped under /foo/bar/
4. Call hiveContext.refreshTable in the hope that the paths for the DataFrame are re-evaluated

Step 4 does not work as I imagined -- if I have 1 file in step 1, and 2 files in step 3, I still get the same count when I query the table. So I have 2 questions: 1) Is there a way to see the files that a DataFrame/RDD is underpinned by? 2) What is a reasonable way to refresh the table with the new data? I suspect I have to start over from step 1 to force the DataFrame to re-see new files, but am hoping there is a simpler way (I know frames are immutable but they are also lazy, so I'm thinking paths with wildcards evaluated per call might be possible?). Thanks for any insights.
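As far as I can tell, refreshTable only refreshes cached metadata for tables known to the catalog; for a temp table registered from a DataFrame, the simplest reliable approach is to re-read the path and re-register. A minimal sketch, assuming Spark 1.4.x and a HiveContext named hiveContext; the path and table name are illustrative:

// re-resolve the wildcard and replace the temp table registration
def registerSnapshot(): Unit = {
  val df = hiveContext.read.parquet("/foo/bar/*.parquet")
  df.registerTempTable("bar")
}

registerSnapshot()
hiveContext.sql("SELECT COUNT(*) FROM bar").show()

// ... a new file lands under /foo/bar/ ...

registerSnapshot()   // re-read so the new file is picked up
hiveContext.sql("SELECT COUNT(*) FROM bar").show()

For question 1, if I recall correctly Spark 1.5 adds DataFrame.inputFiles, which lists the files backing a DataFrame.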
Error:(46, 66) not found: type SparkFlumeProtocol
I'm trying to build Spark using Intellij on Windows. But I'm repeatedly getting this error:

spark-master\external\flume-sink\src\main\scala\org\apache\spark\streaming\flume\sink\SparkAvroCallbackHandler.scala
Error:(46, 66) not found: type SparkFlumeProtocol
  val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
Error:(72, 39) not found: type EventBatch
  override def getEventBatch(n: Int): EventBatch = {
Error:(87, 13) not found: type EventBatch
    new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())

I had the same error when using Linux, but there I solved it by right-clicking on the flume-sink module -> Maven -> Generate Sources and Update Folders. But on Windows, it doesn't seem to work. Any ideas? Thanks,
Re: Adding/subtracting org.apache.spark.mllib.linalg.Vector in Scala?
Yes I get all that too and I think there's a legit question about whether moving a little further down the slippery slope is worth it and if so how far. The other catch here is: either you completely mimic another API (in which case why not just use it directly, which has its own problems) or you don't, in which case you're introduce yet another API for the same operations. I personally would prefer not to go further down the slope, but it's not up to me. However I do think it's easier than it seems to write the implicits; it doesn't involve new classes or anything. Yes it's pretty much just what you wrote. There is a class Vector in Spark. This declaration can be in an object; you don't implement your own class. (Also you can use toBreeze to get Breeze vectors.) Then if you import these implicit defs it should work pretty transparently. I haven't tried it. If it works well, then *that* definition could be an interesting element to add to Spark for just this purpose. On Tue, Aug 25, 2015 at 4:57 PM, Kristina Rogale Plazonic kpl...@gmail.com wrote: PS: is this what you meant by adding simple implicit def? should it be a class or object? These are kinds of questions I grapple with and why I'm asking for example of a solution // this is really a pseudo-code, I know BreezeVector and SparkVector are not real class names class MyVector extends SparkVector { implicit def toBreeze(v:SparkVector):BreezeVector = BreezeVector(v.toArray) implicit def fromBreeze( bv:BreezeVector ):SparkVector = Vectors.dense( bv.toArray ) } - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Adding/subtracting org.apache.spark.mllib.linalg.Vector in Scala?
Well, yes, the hack below works (that's all I have time for), but is not satisfactory - it is not safe, and is verbose and very cumbersome to use, does not separately deal with SparseVector case and is not complete either. My question is, out of hundreds of users on this list, someone must have come up with a better solution - please? import breeze.linalg.{DenseVector = BDV, SparseVector = BSV, Vector = BV} import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.{Vector =SparkVector} def toBreeze(v:SparkVector) = BV(v.toArray) def fromBreeze(bv:BV[Double]) = Vectors.dense(bv.toArray) def add(v1:SparkVector, v2:SparkVector) = fromBreeze( toBreeze(v1) + toBreeze(v2)) def subtract(v1:SparkVector, v2:SparkVector) = fromBreeze( toBreeze(v1) - toBreeze(v2)) def scalarMultiply(a:Double, v:SparkVector) = fromBreeze( a*toBreeze(v1) ) On Tue, Aug 25, 2015 at 9:41 AM, Sonal Goyal sonalgoy...@gmail.com wrote: From what I have understood, you probably need to convert your vector to breeze and do your operations there. Check stackoverflow.com/questions/28232829/addition-of-two-rddmllib-linalg-vectors On Aug 25, 2015 7:06 PM, Kristina Rogale Plazonic kpl...@gmail.com wrote: Hi all, I'm still not clear what is the best (or, ANY) way to add/subtract two org.apache.spark.mllib.Vector objects in Scala. Ok, I understand there was a conscious Spark decision not to support linear algebra operations in Scala and leave it to the user to choose a linear algebra library. But, for any newcomer from R or Python, where you don't think twice about adding two vectors, it is such a productivity shot in the foot to have to write your own + operation. I mean, there is support in Spark for p-norm of Vectors, for sqdist between two Vectors, but not for +/-? As I said, I'm a newcomer to linear algebra in Scala and am not familiar with Breeze or apache.commons - I am willing to learn, but would really benefit from guidance from more experienced users. I am also not used to optimizing low-level code and am sure that any hack I do will be just horrible. So, please, could somebody point me to a blog post, documentation, or just patches for this really basic functionality. What do you do to get around it? Am I the only one to have a problem? (And, would it really be so onerous to add +/- to Spark? After all, even org.apache.spark.sql.Column class does have +,-,*,/ ) My stupid little use case is to generate some toy data for Kmeans, and I need to translate a Gaussian blob to another center (for streaming and nonstreaming KMeans both). Many thanks! (I am REALLY embarassed to ask such a simple question...) Kristina
Re: How to effieciently write sorted neighborhood in pyspark
Any resources on this? On Aug 25, 2015, at 3:15 PM, shahid qadri shahidashr...@icloud.com wrote: I would like to implement the sorted neighborhood approach in spark; what is the best way to write that in pyspark?
Re: CHAID Decision Trees
Hi Feynman, Thanks for the information. Is there a way to depict decision tree as a visualization for large amounts of data using any other technique/library? Thanks, Jatin On Tue, Aug 25, 2015 at 11:42 PM, Feynman Liang fli...@databricks.com wrote: Nothing is in JIRA https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22CHAID%22 so AFAIK no, only random forests and GBTs using entropy or GINI for information gain is supported. On Tue, Aug 25, 2015 at 9:39 AM, jatinpreet jatinpr...@gmail.com wrote: Hi, I wish to know if MLlib supports CHAID regression and classifcation trees. If yes, how can I build them in spark? Thanks, Jatin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CHAID-Decision-Trees-tp24449.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Jatinpreet Singh
Re: use GraphX with Spark Streaming
Hi, Sure you can. StreamingContext has the property def sparkContext: SparkContext (see the docs http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext ). Think of a DStream, the main abstraction in Spark Streaming, as a sequence of RDDs. Each DStream can be transformed RDD-by-RDD with the method transform (see the docs http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.DStream ). So you can use whatever you want, depending on your problem.
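To make that concrete, a minimal sketch of building a GraphX graph per micro-batch; the DStream of (src, dst) id pairs is assumed to exist elsewhere in your job, and the names are illustrative:

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.streaming.dstream.DStream

def processBatches(pairs: DStream[(Long, Long)]): Unit = {
  pairs.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      // turn this batch's records into GraphX edges and build a graph from them
      val edges = rdd.map { case (src, dst) => Edge(src, dst, 1) }
      val graph = Graph.fromEdges(edges, defaultValue = 0)
      println(s"batch graph: ${graph.numVertices} vertices, ${graph.numEdges} edges")
    }
  }
}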
Question on take function - Spark Java API
Hi community members, Apache Spark is fantastic and very easy to learn.. Awesome work!!!

Question: I have multiple files in a folder, and the first line in each file is the name of the asset that the file belongs to. The second line is the csv header row and the data starts from the third row. Ex: File 1

TestAsset01
Time,dp_1,dp_2,dp_3
11-01-2015 15:00:00,123,456,789
11-01-2015 15:00:01,123,456,789
...

Ex: File 2

TestAsset02
Time,dp_1,dp_2,dp_3
11-01-2015 15:00:00,1230,4560,7890
11-01-2015 15:00:01,1230,4560,7890
...

I have got nearly 1000 files in each folder, sizing ~10G. I am using the apache spark Java api to read all these files. Following is the code extract that I am using:

try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    Map<String, String> readingTypeMap = getReadingTypesMap(sc);
    // Read files
    JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
    // Get asset
    String asset = data.take(1).get(0);
    // Extract time series data
    JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
    // Strip header
    String header = actualData.take(1).get(0);
    String[] headers = header.split(DELIMERTER);
    // Extract actual data
    JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
    // Extract valid records
    JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
    // Find granularity
    Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
    // Transform to TSD objects
    JavaRDD<TimeSeriesData> tsdFlatMap = transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);
    // Save to Cassandra
    javaFunctions(tsdFlatMap)
        .writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"), "time_series_data", mapToRow(TimeSeriesData.class))
        .saveToCassandra();
    System.out.println("Total Records: " + timeSeriesLines.count());
    System.out.println("Valid Records: " + validated.count());
}

Within the TimeSeriesData object I need to set the asset name for the reading, so I need the output of data.take(1) to be different for different files. Thank You. Best Regards, Pankaj
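As written, data.take(1) returns the first line of the whole folder-wide RDD, not the first line of each file, so every record ends up tagged with one asset. One hedged way around that (sketched in Scala here; JavaSparkContext exposes the same wholeTextFiles call) is to keep file boundaries by reading whole files, assuming each individual file is small enough to hold in memory; the path is illustrative:

// RDD of (filePath, fileContent), one element per file
val perFile = sc.wholeTextFiles("/data/assets/*")

val records = perFile.flatMap { case (_, content) =>
  val lines  = content.split("\n").map(_.trim).filter(_.nonEmpty)
  val asset  = lines(0)                 // first line: asset name of THIS file
  val header = lines(1).split(",")      // second line: csv header
  lines.drop(2).map { row =>
    val cols = row.split(",")
    (asset, header.zip(cols).toMap)     // each data row tagged with its own asset
  }
}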
Re: How to increase data scale in Spark SQL Perf
The error in #1 below was not informative. Are you able to get more detailed error message ? Thanks On Aug 25, 2015, at 6:57 PM, Todd bit1...@163.com wrote: Thanks Ted Yu. Following are the error message: 1. The exception that is shown on the UI is : Exception in thread Thread-113 Exception in thread Thread-126 Exception in thread Thread-64 Exception in thread Thread-90 Exception in thread Thread-117 Exception in thread Thread-80 Exception in thread Thread-115 Exception in thread ResponseProcessor for block BP-1564562096-172.18.149.132-1435294011279:blk_1073846767_105984 Exception in thread qtp1270119920-57 Exception in thread Thread-77 Exception in thread Thread-132 Exception in thread Thread-68 Exception in thread Thread-61 Exception in thread Thread-70 Exception in thread qtp1270119920-52 Exception in thread Thread-88 Exception in thread qtp318933312-47 Exception in thread qtp1270119920-56 2. jstack the process, I see bunch of following message: Thread 31258: (state = BLOCKED) - java.lang.Object.wait(long) @bci=0 (Interpreted frame) - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame) - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame) - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 (Interpreted frame) - scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp() @bci=11, line=142 (Interpreted frame) - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 (Interpreted frame) Thread 31257: (state = BLOCKED) - java.lang.Object.wait(long) @bci=0 (Interpreted frame) - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame) - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame) - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 (Interpreted frame) - scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp() @bci=11, line=142 (Interpreted frame) - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 (Interpreted frame) At 2015-08-25 19:32:56, Ted Yu yuzhih...@gmail.com wrote: Looks like you were attaching images to your email which didn't go through. Consider using third party site for images - or paste error in text. Cheers On Tue, Aug 25, 2015 at 4:22 AM, Todd bit1...@163.com wrote: Hi, The spark sql perf itself contains benchmark data generation. I am using spark shell to run the spark sql perf to generate the data with 10G memory for both driver and executor. When I increase the scalefactor to be 30,and run the job, Then I got the following error: When I jstack it to see the status of the thread. I see the following: looks it is waiting for the process that the spark job kicks off.
Re:Re: How to increase data scale in Spark SQL Perf
I think the answer is No. I only see such message on the console..and #2 is the thread stack trace。 I am thinking is that in Spark SQL Perf forks many dsdgen process to generate data when the scalafactor is increased which at last exhaust the JVM When thread exception is thrown on the console and I leave it there for some while(15min about),then eventually I will see OutOfMemory occur Can you guys try to run it if you have the environment ? I think you may reproduce it. Thanks! At 2015-08-26 13:01:34, Ted Yu yuzhih...@gmail.com wrote: The error in #1 below was not informative. Are you able to get more detailed error message ? Thanks On Aug 25, 2015, at 6:57 PM, Todd bit1...@163.com wrote: Thanks Ted Yu. Following are the error message: 1. The exception that is shown on the UI is : Exception in thread Thread-113 Exception in thread Thread-126 Exception in thread Thread-64 Exception in thread Thread-90 Exception in thread Thread-117 Exception in thread Thread-80 Exception in thread Thread-115 Exception in thread ResponseProcessor for block BP-1564562096-172.18.149.132-1435294011279:blk_1073846767_105984 Exception in thread qtp1270119920-57 Exception in thread Thread-77 Exception in thread Thread-132 Exception in thread Thread-68 Exception in thread Thread-61 Exception in thread Thread-70 Exception in thread qtp1270119920-52 Exception in thread Thread-88 Exception in thread qtp318933312-47 Exception in thread qtp1270119920-56 2. jstack the process, I see bunch of following message: Thread 31258: (state = BLOCKED) - java.lang.Object.wait(long) @bci=0 (Interpreted frame) - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame) - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame) - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 (Interpreted frame) - scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp() @bci=11, line=142 (Interpreted frame) - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 (Interpreted frame) Thread 31257: (state = BLOCKED) - java.lang.Object.wait(long) @bci=0 (Interpreted frame) - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame) - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame) - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 (Interpreted frame) - scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp() @bci=11, line=142 (Interpreted frame) - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 (Interpreted frame) At 2015-08-25 19:32:56, Ted Yu yuzhih...@gmail.com wrote: Looks like you were attaching images to your email which didn't go through. Consider using third party site for images - or paste error in text. Cheers On Tue, Aug 25, 2015 at 4:22 AM, Todd bit1...@163.com wrote: Hi, The spark sql perf itself contains benchmark data generation. I am using spark shell to run the spark sql perf to generate the data with 10G memory for both driver and executor. When I increase the scalefactor to be 30,and run the job, Then I got the following error: When I jstack it to see the status of the thread. I see the following: looks it is waiting for the process that the spark job kicks off.
Re: How to access Spark UI through AWS
I figured it all out after this: http://apache-spark-user-list.1001560.n3.nabble.com/WebUI-on-yarn-through-ssh-tunnel-affected-by-AmIpfilter-td21540.html The short is that I needed to set SPARK_PUBLIC_DNS (not DNS_HOME) = ec2_publicdns then the YARN proxy gets in the way, so I needed to go to: http://ec2_publicdns:20888/proxy/applicationid/jobs (9046 is the older emr port) or, as Jonathan said, the spark history server works once a job is completed. On Tue, Aug 25, 2015 at 5:26 PM, Justin Pihony justin.pih...@gmail.com wrote: OK, I figured the horrid look alsothe href of all of the styles is prefixed with the proxy dataso, ultimately if I can fix the proxy issues with the links, then I can fix the look also On Tue, Aug 25, 2015 at 5:17 PM, Justin Pihony justin.pih...@gmail.com wrote: SUCCESS! I set SPARK_DNS_HOME=ec2_publicdns, which makes it available to access the spark ui directly. The application proxy was still getting in the way by the way it creates the URL, so I manually filled in the /stage?id=#attempt=# and that workedI'm still having trouble with the css as the UI looks horridbut I'll tackle that next :) On Tue, Aug 25, 2015 at 4:31 PM, Justin Pihony justin.pih...@gmail.com wrote: Thanks. I just tried and still am having trouble. It seems to still be using the private address even if I try going through the resource manager. On Tue, Aug 25, 2015 at 12:34 PM, Kelly, Jonathan jonat...@amazon.com wrote: I'm not sure why the UI appears broken like that either and haven't investigated it myself yet, but if you instead go to the YARN ResourceManager UI (port 8088 if you are using emr-4.x; port 9026 for 3.x, I believe), then you should be able to click on the ApplicationMaster link (or the History link for completed applications) to get to the Spark UI from there. The ApplicationMaster link will use the YARN Proxy Service (port 20888 on emr-4.x; not sure about 3.x) to proxy through the Spark application's UI, regardless of what port it's running on. For completed applications, the History link will send you directly to the Spark History Server UI on port 18080. Hope that helps! ~ Jonathan On 8/24/15, 10:51 PM, Justin Pihony justin.pih...@gmail.com wrote: I am using the steps from this article https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923 to get spark up and running on EMR through yarn. Once up and running I ssh in and cd to the spark bin and run spark-shell --master yarn. Once this spins up I can see that the UI is started at the internal ip of 4040. If I hit the public dns at 4040 with dynamic port tunneling and foxyproxy then I get a crude UI (css seems broken), however the proxy continuously redirects me to the main page, so I cannot drill into anything. So, I tried static tunneling, but can't seem to get through. So, how can I access the spark UI when running a spark shell in AWS yarn? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-Spark-UI -through-AWS-tp24436.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: CHAID Decision Trees
For a single decision tree, the closest I can think of is toDebugString, which gives you a text representation of the decision thresholds and paths down the tree. I don't think there's anything in MLlib for visualizing GBTs or random forests. On Tue, Aug 25, 2015 at 9:20 PM, Jatinpreet Singh jatinpr...@gmail.com wrote: Hi Feynman, Thanks for the information. Is there a way to depict a decision tree as a visualization for large amounts of data using any other technique/library? Thanks, Jatin On Tue, Aug 25, 2015 at 11:42 PM, Feynman Liang fli...@databricks.com wrote: Nothing is in JIRA https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22CHAID%22 so AFAIK no; only random forests and GBTs using entropy or Gini for information gain are supported. On Tue, Aug 25, 2015 at 9:39 AM, jatinpreet jatinpr...@gmail.com wrote: Hi, I wish to know if MLlib supports CHAID regression and classification trees. If yes, how can I build them in spark? Thanks, Jatin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CHAID-Decision-Trees-tp24449.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Jatinpreet Singh
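For anyone wanting a concrete starting point, a minimal Scala sketch of printing a single tree's structure via toDebugString (the data path and parameters are only illustrative):
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.util.MLUtils

// load LIBSVM-formatted training data (sample file shipped with the Spark distribution)
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// train a small classification tree using Gini impurity
val model = DecisionTree.trainClassifier(data, numClasses = 2,
  categoricalFeaturesInfo = Map[Int, Int](), impurity = "gini", maxDepth = 4, maxBins = 32)
// text representation of the decision thresholds and paths down the tree
println(model.toDebugString)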
Re: Exception throws when running spark pi in Intellij Idea that scala.collection.Seq is not found
Go to the module settings of the project and in the dependencies section check the scope of scala jars. It would be either Test or Provided. Change it to compile and it should work. Check the following link to understand more about scope of modules: https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html On Tue, Aug 25, 2015 at 12:18 PM, Todd bit1...@163.com wrote: I cloned the code from https://github.com/apache/spark to my machine. It can compile successfully, But when I run the sparkpi, it throws an exception below complaining the scala.collection.Seq is not found. I have installed scala2.10.4 in my machine, and use the default profiles: window,scala2.10,maven-3,test-java-home. In Idea, I can find that the Seq class is on my classpath: Exception in thread main java.lang.NoClassDefFoundError: scala/collection/Seq at org.apache.spark.examples.SparkPi.main(SparkPi.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Caused by: java.lang.ClassNotFoundException: scala.collection.Seq at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 6 more
Re: Exception throws when running spark pi in Intellij Idea that scala.collection.Seq is not found
As I remember, you also need to change guava and jetty related dependency to compile if you run to run SparkPi in intellij. On Tue, Aug 25, 2015 at 3:15 PM, Hemant Bhanawat hemant9...@gmail.com wrote: Go to the module settings of the project and in the dependencies section check the scope of scala jars. It would be either Test or Provided. Change it to compile and it should work. Check the following link to understand more about scope of modules: https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html On Tue, Aug 25, 2015 at 12:18 PM, Todd bit1...@163.com wrote: I cloned the code from https://github.com/apache/spark to my machine. It can compile successfully, But when I run the sparkpi, it throws an exception below complaining the scala.collection.Seq is not found. I have installed scala2.10.4 in my machine, and use the default profiles: window,scala2.10,maven-3,test-java-home. In Idea, I can find that the Seq class is on my classpath: Exception in thread main java.lang.NoClassDefFoundError: scala/collection/Seq at org.apache.spark.examples.SparkPi.main(SparkPi.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Caused by: java.lang.ClassNotFoundException: scala.collection.Seq at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 6 more -- Best Regards Jeff Zhang
Re: Spark stages very slow to complete
I have pretty much the same symptoms - the computation itself is pretty fast, but most of my computation is spent in JavaToPython steps (~15min). I'm using Spark 1.5.0-rc1 with DataFrame and ML Pipelines. Any insights into what these steps are exactly? 2015-06-02 9:18 GMT+02:00 Karlson ksonsp...@siberie.de: Hi, the code is some hundreds of lines of Python. I can try to compose a minimal example as soon as I find the time, though. Any ideas until then? Would you mind posting the code? On 2 Jun 2015 00:53, Karlson ksonsp...@siberie.de wrote: Hi, In all (pyspark) Spark jobs that become somewhat more involved, I am experiencing the issue that some stages take a very long time to complete and sometimes don't at all. This clearly correlates with the size of my input data. Looking at the stage details for one such stage, I am wondering where Spark spends all this time. Take this table of the stage's task metrics for example:
Metric                     Min          25th percentile  Median       75th percentile  Max
Duration                   1.4 min      1.5 min          1.7 min      1.9 min          2.3 min
Scheduler Delay            1 ms         3 ms             4 ms         5 ms             23 ms
Task Deserialization Time  1 ms         2 ms             3 ms         8 ms             22 ms
GC Time                    0 ms         0 ms             0 ms         0 ms             0 ms
Result Serialization Time  0 ms         0 ms             0 ms         0 ms             1 ms
Getting Result Time        0 ms         0 ms             0 ms         0 ms             0 ms
Input Size / Records       23.9 KB / 1  24.0 KB / 1      24.1 KB / 1  24.1 KB / 1      24.3 KB / 1
Why is the overall duration almost 2 min? Where is all this time spent, when no progress of the stages is visible? The progress bar simply displays 0 succeeded tasks for a very long time before sometimes slowly progressing. Also, the name of the stage displayed above is `javaToPython at null:-1`, which I find very uninformative. I don't even know which action exactly is responsible for this stage. Does anyone experience similar issues or have any advice for me? Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
RE: DataFrame#show cost 2 Spark Jobs ?
Ok, I see, thanks for the correction, but this should be optimized. From: Shixiong Zhu [mailto:zsxw...@gmail.com] Sent: Tuesday, August 25, 2015 2:08 PM To: Cheng, Hao Cc: Jeff Zhang; user@spark.apache.org Subject: Re: DataFrame#show cost 2 Spark Jobs ? That's two jobs. `SparkPlan.executeTake` will call `runJob` twice in this case. Best Regards, Shixiong Zhu 2015-08-25 14:01 GMT+08:00 Cheng, Hao hao.ch...@intel.commailto:hao.ch...@intel.com: O, Sorry, I miss reading your reply! I know the minimum tasks will be 2 for scanning, but Jeff is talking about 2 jobs, not 2 tasks. From: Shixiong Zhu [mailto:zsxw...@gmail.commailto:zsxw...@gmail.com] Sent: Tuesday, August 25, 2015 1:29 PM To: Cheng, Hao Cc: Jeff Zhang; user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: DataFrame#show cost 2 Spark Jobs ? Hao, I can reproduce it using the master branch. I'm curious why you cannot reproduce it. Did you check if the input HadoopRDD did have two partitions? My test code is val df = sqlContext.read.json(examples/src/main/resources/people.json) df.show() Best Regards, Shixiong Zhu 2015-08-25 13:01 GMT+08:00 Cheng, Hao hao.ch...@intel.commailto:hao.ch...@intel.com: Hi Jeff, which version are you using? I couldn’t reproduce the 2 spark jobs in the `df.show()` with latest code, we did refactor the code for json data source recently, not sure you’re running an earlier version of it. And a known issue is Spark SQL will try to re-list the files every time when loading the data for JSON, it’s probably causes longer time for ramp up with large number of files/partitions. From: Jeff Zhang [mailto:zjf...@gmail.commailto:zjf...@gmail.com] Sent: Tuesday, August 25, 2015 8:11 AM To: Cheng, Hao Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: DataFrame#show cost 2 Spark Jobs ? Hi Cheng, I know that sqlContext.read will trigger one spark job to infer the schema. What I mean is DataFrame#show cost 2 spark jobs. So overall it would cost 3 jobs. Here's the command I use: val df = sqlContext.read.json(file:///Users/hadoop/github/spark/examples/src/main/resources/people.jsonfile:///\\Users\hadoop\github\spark\examples\src\main\resources\people.json) // trigger one spark job to infer schema df.show()// trigger 2 spark jobs which is weird On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.commailto:hao.ch...@intel.com wrote: The first job is to infer the json schema, and the second one is what you mean of the query. You can provide the schema while loading the json file, like below: sqlContext.read.schema(xxx).json(“…”)? Hao From: Jeff Zhang [mailto:zjf...@gmail.commailto:zjf...@gmail.com] Sent: Monday, August 24, 2015 6:20 PM To: user@spark.apache.orgmailto:user@spark.apache.org Subject: DataFrame#show cost 2 Spark Jobs ? It's weird to me that the simple show function will cost 2 spark jobs. DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs. == Parsed Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Analyzed Logical Plan == age: bigint, name: string Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Optimized Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Physical Plan == Scan JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1] -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
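As a concrete illustration of the schema suggestion in this thread, a rough sketch that avoids the separate schema-inference job (the field names assume the bundled people.json example):
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("age", LongType, nullable = true),
  StructField("name", StringType, nullable = true)))
// with an explicit schema, read.json does not need to scan the file first to infer types
val df = sqlContext.read.schema(schema).json("examples/src/main/resources/people.json")
df.show()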
Re:Re: Exception throws when running spark pi in Intellij Idea that scala.collection.Seq is not found
Thanks you guys. Yes, I have fixed the guava and spark core and scala and jetty. And I can run Pi now. At 2015-08-25 15:28:51, Jeff Zhang zjf...@gmail.com wrote: As I remember, you also need to change guava and jetty related dependency to compile if you run to run SparkPi in intellij. On Tue, Aug 25, 2015 at 3:15 PM, Hemant Bhanawat hemant9...@gmail.com wrote: Go to the module settings of the project and in the dependencies section check the scope of scala jars. It would be either Test or Provided. Change it to compile and it should work. Check the following link to understand more about scope of modules: https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html On Tue, Aug 25, 2015 at 12:18 PM, Todd bit1...@163.com wrote: I cloned the code from https://github.com/apache/spark to my machine. It can compile successfully, But when I run the sparkpi, it throws an exception below complaining the scala.collection.Seq is not found. I have installed scala2.10.4 in my machine, and use the default profiles: window,scala2.10,maven-3,test-java-home. In Idea, I can find that the Seq class is on my classpath: Exception in thread main java.lang.NoClassDefFoundError: scala/collection/Seq at org.apache.spark.examples.SparkPi.main(SparkPi.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Caused by: java.lang.ClassNotFoundException: scala.collection.Seq at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 6 more -- Best Regards Jeff Zhang
Re: Loading already existing tables in spark shell
In spark shell use database not working saying use not found in the shell? did you ran this with scala shell ? On 24 August 2015 at 18:26, Ishwardeep Singh ishwardeep.si...@impetus.co.in wrote: Hi Jeetendra, I faced this issue. I did not specify the database where this table exists. Please set the database by using use database command before executing the query. Regards, Ishwardeep -- *From:* Jeetendra Gangele gangele...@gmail.com *Sent:* Monday, August 24, 2015 5:47 PM *To:* user *Subject:* Loading already existing tables in spark shell Hi All I have few tables in hive and I wanted to run query against them with spark as execution engine. Can I direct;y load these tables in spark shell and run query? I tried with 1.val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 2.qlContext.sql(FROM event_impressions select count(*)) where event_impressions is the table name. It give me error saying org.apache.spark.sql.AnalysisException: no such table event_impressions; line 1 pos 5 Does anybody hit similar issues? regards jeetendra -- NOTE: This message may contain information that is confidential, proprietary, privileged or otherwise protected by law. The message is intended solely for the named addressee. If received in error, please destroy and notify the sender. Any use of this email is prohibited when received in error. Impetus does not represent, warrant and/or guarantee, that the integrity of this communication has been maintained nor that the communication is free of errors, virus, interception or interference.
Invalid environment variable name when submitting job from windows
Hi, We have a spark standalone cluster running on linux. We have a job that we submit to the spark cluster on windows. When submitting this job using windows the execution failed with this error in the Notes java.lang.IllegalArgumentException: Invalid environment variable name: =::. When submitting from linux it works fine. I thought that this might be the result of one of the ENV variable on my system so I've modify the submit cmd to remove all env variable except the one needed by Java. This is the env before executing java command : ASSEMBLY_DIR=c:\spark\spark-1.4.0-bin-hadoop2.6\bin\..\lib ASSEMBLY_DIR1=c:\spark\spark-1.4.0-bin-hadoop2.6\bin\../assembly/target/scala-2.10 ASSEMBLY_DIR2=c:\spark\spark-1.4.0-bin-hadoop2.6\bin\../assembly/target/scala-2.11 CLASS=org.apache.spark.deploy.SparkSubmit CLASSPATH=.; JAVA_HOME=C:\Program Files\Java\jre1.8.0_51 LAUNCHER_OUTPUT=\spark-class-launcher-output-23386.txt LAUNCH_CLASSPATH=c:\spark\spark-1.4.0-bin-hadoop2.6\bin\..\lib\spark-assembly-1.4.0-hadoop2.6.0.jar PYTHONHASHSEED=0 RUNNER=C:\Program Files\Java\jre1.8.0_51\bin\java SPARK_ASSEMBLY_JAR=c:\spark\spark-1.4.0-bin-hadoop2.6\bin\..\lib\spark-assembly-1.4.0-hadoop2.6.0.jar SPARK_CMD=C:\Program Files\Java\jre1.8.0_51\bin\java -cp c:\spark\spark-1.4.0-bin-hadoop2.6\bin\..\conf\;c:\spark\spark-1.4.0-bin-hadoop2.6\bin\..\lib\spark-assembly-1.4.0-hadoop2.6.0.jar;c:\spark\spark-1.4.0-bin-hadoop2.6\bin\..\lib\datanucleus-api-jdo-3.2.6.jar;c:\spark\spark-1.4.0-bin-hadoop2.6\bin\..\lib\datanucleus-core-3.2.10.jar;c:\spark\spark-1.4.0-bin-hadoop2.6\bin\..\lib\datanucleus-rdbms-3.2.9.jar org.apache.spark.deploy.SparkSubmit --master spark://172.16.8.21:7077 --deploy-mode cluster --conf spark.driver.memory=4G --conf spark.driver.extraClassPath=/opt/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar --class com.publica.Accounts --verbose http://server/data-analytics/data-analytics.jar spark://172.16.8.21:7077 data-analysis http://server/data-analytics/data-analytics.jar 23 8 2015 SPARK_ENV_LOADED=1 SPARK_HOME=c:\spark\spark-1.4.0-bin-hadoop2.6\bin\.. SPARK_SCALA_VERSION=2.10 SystemRoot=C:\Windows user_conf_dir=c:\spark\spark-1.4.0-bin-hadoop2.6\bin\..\..\conf _SPARK_ASSEMBLY=c:\spark\spark-1.4.0-bin-hadoop2.6\bin\..\lib\spark-assembly-1.4.0-hadoop2.6.0.jar Is there a way to make this works ? -- Yann - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: What does Attribute and AttributeReference mean in Spark SQL
Attribute is the Catalyst name for an input column from a child operator. An AttributeReference has been resolved, meaning we know which input column in particular it is referring to. An AttributeReference also has a known DataType. In contrast, before analysis there might still exist UnresolvedReferences, which are just string identifiers from a parsed query. An Expression can be more complex (like you suggested, a + b), though technically just a is also a very simple Expression. The following console session shows how these types are composed:
$ build/sbt sql/console
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@5adfe37d
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@20d05227
import sqlContext.implicits._
import sqlContext._
Welcome to Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_45).
Type in expressions to have them evaluated.
Type :help for more information.
scala> val unresolvedAttr: UnresolvedAttribute = 'a
unresolvedAttr: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute = 'a
scala> val relation = LocalRelation('a.int)
relation: org.apache.spark.sql.catalyst.plans.logical.LocalRelation = LocalRelation [a#0]
scala> val parsedQuery = relation.select(unresolvedAttr)
parsedQuery: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 'Project ['a]
 LocalRelation [a#0]
scala> parsedQuery.analyze
res11: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = Project [a#0]
 LocalRelation [a#0]
The #0 after a is a unique identifier (within this JVM) that says where the data is coming from, even as plans are rearranged due to optimizations. On Mon, Aug 24, 2015 at 6:13 PM, Todd bit1...@163.com wrote: There are many such kinds of case classes and concepts, such as Attribute/AttributeReference/Expression, in Spark SQL. I would ask what Attribute/AttributeReference/Expression mean: given a sql query like select a,b from c, are a, b two Attributes? Is a + b an expression? It looks like I misunderstand it, because Attribute extends Expression in the code, which means Attribute itself is an Expression. Thanks.
Re: Local Spark talking to remote HDFS?
Port 8020 is not the only port you need tunnelled for HDFS to work. If you only list the contents of a directory, port 8020 is enough... for instance, using something val p = new org.apache.hadoop.fs.Path(hdfs://localhost:8020/) val fs = p.getFileSystem(sc.hadoopConfiguration) fs.listStatus(p) you should see the file list. But then, when accessing a file, you need to actually get its blocks, it has to connect to the data node. The error 'could not obtain block' means it can't get that block from the DataNode. Refer to http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-1.2.1/bk_reference/content/reference_chap2_1.html to see the complete list of ports that also need to be tunnelled. 2015-08-24 13:10 GMT-07:00 Dino Fancellu d...@felstar.com: Changing the ip to the guest IP address just never connects. The VM has port tunnelling, and it passes through all the main ports, 8020 included to the host VM. You can tell that it was talking to the guest VM before, simply because it said when file not found Error is: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-452094660-10.0.2.15-1437494483194:blk_1073742905_2098 file=/tmp/people.txt but I have no idea what it means by that. It certainly can find the file and knows it exists. On 24 August 2015 at 20:43, Roberto Congiu roberto.con...@gmail.com wrote: When you launch your HDP guest VM, most likely it gets launched with NAT and an address on a private network (192.168.x.x) so on your windows host you should use that address (you can find out using ifconfig on the guest OS). I usually add an entry to my /etc/hosts for VMs that I use oftenif you use vagrant, there's also a vagrant module that can do that automatically. Also, I am not sure how the default HDP VM is set up, that is, if it only binds HDFS to 127.0.0.1 or to all addresses. You can check that with netstat -a. R. 2015-08-24 11:46 GMT-07:00 Dino Fancellu d...@felstar.com: I have a file in HDFS inside my HortonWorks HDP 2.3_1 VirtualBox VM. If I go into the guest spark-shell and refer to the file thus, it works fine val words=sc.textFile(hdfs:///tmp/people.txt) words.count However if I try to access it from a local Spark app on my Windows host, it doesn't work val conf = new SparkConf().setMaster(local).setAppName(My App) val sc = new SparkContext(conf) val words=sc.textFile(hdfs://localhost:8020/tmp/people.txt) words.count Emits The port 8020 is open, and if I choose the wrong file name, it will tell me My pom has dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.11/artifactId version1.4.1/version scopeprovided/scope /dependency Am I doing something wrong? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Local-Spark-talking-to-remote-HDFS-tp24425.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
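For anyone trying the check above verbatim, here is a cleaned-up version with the string quotes restored (the host and port are whatever your NameNode actually listens on):
val p = new org.apache.hadoop.fs.Path("hdfs://localhost:8020/")
val fs = p.getFileSystem(sc.hadoopConfiguration)
// listing only talks to the NameNode; reading file contents also needs the DataNode ports
fs.listStatus(p).foreach(status => println(status.getPath))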
Re: How to set environment of worker applications
Ok, I went in the direction of system vars since beginning probably because the question was to pass variables to a particular job. Anyway, the decision to use either system vars or environment vars would solely depend on whether you want to make them available to all the spark processes on a node or to a particular job. Are there any other reasons why one would prefer one over the other? On Mon, Aug 24, 2015 at 8:48 PM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: System properties and environment variables are two different things.. One can use spark.executor.extraJavaOptions to pass system properties and spark-env.sh to pass environment variables. -raghav On Mon, Aug 24, 2015 at 1:00 PM, Hemant Bhanawat hemant9...@gmail.com wrote: That's surprising. Passing the environment variables using spark.executor.extraJavaOptions=-Dmyenvvar=xxx to the executor and then fetching them using System.getProperty(myenvvar) has worked for me. What is the error that you guys got? On Mon, Aug 24, 2015 at 12:10 AM, Sathish Kumaran Vairavelu vsathishkuma...@gmail.com wrote: spark-env.sh works for me in Spark 1.4 but not spark.executor.extraJavaOptions. On Sun, Aug 23, 2015 at 11:27 AM Raghavendra Pandey raghavendra.pan...@gmail.com wrote: I think the only way to pass on environment variables to worker node is to write it in spark-env.sh file on each worker node. On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat hemant9...@gmail.com wrote: Check for spark.driver.extraJavaOptions and spark.executor.extraJavaOptions in the following article. I think you can use -D to pass system vars: spark.apache.org/docs/latest/configuration.html#runtime-environment Hi, I am starting a spark streaming job in standalone mode with spark-submit. Is there a way to make the UNIX environment variables with which spark-submit is started available to the processes started on the worker nodes? Jan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
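To make the two mechanisms in this thread concrete, a small sketch (the property name myenvvar is just the example used above; the jar and class names are placeholders):
# pass a JVM system property to every executor (this is not an OS environment variable)
spark-submit --conf "spark.executor.extraJavaOptions=-Dmyenvvar=xxx" --class my.app.Main my-app.jar

// read it back inside code running on the executors
val value = System.getProperty("myenvvar")

# alternatively, export a real environment variable in conf/spark-env.sh on each worker node
export MY_ENV_VAR=xxx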
Re: Loading already existing tables in spark shell
Hi Jeetendra, Please try the following in spark shell. it is like executing an sql command. sqlContext.sql(use database name) Regards, Ishwardeep From: Jeetendra Gangele gangele...@gmail.com Sent: Tuesday, August 25, 2015 12:57 PM To: Ishwardeep Singh Cc: user Subject: Re: Loading already existing tables in spark shell In spark shell use database not working saying use not found in the shell? did you ran this with scala shell ? On 24 August 2015 at 18:26, Ishwardeep Singh ishwardeep.si...@impetus.co.inmailto:ishwardeep.si...@impetus.co.in wrote: Hi Jeetendra, I faced this issue. I did not specify the database where this table exists. Please set the database by using use database command before executing the query. Regards, Ishwardeep From: Jeetendra Gangele gangele...@gmail.commailto:gangele...@gmail.com Sent: Monday, August 24, 2015 5:47 PM To: user Subject: Loading already existing tables in spark shell Hi All I have few tables in hive and I wanted to run query against them with spark as execution engine. Can I direct;y load these tables in spark shell and run query? I tried with 1.val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 2.qlContext.sql(FROM event_impressions select count(*)) where event_impressions is the table name. It give me error saying org.apache.spark.sql.AnalysisException: no such table event_impressions; line 1 pos 5 Does anybody hit similar issues? regards jeetendra NOTE: This message may contain information that is confidential, proprietary, privileged or otherwise protected by law. The message is intended solely for the named addressee. If received in error, please destroy and notify the sender. Any use of this email is prohibited when received in error. Impetus does not represent, warrant and/or guarantee, that the integrity of this communication has been maintained nor that the communication is free of errors, virus, interception or interference. NOTE: This message may contain information that is confidential, proprietary, privileged or otherwise protected by law. The message is intended solely for the named addressee. If received in error, please destroy and notify the sender. Any use of this email is prohibited when received in error. Impetus does not represent, warrant and/or guarantee, that the integrity of this communication has been maintained nor that the communication is free of errors, virus, interception or interference.
Re: spark not launching in yarn-cluster mode
spark-shell and spark-sql cannot be deployed in yarn-cluster mode, because the spark-shell and spark-sql scripts need to run on your local machine rather than in a container of the YARN cluster. 2015-08-25 16:19 GMT+08:00 Jeetendra Gangele gangele...@gmail.com: Hi All, I am trying to launch the spark shell with --master yarn-cluster and it's giving the error below. Why is this not supported? bin/spark-sql --master yarn-cluster Error: Cluster deploy mode is not applicable to Spark SQL shell. Run with --help for usage help or --verbose for debug output Regards Jeetendra
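In other words (the class and jar names below are placeholders):
# interactive shells need the driver to stay on the local machine, so use yarn-client
bin/spark-shell --master yarn-client
bin/spark-sql --master yarn-client

# yarn-cluster mode is for non-interactive applications submitted with spark-submit
bin/spark-submit --master yarn-cluster --class com.example.MyApp my-app.jar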
Re: org.apache.spark.shuffle.FetchFailedException
I have set spark.sql.shuffle.partitions=1000 then also its failing. On Tue, Aug 25, 2015 at 11:36 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: Did you try increasing sql partitions? On Tue, Aug 25, 2015 at 11:06 AM, kundan kumar iitr.kun...@gmail.com wrote: I am running this query on a data size of 4 billion rows and getting org.apache.spark.shuffle.FetchFailedException error. select adid,position,userid,price from ( select adid,position,userid,price, dense_rank() OVER (PARTITION BY adlocationid ORDER BY price DESC) as rank FROM trainInfo) as tmp WHERE rank = 2 I have attached the error logs from spark-sql terminal. Please suggest what is the reason for these kind of errors and how can I resolve them. Regards, Kundan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re:RE: Test case for the spark sql catalyst
Thanks Chenghao! At 2015-08-25 13:06:40, Cheng, Hao hao.ch...@intel.com wrote: Yes, check the source code under:https://github.com/apache/spark/tree/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst From: Todd [mailto:bit1...@163.com] Sent: Tuesday, August 25, 2015 1:01 PM To:user@spark.apache.org Subject: Test case for the spark sql catalyst Hi, Are there test cases for the spark sql catalyst, such as testing the rules of transforming unsolved query plan? Thanks!
Exception throws when running spark pi in Intellij Idea that scala.collection.Seq is not found
I cloned the code from https://github.com/apache/spark to my machine. It can compile successfully, But when I run the sparkpi, it throws an exception below complaining the scala.collection.Seq is not found. I have installed scala2.10.4 in my machine, and use the default profiles: window,scala2.10,maven-3,test-java-home. In Idea, I can find that the Seq class is on my classpath: Exception in thread main java.lang.NoClassDefFoundError: scala/collection/Seq at org.apache.spark.examples.SparkPi.main(SparkPi.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Caused by: java.lang.ClassNotFoundException: scala.collection.Seq at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 6 more
RE: DataFrame#show cost 2 Spark Jobs ?
O, Sorry, I miss reading your reply! I know the minimum tasks will be 2 for scanning, but Jeff is talking about 2 jobs, not 2 tasks. From: Shixiong Zhu [mailto:zsxw...@gmail.com] Sent: Tuesday, August 25, 2015 1:29 PM To: Cheng, Hao Cc: Jeff Zhang; user@spark.apache.org Subject: Re: DataFrame#show cost 2 Spark Jobs ? Hao, I can reproduce it using the master branch. I'm curious why you cannot reproduce it. Did you check if the input HadoopRDD did have two partitions? My test code is val df = sqlContext.read.json(examples/src/main/resources/people.json) df.show() Best Regards, Shixiong Zhu 2015-08-25 13:01 GMT+08:00 Cheng, Hao hao.ch...@intel.commailto:hao.ch...@intel.com: Hi Jeff, which version are you using? I couldn’t reproduce the 2 spark jobs in the `df.show()` with latest code, we did refactor the code for json data source recently, not sure you’re running an earlier version of it. And a known issue is Spark SQL will try to re-list the files every time when loading the data for JSON, it’s probably causes longer time for ramp up with large number of files/partitions. From: Jeff Zhang [mailto:zjf...@gmail.commailto:zjf...@gmail.com] Sent: Tuesday, August 25, 2015 8:11 AM To: Cheng, Hao Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: DataFrame#show cost 2 Spark Jobs ? Hi Cheng, I know that sqlContext.read will trigger one spark job to infer the schema. What I mean is DataFrame#show cost 2 spark jobs. So overall it would cost 3 jobs. Here's the command I use: val df = sqlContext.read.json(file:///Users/hadoop/github/spark/examples/src/main/resources/people.jsonfile:///\\Users\hadoop\github\spark\examples\src\main\resources\people.json) // trigger one spark job to infer schema df.show()// trigger 2 spark jobs which is weird On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.commailto:hao.ch...@intel.com wrote: The first job is to infer the json schema, and the second one is what you mean of the query. You can provide the schema while loading the json file, like below: sqlContext.read.schema(xxx).json(“…”)? Hao From: Jeff Zhang [mailto:zjf...@gmail.commailto:zjf...@gmail.com] Sent: Monday, August 24, 2015 6:20 PM To: user@spark.apache.orgmailto:user@spark.apache.org Subject: DataFrame#show cost 2 Spark Jobs ? It's weird to me that the simple show function will cost 2 spark jobs. DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs. == Parsed Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Analyzed Logical Plan == age: bigint, name: string Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Optimized Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Physical Plan == Scan JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1] -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
Re: org.apache.spark.shuffle.FetchFailedException
Did you try increasing sql partitions? On Tue, Aug 25, 2015 at 11:06 AM, kundan kumar iitr.kun...@gmail.com wrote: I am running this query on a data size of 4 billion rows and getting org.apache.spark.shuffle.FetchFailedException error. select adid,position,userid,price from ( select adid,position,userid,price, dense_rank() OVER (PARTITION BY adlocationid ORDER BY price DESC) as rank FROM trainInfo) as tmp WHERE rank = 2 I have attached the error logs from spark-sql terminal. Please suggest what is the reason for these kind of errors and how can I resolve them. Regards, Kundan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark
Sorry am I missing something? There is a method sortBy on both RDD and PairRDD. def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length http://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html#partitions:Array[org.apache.spark.Partition] )(implicitord: Ordering[K], ctag: ClassTag[K]): RDD http://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html [T] Return this RDD sorted by the given key function. Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co Check out Reifier at Spark Summit 2015 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/ http://in.linkedin.com/in/sonalgoyal On Tue, Aug 25, 2015 at 12:08 PM, Spark Enthusiast sparkenthusi...@yahoo.in wrote: But, there is no sort() primitive for an RDD. How do I sort? On Tuesday, 25 August 2015 11:10 AM, Sonal Goyal sonalgoy...@gmail.com wrote: I think you could try sorting the endPointsCount and then doing a take. This should be a distributed process and only the result would get returned to the driver. Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co/ Check out Reifier at Spark Summit 2015 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/ http://in.linkedin.com/in/sonalgoyal On Tue, Aug 25, 2015 at 10:22 AM, Spark Enthusiast sparkenthusi...@yahoo.in wrote: I was running a Spark Job to crunch a 9GB apache log file When I saw the following error: 15/08/25 04:25:16 WARN scheduler.TaskSetManager: Lost task 99.0 in stage 37.0 (TID 4115, ip-10-150-137-100.ap-southeast-1.compute.internal): ExecutorLostFailure (executor 29 lost) 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 40), so marking it as still running 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 86), so marking it as still running 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 84), so marking it as still running 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 22), so marking it as still running 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 48), so marking it as still running 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 12), so marking it as still running 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Executor lost: 29 (epoch 59) 15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Trying to remove executor 29 from BlockManagerMaster. 15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(29, ip-10-150-137-100.ap-southeast-1.compute.internal, 39411) . . Encountered Exception An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411) at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346) at org.apache.spark.SparkContext.stop(SparkContext.scala:1380) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:143) . . Looking further, it seems like takeOrdered (called by my application) uses collect() internally and hence drains out all the Drive memory. line 361, in top10EndPoints topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1]) File /home/hadoop/spark/python/pyspark/rdd.py, line 1174, in takeOrdered return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge) File /home/hadoop/spark/python/pyspark/rdd.py, line 739, in reduce vals = self.mapPartitions(func).collect() File /home/hadoop/spark/python/pyspark/rdd.py, line 713, in collect port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ self.target_id, self.name) File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value format(target_id, '.', name), value) How can I rewrite this code endpointCounts = (access_logs .map(lambda log: (log.endpoint, 1)) .reduceByKey(lambda a, b :
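A rough Scala sketch of the sort-then-take approach suggested above, assuming endpointCounts is an RDD of (endpoint, count) pairs (an illustration only, not the poster's actual PySpark code):
// sort by count descending across the cluster, then bring only the top 10 rows to the driver
val top10 = endpointCounts.sortBy({ case (_, count) => count }, ascending = false).take(10)
top10.foreach(println)
// takeOrdered achieves the same with a bounded per-partition heap rather than a full collect:
// endpointCounts.takeOrdered(10)(Ordering.by { case (_, count) => -count })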
Re: DataFrame#show cost 2 Spark Jobs ?
That's two jobs. `SparkPlan.executeTake` will call `runJob` twice in this case. Best Regards, Shixiong Zhu 2015-08-25 14:01 GMT+08:00 Cheng, Hao hao.ch...@intel.com: O, Sorry, I miss reading your reply! I know the minimum tasks will be 2 for scanning, but Jeff is talking about 2 jobs, not 2 tasks. *From:* Shixiong Zhu [mailto:zsxw...@gmail.com] *Sent:* Tuesday, August 25, 2015 1:29 PM *To:* Cheng, Hao *Cc:* Jeff Zhang; user@spark.apache.org *Subject:* Re: DataFrame#show cost 2 Spark Jobs ? Hao, I can reproduce it using the master branch. I'm curious why you cannot reproduce it. Did you check if the input HadoopRDD did have two partitions? My test code is val df = sqlContext.read.json(examples/src/main/resources/people.json) df.show() Best Regards, Shixiong Zhu 2015-08-25 13:01 GMT+08:00 Cheng, Hao hao.ch...@intel.com: Hi Jeff, which version are you using? I couldn’t reproduce the 2 spark jobs in the `df.show()` with latest code, we did refactor the code for json data source recently, not sure you’re running an earlier version of it. And a known issue is Spark SQL will try to re-list the files every time when loading the data for JSON, it’s probably causes longer time for ramp up with large number of files/partitions. *From:* Jeff Zhang [mailto:zjf...@gmail.com] *Sent:* Tuesday, August 25, 2015 8:11 AM *To:* Cheng, Hao *Cc:* user@spark.apache.org *Subject:* Re: DataFrame#show cost 2 Spark Jobs ? Hi Cheng, I know that sqlContext.read will trigger one spark job to infer the schema. What I mean is DataFrame#show cost 2 spark jobs. So overall it would cost 3 jobs. Here's the command I use: val df = sqlContext.read.json( file:///Users/hadoop/github/spark/examples/src/main/resources/people.json) // trigger one spark job to infer schema df.show()// trigger 2 spark jobs which is weird On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.com wrote: The first job is to infer the json schema, and the second one is what you mean of the query. You can provide the schema while loading the json file, like below: sqlContext.read.schema(xxx).json(“…”)? Hao *From:* Jeff Zhang [mailto:zjf...@gmail.com] *Sent:* Monday, August 24, 2015 6:20 PM *To:* user@spark.apache.org *Subject:* DataFrame#show cost 2 Spark Jobs ? It's weird to me that the simple show function will cost 2 spark jobs. DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs. == Parsed Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Analyzed Logical Plan == age: bigint, name: string Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Optimized Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Physical Plan == Scan JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1] -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
Re: How can I save the RDD result as Orcfile with spark1.3?
We plan to upgrade our spark cluster to 1.4, and I just have a test in local mode which reference here: http://hortonworks.com/blog/bringing-orc-support-into-apache-spark/ but an exception caused when running the example, the stack trace as below: *Exception in thread main java.lang.NoSuchFieldError: defaultVal* at org.apache.spark.sql.hive.HiveContext$$anonfun$newTemporaryConfiguration$1.apply(HiveContext.scala:536) at org.apache.spark.sql.hive.HiveContext$$anonfun$newTemporaryConfiguration$1.apply(HiveContext.scala:534) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105) at org.apache.spark.sql.hive.HiveContext$.newTemporaryConfiguration(HiveContext.scala:534) at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:165) at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:161) at org.apache.spark.sql.hive.HiveContext.init(HiveContext.scala:168) *at com.newegg.ec.bigdata.ORCSpark$.main(ORCSpark.scala:24)* at com.newegg.ec.bigdata.ORCSpark.main(ORCSpark.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) the code of the 24th line was: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) I use the Spark core with 1.4.1 and Hive with 1.1.0-cdh5.4.0 On Sat, Aug 22, 2015 at 11:18 PM, Ted Yu yuzhih...@gmail.com wrote: In Spark 1.4, there was considerable refactoring around interaction with Hive, such as SPARK-7491. It would not be straight forward to port ORC support to 1.3 FYI On Fri, Aug 21, 2015 at 10:21 PM, dong.yajun dongt...@gmail.com wrote: hi Ted, thanks for your reply, are there any other way to do this with spark 1.3? such as write the orcfile manually in foreachPartition method? On Sat, Aug 22, 2015 at 12:19 PM, Ted Yu yuzhih...@gmail.com wrote: ORC support was added in Spark 1.4 See SPARK-2883 On Fri, Aug 21, 2015 at 7:36 PM, dong.yajun dongt...@gmail.com wrote: Hi list, Is there a way to save the RDD result as Orcfile in spark1.3? due to some reasons we can't upgrade our spark version to 1.4 now. -- *Ric Dong* -- *Ric Dong* -- *Ric Dong*
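For reference, the 1.4-style ORC read/write from that walkthrough looks roughly like this once a HiveContext can be constructed (which is exactly the step that fails above; a NoSuchFieldError there usually hints at mismatched Hive jar versions on the classpath). Paths and columns are placeholders:
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)
import sqlContext.implicits._

val df = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("id", "name")
df.write.format("orc").save("/tmp/orc_demo")            // write ORC files
val back = sqlContext.read.format("orc").load("/tmp/orc_demo")
back.show()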
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
Are you actually losing messages then? On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang suchenz...@gmail.com wrote: No; first batch only contains messages received after the second job starts (messages come in at a steady rate of about 400/second). On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger c...@koeninger.org wrote: Does the first batch after restart contain all the messages received while the job was down? On Tue, Aug 25, 2015 at 12:53 PM, suchenzang suchenz...@gmail.com wrote: Hello, I'm using direct spark streaming (from kafka) with checkpointing, and everything works well until a restart. When I shut down (^C) the first streaming job, wait 1 minute, then re-submit, there is somehow a series of 0 event batches that get queued (corresponding to the 1 minute when the job was down). Eventually, the batches would resume processing, and I would see that each batch has roughly 2000 events. I see that at the beginning of the second launch, the checkpoint dirs are found and loaded, according to console output. Is this expected behavior? It seems like I might've configured something incorrectly, since I would expect with checkpointing that the streaming job would resume from checkpoint and continue processing from there (without seeing 0 event batches corresponding to when the job was down). Also, if I were to wait 10 minutes or so before re-launching, there would be so many 0 event batches that the job would hang. Is this merely something to be waited out, or should I set up some restart behavior/make a config change to discard checkpointing if the elapsed time has been too long? Thanks! http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
Yeah. All messages are lost while the streaming job was down. On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger c...@koeninger.org wrote: Are you actually losing messages then? On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang suchenz...@gmail.com wrote: No; first batch only contains messages received after the second job starts (messages come in at a steady rate of about 400/second). On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger c...@koeninger.org wrote: Does the first batch after restart contain all the messages received while the job was down? On Tue, Aug 25, 2015 at 12:53 PM, suchenzang suchenz...@gmail.com wrote: Hello, I'm using direct spark streaming (from kafka) with checkpointing, and everything works well until a restart. When I shut down (^C) the first streaming job, wait 1 minute, then re-submit, there is somehow a series of 0 event batches that get queued (corresponding to the 1 minute when the job was down). Eventually, the batches would resume processing, and I would see that each batch has roughly 2000 events. I see that at the beginning of the second launch, the checkpoint dirs are found and loaded, according to console output. Is this expected behavior? It seems like I might've configured something incorrectly, since I would expect with checkpointing that the streaming job would resume from checkpoint and continue processing from there (without seeing 0 event batches corresponding to when the job was down). Also, if I were to wait 10 minutes or so before re-launching, there would be so many 0 event batches that the job would hang. Is this merely something to be waited out, or should I set up some restart behavior/make a config change to discard checkpointing if the elapsed time has been too long? Thanks! http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
Sounds like something's not set up right... can you post a minimal code example that reproduces the issue? On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang suchenz...@gmail.com wrote: Yeah. All messages are lost while the streaming job was down. On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger c...@koeninger.org wrote: Are you actually losing messages then? On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang suchenz...@gmail.com wrote: No; first batch only contains messages received after the second job starts (messages come in at a steady rate of about 400/second). On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger c...@koeninger.org wrote: Does the first batch after restart contain all the messages received while the job was down? On Tue, Aug 25, 2015 at 12:53 PM, suchenzang suchenz...@gmail.com wrote: Hello, I'm using direct spark streaming (from kafka) with checkpointing, and everything works well until a restart. When I shut down (^C) the first streaming job, wait 1 minute, then re-submit, there is somehow a series of 0 event batches that get queued (corresponding to the 1 minute when the job was down). Eventually, the batches would resume processing, and I would see that each batch has roughly 2000 events. I see that at the beginning of the second launch, the checkpoint dirs are found and loaded, according to console output. Is this expected behavior? It seems like I might've configured something incorrectly, since I would expect with checkpointing that the streaming job would resume from checkpoint and continue processing from there (without seeing 0 event batches corresponding to when the job was down). Also, if I were to wait 10 minutes or so before re-launching, there would be so many 0 event batches that the job would hang. Is this merely something to be waited out, or should I set up some restart behavior/make a config change to discard checkpointing if the elapsed time has been too long? Thanks! http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
No; first batch only contains messages received after the second job starts (messages come in at a steady rate of about 400/second). On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger c...@koeninger.org wrote: Does the first batch after restart contain all the messages received while the job was down? On Tue, Aug 25, 2015 at 12:53 PM, suchenzang suchenz...@gmail.com wrote: Hello, I'm using direct spark streaming (from kafka) with checkpointing, and everything works well until a restart. When I shut down (^C) the first streaming job, wait 1 minute, then re-submit, there is somehow a series of 0 event batches that get queued (corresponding to the 1 minute when the job was down). Eventually, the batches would resume processing, and I would see that each batch has roughly 2000 events. I see that at the beginning of the second launch, the checkpoint dirs are found and loaded, according to console output. Is this expected behavior? It seems like I might've configured something incorrectly, since I would expect with checkpointing that the streaming job would resume from checkpoint and continue processing from there (without seeing 0 event batches corresponding to when the job was down). Also, if I were to wait 10 minutes or so before re-launching, there would be so many 0 event batches that the job would hang. Is this merely something to be waited out, or should I set up some restart behavior/make a config change to discard checkpointing if the elapsed time has been too long? Thanks! http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
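For comparison, the recovery pattern that direct-stream checkpointing normally relies on is StreamingContext.getOrCreate; a rough sketch (broker, topic, batch interval and checkpoint path are placeholders, not values from this thread):
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val checkpointDir = "hdfs:///tmp/stream-checkpoint"

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("direct-kafka"), Seconds(10))
  ssc.checkpoint(checkpointDir)
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("mytopic"))
  stream.foreachRDD(rdd => println(rdd.count()))   // stand-in for the real processing
  ssc
}

// on restart this rebuilds the context (and any unprocessed batches) from the checkpoint
// instead of silently starting a fresh one
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()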
Re: CHAID Decision Trees
Nothing is in JIRA https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22CHAID%22 so AFAIK no, only random forests and GBTs using entropy or GINI for information gain is supported. On Tue, Aug 25, 2015 at 9:39 AM, jatinpreet jatinpr...@gmail.com wrote: Hi, I wish to know if MLlib supports CHAID regression and classifcation trees. If yes, how can I build them in spark? Thanks, Jatin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CHAID-Decision-Trees-tp24449.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Fwd: Join with multiple conditions (In reference to SPARK-7197)
Hello All, PySpark currently has two ways of performing a join: specifying a join condition or column names. I would like to perform a join using a list of columns that appear in both the left and right DataFrames. I have created an example in this question on Stack Overflow http://stackoverflow.com/questions/32193488/joining-multiple-columns-in-pyspark . Basically, I would like to do the following as specified in the documentation in /spark/python/pyspark/sql/dataframe.py row 560 and specify a list of column names: df.join(df4, ['name', 'age']).select(df.name, df.age).collect() However, this produces an error. In JIRA issue SPARK-7197 https://issues.apache.org/jira/browse/SPARK-7197, it is mentioned that the syntax is actually different from the one specified in the documentation for joining using a condition. Documentation: cond = [df.name == df3.name, df.age == df3.age] df.join(df3, cond, 'outer').select(df.name, df3.age).collect() JIRA Issue: a.join(b, (a.year==b.year) (a.month==b.month), 'inner') In other words. the join function cannot take a list. I was wondering if you could also clarify what is the correct syntax for providing a list of columns. Thanks, Michal
Re: Local Spark talking to remote HDFS?
That's what I'd suggest too. Furthermore, if you use vagrant to spin up VMs, there's a module that can do that automatically for you. R. 2015-08-25 10:11 GMT-07:00 Steve Loughran ste...@hortonworks.com: I wouldn't try to play with forwarding tunnelling; always hard to work out what ports get used everywhere, and the services like hostname==URL in paths. Can't you just set up an entry in the windows /etc/hosts file? It's what I do (on Unix) to talk to VMs On 25 Aug 2015, at 04:49, Dino Fancellu d...@felstar.com wrote: Tried adding 50010, 50020 and 50090. Still no difference. I can't imagine I'm the only person on the planet wanting to do this. Anyway, thanks for trying to help. Dino. On 25 August 2015 at 08:22, Roberto Congiu roberto.con...@gmail.com wrote: Port 8020 is not the only port you need tunnelled for HDFS to work. If you only list the contents of a directory, port 8020 is enough... for instance, using something val p = new org.apache.hadoop.fs.Path(hdfs://localhost:8020/) val fs = p.getFileSystem(sc.hadoopConfiguration) fs.listStatus(p) you should see the file list. But then, when accessing a file, you need to actually get its blocks, it has to connect to the data node. The error 'could not obtain block' means it can't get that block from the DataNode. Refer to http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-1.2.1/bk_reference/content/reference_chap2_1.html to see the complete list of ports that also need to be tunnelled. 2015-08-24 13:10 GMT-07:00 Dino Fancellu d...@felstar.com: Changing the ip to the guest IP address just never connects. The VM has port tunnelling, and it passes through all the main ports, 8020 included to the host VM. You can tell that it was talking to the guest VM before, simply because it said when file not found Error is: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-452094660-10.0.2.15-1437494483194:blk_1073742905_2098 file=/tmp/people.txt but I have no idea what it means by that. It certainly can find the file and knows it exists. On 24 August 2015 at 20:43, Roberto Congiu roberto.con...@gmail.com wrote: When you launch your HDP guest VM, most likely it gets launched with NAT and an address on a private network (192.168.x.x) so on your windows host you should use that address (you can find out using ifconfig on the guest OS). I usually add an entry to my /etc/hosts for VMs that I use oftenif you use vagrant, there's also a vagrant module that can do that automatically. Also, I am not sure how the default HDP VM is set up, that is, if it only binds HDFS to 127.0.0.1 or to all addresses. You can check that with netstat -a. R. 2015-08-24 11:46 GMT-07:00 Dino Fancellu d...@felstar.com: I have a file in HDFS inside my HortonWorks HDP 2.3_1 VirtualBox VM. 
If I go into the guest spark-shell and refer to the file thus, it works fine val words=sc.textFile(hdfs:///tmp/people.txt) words.count However if I try to access it from a local Spark app on my Windows host, it doesn't work val conf = new SparkConf().setMaster(local).setAppName(My App) val sc = new SparkContext(conf) val words=sc.textFile(hdfs://localhost:8020/tmp/people.txt) words.count Emits The port 8020 is open, and if I choose the wrong file name, it will tell me My pom has dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.11/artifactId version1.4.1/version scopeprovided/scope /dependency Am I doing something wrong? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Local-Spark-talking-to-remote-HDFS-tp24425.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
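A minimal sketch of the hosts-file approach suggested above, assuming the guest VM advertises itself as sandbox.hortonworks.com (both the hostname and the address below are illustrative): after adding that name to the Windows hosts file, refer to the NameNode by the same hostname so that the DataNode addresses it hands back also resolve from the host.

import org.apache.spark.{SparkConf, SparkContext}

// Assumes a hosts-file entry like "192.168.56.101  sandbox.hortonworks.com"
// on the Windows host; hostname and IP are examples only.
val conf = new SparkConf().setMaster("local[*]").setAppName("Remote HDFS test")
val sc = new SparkContext(conf)

// Use the cluster's own hostname rather than localhost, so the block locations
// returned by the NameNode point at an address the client machine can reach.
val words = sc.textFile("hdfs://sandbox.hortonworks.com:8020/tmp/people.txt")
println(words.count())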
Re: Spark works with the data in another cluster(Elasticsearch)
While it's true locality might speed things up, I'd say it's a very bad idea to mix your Spark and ES clusters - if your ES cluster is serving production queries (and in particular using aggregations), you'll run into performance issues on your production ES cluster. ES-hadoop uses ES scan & scroll to pull data pretty efficiently, so pulling it across the network is not too bad. If you do need to avoid that, pull the data and write what you need to HDFS as say parquet files (eg pull data daily and write it, then you have all data available on your Spark cluster). And of course ensure that when you do pull data from ES to Spark, you cache it to avoid hitting the network again — Sent from Mailbox On Tue, Aug 25, 2015 at 12:01 PM, Akhil Das ak...@sigmoidanalytics.com wrote: If the data is local to the machine then obviously it will be faster compared to pulling it through the network and storing it locally (either memory or disk etc). Have a look at the data locality http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/data_locality.html . Thanks Best Regards On Tue, Aug 18, 2015 at 8:09 PM, gen tang gen.tan...@gmail.com wrote: Hi, Currently, I have my data in the cluster of Elasticsearch and I try to use spark to analyse those data. The cluster of Elasticsearch and the cluster of spark are two different clusters. And I use hadoop input format(es-hadoop) to read data in ES. I am wondering how this environment affect the speed of analysis. If I understand well, spark will read data from ES cluster and do calculate on its own cluster(include writing shuffle result on its own machine), Is this right? If this is correct, I think that the performance will just a little bit slower than the data stored on the same cluster. I will be appreciated if someone can share his/her experience about using spark with elasticsearch. Thanks a lot in advance for your help. Cheers Gen
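To make the pull-once-and-cache advice concrete, a minimal sketch assuming the elasticsearch-spark (es-hadoop) connector is on the classpath; the ES node address and the logs/events index are illustrative only.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.elasticsearch.spark._

// Point es.nodes at the separate ES cluster; "logs/events" stands in for a real index/type.
val conf = new SparkConf().setAppName("es-read").set("es.nodes", "es-node-1:9200")
val sc = new SparkContext(conf)

val events = sc.esRDD("logs/events")            // one pull over the network via scan & scroll
events.persist(StorageLevel.MEMORY_AND_DISK)    // cache so later stages do not hit ES again
println(events.count())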
How to efficiently write sorted neighborhood in pyspark
I would like to implement the sorted neighborhood approach in Spark. What is the best way to write that in PySpark? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Using unserializable classes in tasks
Instead of foreach try to use forEachPartitions, that will initialize the connector per partition rather than per record. Thanks Best Regards On Fri, Aug 14, 2015 at 1:13 PM, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote: No the connector does not need to be serializable cause it is constructed on the worker. Only objects shuffled across partitions needs to be serializable. 2015-08-14 9:40 GMT+02:00 mark manwoodv...@googlemail.com: I guess I'm looking for a more general way to use complex graphs of objects that cannot be serialized in a task executing on a worker, not just DB connectors. Something like shipping jars to the worker maybe? I'm not sure I understand how your foreach example solves the issue - the Connector there would still need to be serializable surely? Thanks On 14 Aug 2015 8:32 am, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote: I am not an expert but first of all check if there is no ready connector (you mentioned Cassandra - check: spark-cassandra-connector https://github.com/datastax/spark-cassandra-connector ). If you really want to do sth on your own all objects constructed in the passed function will be allocated on the worker. Example given: sc.parrallelize((1 to 100)).forEach(x = new Connector().save(x)) but this way you allocate resources frequently 2015-08-14 9:05 GMT+02:00 mark manwoodv...@googlemail.com: I have a Spark job that computes some values and needs to write those values to a data store. The classes that write to the data store are not serializable (eg, Cassandra session objects etc). I don't want to collect all the results at the driver, I want each worker to write the data - what is the suggested approach for using code that can't be serialized in a task?
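A minimal sketch of that pattern; rdd and Connector below are placeholders for your RDD and for whatever non-serializable client (a Cassandra session, for example) has to be constructed on the worker.

rdd.foreachPartition { records =>
  // Built on the executor once per partition, so it is never serialized
  // and is reused for every record in the partition.
  val connector = new Connector()
  try {
    records.foreach(rec => connector.save(rec))
  } finally {
    connector.close()   // assumes the client exposes some close/shutdown call
  }
}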
Re: Exception when S3 path contains colons
You can change the names, whatever program that is pushing the record must follow the naming conventions. Try to replace : with _ or something. Thanks Best Regards On Tue, Aug 18, 2015 at 10:20 AM, Brian Stempin brian.stem...@gmail.com wrote: Hi, I'm running Spark on Amazon EMR (Spark 1.4.1, Hadoop 2.6.0). I'm seeing the exception below when encountering file names that contain colons. Any idea on how to get around this? scala val files = sc.textFile(s3a://redactedbucketname/*) 2015-08-18 04:38:34,567 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(242224) called with curMem=669367, maxMem=285203496 2015-08-18 04:38:34,568 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_3 stored as values in memory (estimated size 236.5 KB, free 271.1 MB) 2015-08-18 04:38:34,663 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(21533) called with curMem=911591, maxMem=285203496 2015-08-18 04:38:34,664 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_3_piece0 stored as bytes in memory (estimated size 21.0 KB, free 271.1 MB) 2015-08-18 04:38:34,665 INFO [sparkDriver-akka.actor.default-dispatcher-19] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added broadcast_3_piece0 in memory on 10.182.184.26:60338 (size: 21.0 KB, free: 271.9 MB) 2015-08-18 04:38:34,667 INFO [main] spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 3 from textFile at console:21 files: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at textFile at console:21 scala files.count 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem (S3AFileSystem.java:listStatus(533)) - List status for path: s3a://redactedbucketname/ 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem (S3AFileSystem.java:getFileStatus(684)) - Getting path status for s3a://redactedbucketname/ () java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: [922-212-4438]-[119]-[1]-[2015-08-13T15:43:12.346193%5D-%5B2015-01-01T00:00:00%5D-redacted.csv at org.apache.hadoop.fs.Path.initialize(Path.java:206) at org.apache.hadoop.fs.Path.init(Path.java:172) at org.apache.hadoop.fs.Path.init(Path.java:94) at org.apache.hadoop.fs.Globber.glob(Globber.java:240) at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700) at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:229) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:200) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:279) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781) at org.apache.spark.rdd.RDD.count(RDD.scala:1099) at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:24) at $iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:29) at $iwC$iwC$iwC$iwC$iwC$iwC.init(console:31) at $iwC$iwC$iwC$iwC$iwC.init(console:33) at 
$iwC$iwC$iwC$iwC.init(console:35) at $iwC$iwC$iwC.init(console:37) at $iwC$iwC.init(console:39) at $iwC.init(console:41) at init(console:43) at .init(console:47) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) at
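If the producer cannot be changed, renaming the existing objects works too. A rough sketch with the AWS SDK for Java (assumed to be available, as it is on EMR) that copies each offending key to a colon-free name and deletes the original; the bucket name is illustrative and pagination of listObjects is omitted for brevity.

import com.amazonaws.services.s3.AmazonS3Client
import scala.collection.JavaConverters._

val s3 = new AmazonS3Client()          // picks up the instance-profile credentials on EMR
val bucket = "redactedbucketname"

s3.listObjects(bucket).getObjectSummaries.asScala
  .map(_.getKey)
  .filter(_.contains(":"))
  .foreach { key =>
    val cleaned = key.replace(":", "_")
    s3.copyObject(bucket, key, bucket, cleaned)  // S3 has no rename, so copy then delete
    s3.deleteObject(bucket, key)
  }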
Re: Spark works with the data in another cluster(Elasticsearch)
If the data is local to the machine then obviously it will be faster compared to pulling it through the network and storing it locally (either memory or disk etc). Have a look at the data locality http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/data_locality.html . Thanks Best Regards On Tue, Aug 18, 2015 at 8:09 PM, gen tang gen.tan...@gmail.com wrote: Hi, Currently, I have my data in the cluster of Elasticsearch and I try to use spark to analyse those data. The cluster of Elasticsearch and the cluster of spark are two different clusters. And I use hadoop input format(es-hadoop) to read data in ES. I am wondering how this environment affect the speed of analysis. If I understand well, spark will read data from ES cluster and do calculate on its own cluster(include writing shuffle result on its own machine), Is this right? If this is correct, I think that the performance will just a little bit slower than the data stored on the same cluster. I will be appreciated if someone can share his/her experience about using spark with elasticsearch. Thanks a lot in advance for your help. Cheers Gen
Re: Spark Streaming: Some issues (Could not compute split, block —— not found) and questions
You hit block not found issues when you processing time exceeds the batch duration (this happens with receiver oriented streaming). If you are consuming messages from Kafka then try to use the directStream or you can also set StorageLevel to MEMORY_AND_DISK with receiver oriented consumer. (This might slow things down a bit though). Thanks Best Regards On Wed, Aug 19, 2015 at 8:21 PM, jlg jgri...@adzerk.com wrote: Some background on what we're trying to do: We have four Kinesis receivers with varying amounts of data coming through them. Ultimately we work on a unioned stream that is getting about 11 MB/second of data. We use a batch size of 5 seconds. We create four distinct DStreams from this data that have different aggregation computations (various combinations of map/flatMap/reduceByKeyAndWindow and then finishing by serializing the records to JSON strings and writing them to S3). We want to do 30 minute windows of computations on this data, to get a better compression rate for the aggregates (there are a lot of repeated keys across this time frame, and we want to combine them all -- we do this using reduceByKeyAndWindow). But even when trying to do 5 minute windows, we have issues with Could not compute split, block —— not found. This is being run on a YARN cluster and it seems like the executors are getting killed even though they should have plenty of memory. Also, it seems like no computation actually takes place until the end of the window duration. This seems inefficient if there is a lot of data that you know is going to be needed for the computation. Is there any good way around this? There are some of the configuration settings we are using for Spark: spark.executor.memory=26000M,\ spark.executor.cores=4,\ spark.executor.instances=5,\ spark.driver.cores=4,\ spark.driver.memory=24000M,\ spark.default.parallelism=128,\ spark.streaming.blockInterval=100ms,\ spark.streaming.receiver.maxRate=2,\ spark.akka.timeout=300,\ spark.storage.memoryFraction=0.6,\ spark.rdd.compress=true,\ spark.executor.instances=16,\ spark.serializer=org.apache.spark.serializer.KryoSerializer,\ spark.kryoserializer.buffer.max=2047m,\ Is this the correct way to do this, and how can I further debug to figure out this issue? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Some-issues-Could-not-compute-split-block-not-found-and-questions-tp24342.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
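A minimal sketch of the direct approach, assuming an existing StreamingContext named ssc; the broker list and topic name are placeholders.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// No receiver and no received-block storage, so the "Could not compute split,
// block not found" failure mode does not apply; offsets are tracked per batch instead.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("events")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)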
Re:Re: What does Attribute and AttributeReference mean in Spark SQL
Thank you Michael for the detailed explanation, it makes it clear to me. Thanks! At 2015-08-25 15:37:54, Michael Armbrust mich...@databricks.com wrote: Attribute is the Catalyst name for an input column from a child operator. An AttributeReference has been resolved, meaning we know which input column in particular it is referring to. An AttributeReference also has a known DataType. In contrast, before analysis there might still exist UnresolvedReferences, which are just string identifiers from a parsed query. An Expression can be more complex (like you suggested, a + b), though technically just a is also a very simple Expression. The following console session shows how these types are composed:

$ build/sbt sql/console
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@5adfe37d
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@20d05227
import sqlContext.implicits._
import sqlContext._
Welcome to Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_45).
Type in expressions to have them evaluated.
Type :help for more information.

scala> val unresolvedAttr: UnresolvedAttribute = 'a
unresolvedAttr: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute = 'a

scala> val relation = LocalRelation('a.int)
relation: org.apache.spark.sql.catalyst.plans.logical.LocalRelation = LocalRelation [a#0]

scala> val parsedQuery = relation.select(unresolvedAttr)
parsedQuery: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 'Project ['a]
 LocalRelation [a#0]

scala> parsedQuery.analyze
res11: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = Project [a#0]
 LocalRelation [a#0]

The #0 after a is a unique identifier (within this JVM) that says where the data is coming from, even as plans are rearranged due to optimizations. On Mon, Aug 24, 2015 at 6:13 PM, Todd bit1...@163.com wrote: There are many such kinds of case classes or concepts such as Attribute/AttributeReference/Expression in Spark SQL. I would ask what Attribute/AttributeReference/Expression mean: given a SQL query like select a,b from c, are a and b two Attributes? Is a + b an expression? It looks like I misunderstand it, because Attribute extends Expression in the code, which means Attribute itself is an Expression. Thanks.
Re: Adding/subtracting org.apache.spark.mllib.linalg.Vector in Scala?
Kristina, Thanks for the discussion. I followed up on your problem and learned that Scala doesn't support multiple implicit conversions in a single expression http://stackoverflow.com/questions/8068346/can-scala-apply-multiple-implicit-conversions-in-one-expression for complexity reasons. I'm afraid the solution for now is to do (v1: BV[Double]) + (v1: BV[Double]) On Tue, Aug 25, 2015 at 11:06 AM, Kristina Rogale Plazonic kpl...@gmail.com wrote: YES PLEASE! :))) On Tue, Aug 25, 2015 at 1:57 PM, Burak Yavuz brk...@gmail.com wrote: Hmm. I have a lot of code on the local linear algebra operations using Spark's Matrix and Vector representations done for https://issues.apache.org/jira/browse/SPARK-6442. I can make a Spark package with that code if people are interested. Best, Burak On Tue, Aug 25, 2015 at 10:54 AM, Kristina Rogale Plazonic kpl...@gmail.com wrote: However I do think it's easier than it seems to write the implicits; it doesn't involve new classes or anything. Yes it's pretty much just what you wrote. There is a class Vector in Spark. This declaration can be in an object; you don't implement your own class. (Also you can use toBreeze to get Breeze vectors.) The implicit conversion with the implicit def happens for the first vector in the sum, but not the second vector (see below). At this point I give up, because I spent way too much time. I am so disappointed. So many times I heard Spark makes simple things easy and complicated things possible. Well, here is the simplest thing you can imagine in linear algebra, but heck, it is not easy or intuitive. It was easier to run a DeepLearning algo (from another library) than add two vectors. If anybody has a workaround other than implementing your own add/substract/scalarMultiply, PLEASE let me know. Here is the code and error from (freshly started) spark-shell: scala import breeze.linalg.{DenseVector = BDV, SparseVector = BSV, Vector = BV} import breeze.linalg.{DenseVector=BDV, SparseVector=BSV, Vector=BV} scala import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.Vectors scala val v1 = Vectors.dense(1.0, 2.0, 3.0) v1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0] scala import org.apache.spark.mllib.linalg.{Vector =SparkVector} import org.apache.spark.mllib.linalg.{Vector=SparkVector} scala object MyUtils { | implicit def toBreeze(v:SparkVector) = BV(v.toArray) | } warning: there were 1 feature warning(s); re-run with -feature for details defined module MyUtils scala import MyUtils._ import MyUtils._ scala v1:BV[Double] res2: breeze.linalg.Vector[Double] = DenseVector(1.0, 2.0, 3.0) scala v1 + v1 console:30: error: could not find implicit value for parameter op: breeze.linalg.operators.OpAdd.Impl2[breeze.linalg.Vector[Double],org.apache.spark.mllib.linalg.Vector,That] v1 + v1 ^
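A small helper along those lines (a sketch, not a library API) keeps the double conversion out of user code by converting both operands explicitly before adding.

import breeze.linalg.{Vector => BV}
import org.apache.spark.mllib.linalg.{Vector => SparkVector, Vectors}

object VectorOps {
  // Explicit conversion applied to each operand, so no chained implicits are needed.
  def toBreeze(v: SparkVector): BV[Double] = BV(v.toArray)

  def add(a: SparkVector, b: SparkVector): SparkVector =
    Vectors.dense((toBreeze(a) + toBreeze(b)).toArray)
}

val v1 = Vectors.dense(1.0, 2.0, 3.0)
val sum = VectorOps.add(v1, v1)   // [2.0, 4.0, 6.0]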
Re: Protobuf error when streaming from Kafka
Hi, I am using Spark-1.4 and Kafka-0.8.2.1 As per google suggestions, I rebuilt all the classes with protobuff-2.5 dependencies. My new protobuf is compiled using 2.5. However now, my spark job does not start. Its throwing different error. Does Spark or any other its dependencies uses old protobuff-2.4? Exception in thread main java.lang.VerifyError: class com.apple.ist.retail.xcardmq.serializers.SampleProtobufMessage$ProtoBuff overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet; at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:760) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at com.apple.ist.retail.spark.kafka.consumer.SparkMQProcessor.start(SparkProcessor.java:68) at com.apple.ist.retail.spark.kafka.consumer.SparkMQConsumer.main(SparkConsumer.java:43) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) On Mon, Aug 24, 2015 at 6:53 PM, Ted Yu yuzhih...@gmail.com wrote: Can you show the complete stack trace ? Which Spark / Kafka release are you using ? Thanks On Mon, Aug 24, 2015 at 4:58 PM, Cassa L lcas...@gmail.com wrote: Hi, I am storing messages in Kafka using protobuf and reading them into Spark. I upgraded protobuf version from 2.4.1 to 2.5.0. I got java.lang.UnsupportedOperationException for older messages. However, even for new messages I get the same error. Spark does convert it though. I see my messages. How do I get rid of this error? java.lang.UnsupportedOperationException: This is supposed to be overridden by subclasses. at com.google.protobuf.GeneratedMessage.getUnknownFields(GeneratedMessage.java:180) at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$FsPermissionProto.getSerializedSize(HdfsProtos.java:5407) at com.google.protobuf.CodedOutputStream.computeMessageSizeNoTag(CodedOutputStream.java:749)
RE: Protobuf error when streaming from Kafka
Did your spark build with Hive? I met the same problem before because the hive-exec jar in the maven itself include protobuf class, which will be included in the Spark jar. Yong Date: Tue, 25 Aug 2015 12:39:46 -0700 Subject: Re: Protobuf error when streaming from Kafka From: lcas...@gmail.com To: yuzhih...@gmail.com CC: user@spark.apache.org Hi, I am using Spark-1.4 and Kafka-0.8.2.1 As per google suggestions, I rebuilt all the classes with protobuff-2.5 dependencies. My new protobuf is compiled using 2.5. However now, my spark job does not start. Its throwing different error. Does Spark or any other its dependencies uses old protobuff-2.4? Exception in thread main java.lang.VerifyError: class com.apple.ist.retail.xcardmq.serializers.SampleProtobufMessage$ProtoBuff overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet; at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:760) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at com.apple.ist.retail.spark.kafka.consumer.SparkMQProcessor.start(SparkProcessor.java:68) at com.apple.ist.retail.spark.kafka.consumer.SparkMQConsumer.main(SparkConsumer.java:43) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) On Mon, Aug 24, 2015 at 6:53 PM, Ted Yu yuzhih...@gmail.com wrote: Can you show the complete stack trace ? Which Spark / Kafka release are you using ? Thanks On Mon, Aug 24, 2015 at 4:58 PM, Cassa L lcas...@gmail.com wrote: Hi, I am storing messages in Kafka using protobuf and reading them into Spark. I upgraded protobuf version from 2.4.1 to 2.5.0. I got java.lang.UnsupportedOperationException for older messages. However, even for new messages I get the same error. Spark does convert it though. I see my messages. How do I get rid of this error? java.lang.UnsupportedOperationException: This is supposed to be overridden by subclasses. at com.google.protobuf.GeneratedMessage.getUnknownFields(GeneratedMessage.java:180) at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$FsPermissionProto.getSerializedSize(HdfsProtos.java:5407) at com.google.protobuf.CodedOutputStream.computeMessageSizeNoTag(CodedOutputStream.java:749)
Re: Protobuf error when streaming from Kafka
I downloaded below binary version of spark. spark-1.4.1-bin-cdh4 On Tue, Aug 25, 2015 at 1:03 PM, java8964 java8...@hotmail.com wrote: Did your spark build with Hive? I met the same problem before because the hive-exec jar in the maven itself include protobuf class, which will be included in the Spark jar. Yong -- Date: Tue, 25 Aug 2015 12:39:46 -0700 Subject: Re: Protobuf error when streaming from Kafka From: lcas...@gmail.com To: yuzhih...@gmail.com CC: user@spark.apache.org Hi, I am using Spark-1.4 and Kafka-0.8.2.1 As per google suggestions, I rebuilt all the classes with protobuff-2.5 dependencies. My new protobuf is compiled using 2.5. However now, my spark job does not start. Its throwing different error. Does Spark or any other its dependencies uses old protobuff-2.4? Exception in thread main java.lang.VerifyError: class com.apple.ist.retail.xcardmq.serializers.SampleProtobufMessage$ProtoBuff overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet; at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:760) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at com.apple.ist.retail.spark.kafka.consumer.SparkMQProcessor.start(SparkProcessor.java:68) at com.apple.ist.retail.spark.kafka.consumer.SparkMQConsumer.main(SparkConsumer.java:43) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) On Mon, Aug 24, 2015 at 6:53 PM, Ted Yu yuzhih...@gmail.com wrote: Can you show the complete stack trace ? Which Spark / Kafka release are you using ? Thanks On Mon, Aug 24, 2015 at 4:58 PM, Cassa L lcas...@gmail.com wrote: Hi, I am storing messages in Kafka using protobuf and reading them into Spark. I upgraded protobuf version from 2.4.1 to 2.5.0. I got java.lang.UnsupportedOperationException for older messages. However, even for new messages I get the same error. Spark does convert it though. I see my messages. How do I get rid of this error? java.lang.UnsupportedOperationException: This is supposed to be overridden by subclasses. at com.google.protobuf.GeneratedMessage.getUnknownFields(GeneratedMessage.java:180) at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$FsPermissionProto.getSerializedSize(HdfsProtos.java:5407) at com.google.protobuf.CodedOutputStream.computeMessageSizeNoTag(CodedOutputStream.java:749)
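One quick way to see which protobuf copy actually wins on the classpath (and therefore whether the CDH build of Spark is still pulling in an older protobuf) is to print where the GeneratedMessage class was loaded from, for example in spark-shell:

// The printed location shows which jar supplies com.google.protobuf at runtime,
// which is what matters for the VerifyError above.
val loc = classOf[com.google.protobuf.GeneratedMessage]
  .getProtectionDomain.getCodeSource.getLocation
println(loc)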
Re: How to unit test HiveContext without OutOfMemoryError (using sbt)
The PermGen space error is controlled with MaxPermSize parameter. I run with this in my pom, I think copied pretty literally from Spark's own tests... I don't know what the sbt equivalent is but you should be able to pass it...possibly via SBT_OPTS? plugin groupIdorg.scalatest/groupId artifactIdscalatest-maven-plugin/artifactId version1.0/version configuration reportsDirectory${project.build.directory}/surefire-reports/reportsDirectory parallelfalse/parallel junitxml./junitxml filereportsSparkTestSuite.txt/filereports argLine-Xmx3g -XX:MaxPermSize=256m -XX:ReservedCodeCacheSize=512m/argLine stderr/ systemProperties java.awt.headlesstrue/java.awt.headless spark.testing1/spark.testing spark.ui.enabledfalse/spark.ui.enabled spark.driver.allowMultipleContextstrue/spark.driver.allowMultipleContexts /systemProperties /configuration executions execution idtest/id goals goaltest/goal /goals /execution /executions /plugin /plugins On Tue, Aug 25, 2015 at 2:10 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hello, I am using sbt and created a unit test where I create a `HiveContext` and execute some query and then return. Each time I run the unit test the JVM will increase it's memory usage until I get the error: Internal error when running tests: java.lang.OutOfMemoryError: PermGen space Exception in thread Thread-2 java.io.EOFException As a work-around, I can fork a new JVM each time I run the unit test, however, it seems like a bad solution as takes a while to run the unit test. By the way, I tried to importing the TestHiveContext: - import org.apache.spark.sql.hive.test.TestHiveContext However, it suffers from the same memory issue. Has anyone else suffered from the same problem? Note that I am running these unit tests on my mac. Cheers, Mike.
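For sbt, a hedged equivalent (untested here) is either to raise PermGen on the sbt JVM itself via SBT_OPTS, or to fork the test JVM and pass the flags in build.sbt:

// Option 1: no forking; raise PermGen on the sbt JVM itself, e.g. from the shell:
//   export SBT_OPTS="-Xmx3g -XX:MaxPermSize=256m -XX:ReservedCodeCacheSize=512m"
//
// Option 2 (build.sbt): fork the test JVM and pass the same flags there.
// Note that javaOptions only applies to forked JVMs.
fork in Test := true
javaOptions in Test ++= Seq(
  "-Xmx3g",
  "-XX:MaxPermSize=256m",
  "-XX:ReservedCodeCacheSize=512m")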
Re: How to access Spark UI through AWS
Thanks. I just tried and still am having trouble. It seems to still be using the private address even if I try going through the resource manager. On Tue, Aug 25, 2015 at 12:34 PM, Kelly, Jonathan jonat...@amazon.com wrote: I'm not sure why the UI appears broken like that either and haven't investigated it myself yet, but if you instead go to the YARN ResourceManager UI (port 8088 if you are using emr-4.x; port 9026 for 3.x, I believe), then you should be able to click on the ApplicationMaster link (or the History link for completed applications) to get to the Spark UI from there. The ApplicationMaster link will use the YARN Proxy Service (port 20888 on emr-4.x; not sure about 3.x) to proxy through the Spark application's UI, regardless of what port it's running on. For completed applications, the History link will send you directly to the Spark History Server UI on port 18080. Hope that helps! ~ Jonathan On 8/24/15, 10:51 PM, Justin Pihony justin.pih...@gmail.com wrote: I am using the steps from this article https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923 to get spark up and running on EMR through yarn. Once up and running I ssh in and cd to the spark bin and run spark-shell --master yarn. Once this spins up I can see that the UI is started at the internal ip of 4040. If I hit the public dns at 4040 with dynamic port tunneling and foxyproxy then I get a crude UI (css seems broken), however the proxy continuously redirects me to the main page, so I cannot drill into anything. So, I tried static tunneling, but can't seem to get through. So, how can I access the spark UI when running a spark shell in AWS yarn? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-Spark-UI -through-AWS-tp24436.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SparkSQL problem with IBM BigInsight V3
Hi, On our production environment, we have a unique problems related to Spark SQL, and I wonder if anyone can give me some idea what is the best way to handle this. Our production Hadoop cluster is IBM BigInsight Version 3, which comes with Hadoop 2.2.0 and Hive 0.12. Right now, we build spark 1.3.1 ourselves and point to the above versions during the build. Now, here is the problem related to Spark SQL that it cannot query partitioned Hive tables. It has no problem to query non-partitioned Hive tables in Spark SQL. The error in the Spark SQL for querying partitioned Hive tables like following: javax.jdo.JDODataStoreException: Error executing SQL query select PARTITIONS.PART_ID from PARTITIONS inner join TBLS on PARTITIONS.TBL_ID = TBLS.TBL_ID inner join DBS on TBLS.DB_ID = DBS.DB_ID where TBLS.TBL_NAME = ? and DBS.NAME = ?.at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:451) at org.datanucleus.api.jdo.JDOQuery.executeWithArray(JDOQuery.java:321) ...NestedThrowablesStackTrace:com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-204, SQLSTATE=42704, SQLERRMC=CATALOG.PARTITIONS, DRIVER=4.17.36 The Hive metadata of BigInsight V3 is stored in DB2 (Don't ask me why, as it is from IBM), and the above error from DB2 simple means Table NOT FOUND.If I change the above query like following: select PARTITIONS.PART_ID from HIVE.PARTITIONS as PARTITIONS inner join HIVE.TBLS as TBLS on PARTITIONS.TBL_ID = TBLS.TBL_ID inner join HIVE.DBS as DBS on TBLS.DB_ID = DBS.DB_ID where TBLS.TBL_NAME = ? and DBS.NAME = ? and the query will work without any problem. My guess is that IBM changed some part of Hive, to make it can use DB2 as the underline database for Hive. In DB2, it has DB instance, schema and objects. In fact, table PARTITIONS, TBLS and DBS are all existed in the DB2, but under HIVE schema. Funny thing is that for unpartitioned table, the Spark SQL just works fine with DB2 as Hive metadata store. So my options are: 1) Wait for IBM V4.0, which will include Spark, and they will make it work, but don't know when that will happen.2) Build Spark with the Hive jar provided from IBM BigInsight, assume these hive jars will work with DB2?3) Modify some part of Spark SQL code, to make it works with DB2? My feeling is option 3 is the best, but not sure where to start. Thanks Yong db2 = select schemaname from syscat.schemata SCHEMANAME..HIVE.. 
db2 => list tables for schema hive

Table/View                 Schema   Type  Creation time
-------------------------- -------- ----- --------------------------
BUCKETING_COLS             HIVE     T     2015-08-05-00.09.08.676983
CDS                        HIVE     T     2015-08-05-00.08.38.861789
COLUMNS                    HIVE     T     2015-08-05-00.08.56.542476
COLUMNS_V2                 HIVE     T     2015-08-05-00.08.36.270223
DATABASE_PARAMS            HIVE     T     2015-08-05-00.08.32.453663
DBS                        HIVE     T     2015-08-05-00.08.29.642279
DB_PRIVS                   HIVE     T     2015-08-05-00.08.41.411732
DELEGATION_TOKENS          HIVE     T     2015-08-05-00.41.45.202784
GLOBAL_PRIVS               HIVE     T     2015-08-05-00.08.52.636188
IDXS                       HIVE     T     2015-08-05-00.08.43.117673
INDEX_PARAMS               HIVE     T     2015-08-05-00.08.44.636557
MASTER_KEYS                HIVE     T     2015-08-05-00.41.43.849242
NUCLEUS_TABLES             HIVE     T     2015-08-05-00.09.11.451975
PARTITIONS                 HIVE     T     2015-08-05-00.08.45.919837
PARTITION_EVENTS           HIVE     T     2015-08-05-00.08.55.244342
PARTITION_KEYS             HIVE     T     2015-08-05-00.09.01.802570
PARTITION_KEY_VALS         HIVE     T     2015-08-05-00.08.40.103345
PARTITION_PARAMS           HIVE     T     2015-08-05-00.08.53.992383
PART_COL_PRIVS             HIVE     T     2015-08-05-00.09.03.225567
PART_COL_STATS             HIVE     T     2015-08-05-00.41.40.711274
PART_PRIVS                 HIVE     T     2015-08-05-00.08.48.542585
ROLES                      HIVE     T     2015-08-05-00.08.57.810737
ROLE_MAP                   HIVE     T     2015-08-05-00.08.49.984015
SDS                        HIVE     T     2015-08-05-00.09.04.575646
SD_PARAMS                  HIVE     T     2015-08-05-00.09.12.710014
SEQUENCE_TABLE             HIVE     T     2015-08-05-00.09.06.135560
SERDES                     HIVE     T
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
Sure thing! The main looks like: -- val kafkaBrokers = conf.getString(s$varPrefix.metadata.broker.list) val kafkaConf = Map( zookeeper.connect - zookeeper, group.id - options.group, zookeeper.connection.timeout.ms - 1, auto.commit.interval.ms - 1000, rebalance.max.retries - 25, bootstrap.servers - kafkaBrokers ) val ssc = StreamingContext.getOrCreate(checkpointDirectory, () = { createContext(kafkaConf, checkpointDirectory, topic, numThreads, isProd) }, createOnError = true) ssc.start() ssc.awaitTermination() -- And createContext is defined as: -- val batchDuration = Seconds(5) val checkpointDuration = Seconds(20) private val AUTO_OFFSET_COMMIT = auto.commit.enable def createContext(kafkaConf: Map[String, String], checkpointDirectory: String, topic: String, numThreads: Int, isProd: Boolean) : StreamingContext = { val sparkConf = new SparkConf().setAppName(***) val ssc = new StreamingContext(sparkConf, batchDuration) ssc.checkpoint(checkpointDirectory) val topicSet = topic.split(,).toSet val groupId = kafkaConf.getOrElse(group.id, ) val directKStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicSet) directKStream.checkpoint(checkpointDuration) val table = *** directKStream.foreachRDD { rdd = val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.flatMap(rec = someFunc(rec)) .reduceByKey((i1: Long, i2: Long) = if (i1 i2) i1 else i2) .foreachPartition { partitionRec = val dbWrite = DynamoDBWriter() partitionRec.foreach { /* Update Dynamo Here */ } } /** Set up ZK Connection **/ val props = new Properties() kafkaConf.foreach(param = props.put(param._1, param._2)) props.setProperty(AUTO_OFFSET_COMMIT, false) val consumerConfig = new ConsumerConfig(props) assert(!consumerConfig.autoCommitEnable) val zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs, consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer) offsetRanges.foreach { osr = val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic) val zkPath = s${topicDirs.consumerOffsetDir}/${osr.partition} ZkUtils.updatePersistentPath(zkClient, zkPath, osr.untilOffset.toString) } } ssc } On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger c...@koeninger.org wrote: Sounds like something's not set up right... can you post a minimal code example that reproduces the issue? On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang suchenz...@gmail.com wrote: Yeah. All messages are lost while the streaming job was down. On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger c...@koeninger.org wrote: Are you actually losing messages then? On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang suchenz...@gmail.com wrote: No; first batch only contains messages received after the second job starts (messages come in at a steady rate of about 400/second). On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger c...@koeninger.org wrote: Does the first batch after restart contain all the messages received while the job was down? On Tue, Aug 25, 2015 at 12:53 PM, suchenzang suchenz...@gmail.com wrote: Hello, I'm using direct spark streaming (from kafka) with checkpointing, and everything works well until a restart. When I shut down (^C) the first streaming job, wait 1 minute, then re-submit, there is somehow a series of 0 event batches that get queued (corresponding to the 1 minute when the job was down). Eventually, the batches would resume processing, and I would see that each batch has roughly 2000 events. 
I see that at the beginning of the second launch, the checkpoint dirs are found and loaded, according to console output. Is this expected behavior? It seems like I might've configured something incorrectly, since I would expect with checkpointing that the streaming job would resume from checkpoint and continue processing from there (without seeing 0 event batches corresponding to when the job was down). Also, if I were to wait 10 minutes or so before re-launching, there would be so many 0 event batches that the job would hang. Is this merely something to be waited out, or should I set up some restart behavior/make a config change to discard checkpointing if the elapsed time has been too long? Thanks!
Re: Join with multiple conditions (In reference to SPARK-7197)
It's good to support this, could you create a JIRA for it and target for 1.6? On Tue, Aug 25, 2015 at 11:21 AM, Michal Monselise michal.monsel...@gmail.com wrote: Hello All, PySpark currently has two ways of performing a join: specifying a join condition or column names. I would like to perform a join using a list of columns that appear in both the left and right DataFrames. I have created an example in this question on Stack Overflow. Basically, I would like to do the following as specified in the documentation in /spark/python/pyspark/sql/dataframe.py row 560 and specify a list of column names: df.join(df4, ['name', 'age']).select(df.name, df.age).collect() However, this produces an error. In JIRA issue SPARK-7197, it is mentioned that the syntax is actually different from the one specified in the documentation for joining using a condition. Documentation: cond = [df.name == df3.name, df.age == df3.age] df.join(df3, cond, 'outer').select(df.name, df3.age).collect() JIRA Issue: a.join(b, (a.year==b.year) (a.month==b.month), 'inner') In other words. the join function cannot take a list. I was wondering if you could also clarify what is the correct syntax for providing a list of columns. Thanks, Michal - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
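For reference, the chained-condition form from the JIRA looks like this in the Scala DataFrame API (a sketch only; df and df3 are the DataFrames from the documentation example above):

// Each equality is a Column expression; chain them with && instead of passing a list.
val cond = (df("name") === df3("name")) && (df("age") === df3("age"))
val joined = df.join(df3, cond, "outer").select(df("name"), df3("age"))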
Re: Too many files/dirs in hdfs
Based on what I've read it appears that when using spark streaming there is no good way of optimizing the files on HDFS. Spark streaming writes many small files which is not scalable in apache hadoop. Only other way seem to be to read files after it has been written and merge them to a bigger file, which seems like a extra overhead from maintenance and IO perspective. On Mon, Aug 24, 2015 at 2:51 PM, Mohit Anchlia mohitanch...@gmail.com wrote: Any help would be appreciated On Wed, Aug 19, 2015 at 9:38 AM, Mohit Anchlia mohitanch...@gmail.com wrote: My question was how to do this in Hadoop? Could somebody point me to some examples? On Tue, Aug 18, 2015 at 10:43 PM, UMESH CHAUDHARY umesh9...@gmail.com wrote: Of course, Java or Scala can do that: 1) Create a FileWriter with append or roll over option 2) For each RDD create a StringBuilder after applying your filters 3) Write this StringBuilder to File when you want to write (The duration can be defined as a condition) On Tue, Aug 18, 2015 at 11:05 PM, Mohit Anchlia mohitanch...@gmail.com wrote: Is there a way to store all the results in one file and keep the file roll over separate than the spark streaming batch interval? On Mon, Aug 17, 2015 at 2:39 AM, UMESH CHAUDHARY umesh9...@gmail.com wrote: In Spark Streaming you can simply check whether your RDD contains any records or not and if records are there you can save them using FIleOutputStream: DStream.foreachRDD(t= { var count = t.count(); if (count0){ // SAVE YOUR STUFF} }; This will not create unnecessary files of 0 bytes. On Mon, Aug 17, 2015 at 2:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Currently, spark streaming would create a new directory for every batch and store the data to it (whether it has anything or not). There is no direct append call as of now, but you can achieve this either with FileUtil.copyMerge http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167 or have a separate program which will do the clean up for you. Thanks Best Regards On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Spark stream seems to be creating 0 bytes files even when there is no data. Also, I have 2 concerns here: 1) Extra unnecessary files is being created from the output 2) Hadoop doesn't work really well with too many files and I see that it is creating a directory with a timestamp every 1 second. Is there a better way of writing a file, may be use some kind of append mechanism where one doesn't have to change the batch interval.
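One partial mitigation, sketched below, is to shrink each non-empty batch to a single partition before writing, so each batch directory holds one part file instead of many; dstream and the output path are placeholders.

dstream.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {                      // skip empty batches entirely, no 0-byte files
    rdd.coalesce(1)                          // one partition means one output part file
       .saveAsTextFile(s"hdfs:///output/batch-${time.milliseconds}")
  }
}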
Re: Exclude slf4j-log4j12 from the classpath via spark-submit
So do I need to manually copy these 2 jars on my spark executors? On Tue, Aug 25, 2015 at 10:51 AM, Marcelo Vanzin van...@cloudera.com wrote: On Tue, Aug 25, 2015 at 10:48 AM, Utkarsh Sengar utkarsh2...@gmail.com wrote: Now I am going to try it out on our mesos cluster. I assumed spark.executor.extraClassPath takes csv as jars the way --jars takes it but it should be : separated like a regular classpath jar. Ah, yes, those options are just raw classpath strings. Also, they don't cause jars to be copied to the cluster. You'll need the jar to be available at the same location on all cluster machines. -- Marcelo -- Thanks, -Utkarsh
Checkpointing in Iterative Graph Computation
Hi, I have stumbled upon an issue with iterative Graphx computation (using v 1.4.1). It goes thusly -- Setup 1. Construct a graph. 2. Validate that the graph satisfies certain conditions. Here I do some assert(*conditions*) within graph.triplets.foreach(). [Notice that this materializes the graph.] For n iterations 3. Update graph edges and vertices. 4. Collect deltas over whole of graph (to be used in next iteration). Again, this is done through graph.aggregate() and this materializes the graph. 5. Update the graph and use it in next iteration (step 3). Now the problem is -- after about 300 iterations I run into Stackoverflow error due to the lengthy lineage. So, I decided to checkpoint the graph after every k iterations. But it doesn't work. The problem is -- once a graph is materialized then calling checkpoint() on it has no effect, even after materializing the graph again. In fact the isCheckpointed() method on such an RDD will always return false, even after calling checkpoint() and count() on the RDD. Following code should clarify - val users = sc.parallelize(Array((3L, (rxin, student)), (7L, (jgonzal, postdoc))) //Materialize the RDD users.count() //Now call the checkpoint users.checkpoint() users.count() //This fails assert(users.isCheckpointed) And it works the same with Graph.checkpoint(). Now my problem is that in both setup and iteration steps (Step 2 and 5 above) I have to materialize the graph, and so it leaves me in a situation where I can not checkpoint it in a usual fashion. Currently, I am working around this by creating a new Graph every kth iteration with the same edges and vertices and then checkpointing it and then using this new graph for k+1 to 2k iterations and so on. This works. Now my question are - 1. Why doesn't checkpointing work on an RDD if it is materialized? 2. My use case looks pretty common, how do people generally handle this? Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Checkpointing-in-Iterative-Graph-Computation-tp24443.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
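On question 1: checkpoint() only takes effect if it is called before the first job runs on that RDD, which is why calling it after users.count() does nothing. A small sketch of the ordering that works (the checkpoint directory is illustrative):

sc.setCheckpointDir("hdfs:///tmp/checkpoints")   // set once, ideally on a shared filesystem

val users = sc.parallelize(Array(
  (3L, ("rxin", "student")),
  (7L, ("jgonzal", "postdoc"))))

users.checkpoint()    // mark for checkpointing BEFORE any action materializes the RDD
users.count()         // the first action runs the job and writes the checkpoint
assert(users.isCheckpointed)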
Select some data from Hive (SparkSQL) directly using NodeJS
Hi, I just wonder if there's any way that I can get some sample data (10-20 rows) out of Spark's Hive using NodeJs? Submitting a spark job to show 20 rows of data in web page is not good for me. I've set up Spark Thrift Server as shown in Spark Doc. The server works because I can use *beeline* to connect and query data. Is there any NodeJs package that can be used to connect and query from this server?? Best Regards, Phakin Cheangkrachange
Re: Performance - Python streaming v/s Scala streaming
Thanks for the quick response. I have tried the direct word count python example and it also seems to be slow. Lot of times it is not fetching the words that are sent by the producer. I am using SPARK version 1.4.1 and KAFKA 2.10-0.8.2.0. On Tue, Aug 25, 2015 at 2:05 AM, Tathagata Das t...@databricks.com wrote: The scala version of the Kafka is something that we have been working on for a while, and is likely to be more optimized than the python one. The python one definitely requires pass the data back and forth between JVM and Python VM and decoding the raw bytes to the Python strings (probably less efficient that Java's Byte to UTF8 decoder), so that may cause some extra overheads compared to scala. Also consider trying the direct API. Read more in the Kafka integration guide - http://spark.apache.org/docs/latest/streaming-kafka-integration.html That overall has a much higher throughput that the earlier receiver based approach. BTW, disclaimer. Do not consider this difference as generalization of the performance difference between Scala and Python for all of Spark, For example, DataFrames provide performance parity between Scala and Python APIs. On Mon, Aug 24, 2015 at 5:22 AM, utk.pat utkarsh.pat...@gmail.com wrote: I am new to SPARK streaming. I was running the kafka_wordcount example with a local KAFKA and SPARK instance. It was very easy to set this up and get going :) I tried running both SCALA and Python versions of the word count example. Python versions seems to be extremely slow. Sometimes it has delays of more than couple of minutes. On the other hand SCALA versions seems to be way better. I am running on a windows machine. I am trying to understand what is the cause slowness in python streaming? Is there anything that I am missing? For real time streaming analysis should I prefer SCALA? -- View this message in context: Performance - Python streaming v/s Scala streaming http://apache-spark-user-list.1001560.n3.nabble.com/Performance-Python-streaming-v-s-Scala-streaming-tp24415.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Spark works with the data in another cluster(Elasticsearch)
Great advice. Thanks a lot Nick. In fact, if we use rdd.persist(DISK) command at the beginning of the program to avoid hitting the network again and again. The speed is not influenced a lot. In my case, it is just 1 min more compared to the situation that we put the data in local HDFS. Cheers Gen On Tue, Aug 25, 2015 at 6:26 PM, Nick Pentreath nick.pentre...@gmail.com wrote: While it's true locality might speed things up, I'd say it's a very bad idea to mix your Spark and ES clusters - if your ES cluster is serving production queries (and in particular using aggregations), you'll run into performance issues on your production ES cluster. ES-hadoop uses ES scan scroll to pull data pretty efficiently, so pulling it across the network is not too bad. If you do need to avoid that, pull the data and write what you need to HDFS as say parquet files (eg pull data daily and write it, then you have all data available on your Spark cluster). And of course ensure thatbwhen you do pull data from ES to Spark, you cache it to avoid hitting the network again — Sent from Mailbox https://www.dropbox.com/mailbox On Tue, Aug 25, 2015 at 12:01 PM, Akhil Das ak...@sigmoidanalytics.com wrote: If the data is local to the machine then obviously it will be faster compared to pulling it through the network and storing it locally (either memory or disk etc). Have a look at the data locality http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/data_locality.html . Thanks Best Regards On Tue, Aug 18, 2015 at 8:09 PM, gen tang gen.tan...@gmail.com wrote: Hi, Currently, I have my data in the cluster of Elasticsearch and I try to use spark to analyse those data. The cluster of Elasticsearch and the cluster of spark are two different clusters. And I use hadoop input format(es-hadoop) to read data in ES. I am wondering how this environment affect the speed of analysis. If I understand well, spark will read data from ES cluster and do calculate on its own cluster(include writing shuffle result on its own machine), Is this right? If this is correct, I think that the performance will just a little bit slower than the data stored on the same cluster. I will be appreciated if someone can share his/her experience about using spark with elasticsearch. Thanks a lot in advance for your help. Cheers Gen
Re: spark not launching in yarn-cluster mode
when I am launching with yarn-client also its giving me below error bin/spark-sql --master yarn-client 15/08/25 13:53:20 ERROR YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED! Exception in thread Yarn application state monitor org.apache.spark.SparkException: Error asking standalone scheduler to shut down executors at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:261) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:266) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.stop(YarnClientSchedulerBackend.scala:158) at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:416) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1411) at org.apache.spark.SparkContext.stop(SparkContext.scala:1644) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:139) Caused by: java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326) at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208) at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102) at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:257) On 25 August 2015 at 14:26, Yanbo Liang yblia...@gmail.com wrote: spark-shell and spark-sql can not be deployed with yarn-cluster mode, because you need to make spark-shell or spark-sql scripts run on your local machine rather than container of YARN cluster. 2015-08-25 16:19 GMT+08:00 Jeetendra Gangele gangele...@gmail.com: Hi All i am trying to launch the spark shell with --master yarn-cluster its giving below error. why this is not supported? bin/spark-sql --master yarn-cluster Error: Cluster deploy mode is not applicable to Spark SQL shell. Run with --help for usage help or --verbose for debug output Regards Jeetendra
Re: How to increase data scale in Spark SQL Perf
Looks like you were attaching images to your email which didn't go through. Consider using third party site for images - or paste error in text. Cheers On Tue, Aug 25, 2015 at 4:22 AM, Todd bit1...@163.com wrote: Hi, The spark sql perf itself contains benchmark data generation. I am using spark shell to run the spark sql perf to generate the data with 10G memory for both driver and executor. When I increase the scalefactor to be 30,and run the job, Then I got the following error: When I jstack it to see the status of the thread. I see the following: looks it is waiting for the process that the spark job kicks off.
Re: Spark Streaming failing on YARN Cluster
yes , when i see my yarn logs for that particular failed app_id, i got the following error. ERROR yarn.ApplicationMaster: SparkContext did not initialize after waiting for 10 ms. Please check earlier log output for errors. Failing the application For this error, I need to change the 'SparkContext', set the Master on yarn cluster ( SetMaster(yarn-cluster) ). Its working fine in cluster mode. Thanks for everyone. *Thanks*, https://in.linkedin.com/in/ramkumarcs31 On Fri, Aug 21, 2015 at 6:41 AM, Jeff Zhang zjf...@gmail.com wrote: AM fails to launch, could you check the yarn app logs ? You can use command yarn logs -your_app_id to get the yarn app logs. On Thu, Aug 20, 2015 at 1:15 AM, Ramkumar V ramkumar.c...@gmail.com wrote: I'm getting some spark exception. Please look this log trace ( *http://pastebin.com/xL9jaRUa http://pastebin.com/xL9jaRUa* ). *Thanks*, https://in.linkedin.com/in/ramkumarcs31 On Wed, Aug 19, 2015 at 10:20 PM, Hari Shreedharan hshreedha...@cloudera.com wrote: It looks like you are having issues with the files getting distributed to the cluster. What is the exception you are getting now? On Wednesday, August 19, 2015, Ramkumar V ramkumar.c...@gmail.com wrote: Thanks a lot for your suggestion. I had modified HADOOP_CONF_DIR in spark-env.sh so that core-site.xml is under HADOOP_CONF_DIR. i can able to see the logs like that you had shown above. Now i can able to run for 3 minutes and store results between every minutes. After sometimes, there is an exception. How to fix this exception ? and Can you please explain where its going wrong ? *Log Link : http://pastebin.com/xL9jaRUa http://pastebin.com/xL9jaRUa * *Thanks*, https://in.linkedin.com/in/ramkumarcs31 On Wed, Aug 19, 2015 at 1:54 PM, Jeff Zhang zjf...@gmail.com wrote: HADOOP_CONF_DIR is the environment variable point to the hadoop conf directory. Not sure how CDH organize that, make sure core-site.xml is under HADOOP_CONF_DIR. On Wed, Aug 19, 2015 at 4:06 PM, Ramkumar V ramkumar.c...@gmail.com wrote: We are using Cloudera-5.3.1. since it is one of the earlier version of CDH, it doesnt supports the latest version of spark. So i installed spark-1.4.1 separately in my machine. I couldnt able to do spark-submit in cluster mode. How to core-site.xml under classpath ? it will be very helpful if you could explain in detail to solve this issue. *Thanks*, https://in.linkedin.com/in/ramkumarcs31 On Fri, Aug 14, 2015 at 8:25 AM, Jeff Zhang zjf...@gmail.com wrote: 1. 15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.5.0-cdh5.3.5.jar 2. 15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.4.1.jar 3. 15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip 4. 15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/python/lib/py4j-0.8.2.1-src.zip 5. 15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/examples/src/main/python/streaming/kyt.py 6. 1. 
diagnostics: Application application_1437639737006_3808 failed 2 times due to AM Container for appattempt_1437639737006_3808_02 exited with exitCode: -1000 due to: File file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip does not exist 2. .Failing this attempt.. Failing the application. The machine you run Spark on is the client machine, while the YARN AM is running on another machine, and the YARN AM complains that the files are not found, as your logs show. From the logs, it seems that these files are not copied to HDFS as local resources. I suspect that you didn't put core-site.xml under your classpath, so Spark cannot detect your remote file system and won't copy the files to HDFS as local resources. Usually in yarn-cluster mode, you should be able to see logs like the following. 15/08/14 10:48:49 INFO yarn.Client: Preparing resources for our AM container 15/08/14 10:48:49 INFO yarn.Client: Uploading resource file:/Users/abc/github/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar - hdfs:// 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar 15/08/14 10:48:50 INFO yarn.Client: Uploading resource file:/Users/abc/github/spark/spark.py - hdfs:// 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark.py 15/08/14 10:48:50 INFO yarn.Client: Uploading resource file:/Users/abc/github/spark/python/lib/pyspark.zip - hdfs://
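A quick way to confirm whether core-site.xml is actually being picked up is to inspect the Hadoop configuration that Spark sees. This is only a diagnostic sketch, not part of the thread above; it assumes a spark-shell (or driver) session with an active SparkContext sc:

// If core-site.xml from HADOOP_CONF_DIR is on the classpath, this should print your
// HDFS namenode URI (e.g. hdfs://namenode:8020) rather than the local file:/// default.
println(sc.hadoopConfiguration.get("fs.defaultFS"))
// Older configs may still use the deprecated key instead.
println(sc.hadoopConfiguration.get("fs.default.name"))

If both print null or a file:/// URI, the client is falling back to the local file system, which matches the "Source and destination file systems are the same" behaviour described above.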
Re: Exception when S3 path contains colons
I am not quite sure about this but should the notation not be s3n://redactedbucketname/* instead of s3a://redactedbucketname/* The best way is to use s3://bucketname/path/* Regards, Gourav On Tue, Aug 25, 2015 at 10:35 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can change the names, whatever program that is pushing the record must follow the naming conventions. Try to replace : with _ or something. Thanks Best Regards On Tue, Aug 18, 2015 at 10:20 AM, Brian Stempin brian.stem...@gmail.com wrote: Hi, I'm running Spark on Amazon EMR (Spark 1.4.1, Hadoop 2.6.0). I'm seeing the exception below when encountering file names that contain colons. Any idea on how to get around this? scala val files = sc.textFile(s3a://redactedbucketname/*) 2015-08-18 04:38:34,567 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(242224) called with curMem=669367, maxMem=285203496 2015-08-18 04:38:34,568 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_3 stored as values in memory (estimated size 236.5 KB, free 271.1 MB) 2015-08-18 04:38:34,663 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(21533) called with curMem=911591, maxMem=285203496 2015-08-18 04:38:34,664 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_3_piece0 stored as bytes in memory (estimated size 21.0 KB, free 271.1 MB) 2015-08-18 04:38:34,665 INFO [sparkDriver-akka.actor.default-dispatcher-19] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added broadcast_3_piece0 in memory on 10.182.184.26:60338 (size: 21.0 KB, free: 271.9 MB) 2015-08-18 04:38:34,667 INFO [main] spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 3 from textFile at console:21 files: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at textFile at console:21 scala files.count 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem (S3AFileSystem.java:listStatus(533)) - List status for path: s3a://redactedbucketname/ 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem (S3AFileSystem.java:getFileStatus(684)) - Getting path status for s3a://redactedbucketname/ () java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: [922-212-4438]-[119]-[1]-[2015-08-13T15:43:12.346193%5D-%5B2015-01-01T00:00:00%5D-redacted.csv at org.apache.hadoop.fs.Path.initialize(Path.java:206) at org.apache.hadoop.fs.Path.init(Path.java:172) at org.apache.hadoop.fs.Path.init(Path.java:94) at org.apache.hadoop.fs.Globber.glob(Globber.java:240) at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700) at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:229) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:200) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:279) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:1781) at org.apache.spark.rdd.RDD.count(RDD.scala:1099) at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:24) at $iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:29) at $iwC$iwC$iwC$iwC$iwC$iwC.init(console:31) at $iwC$iwC$iwC$iwC$iwC.init(console:33) at $iwC$iwC$iwC$iwC.init(console:35) at $iwC$iwC$iwC.init(console:37) at $iwC$iwC.init(console:39) at $iwC.init(console:41) at init(console:43) at .init(console:47) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) at
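Following up on Akhil's suggestion above to rename the offending keys: S3 has no rename operation, so the usual approach is copy-then-delete. Below is only a rough sketch using the AWS Java SDK; the bucket name is the redacted placeholder from the thread, credentials are assumed to come from the default provider chain, and pagination of large listings is not handled.

import com.amazonaws.services.s3.AmazonS3Client
import scala.collection.JavaConverters._

val s3 = new AmazonS3Client()
val bucket = "redactedbucketname"
// listObjects returns at most one page of keys; a real run would loop with listNextBatchOfObjects
for (obj <- s3.listObjects(bucket).getObjectSummaries.asScala if obj.getKey.contains(":")) {
  val cleaned = obj.getKey.replace(":", "_")
  s3.copyObject(bucket, obj.getKey, bucket, cleaned)  // copy to a colon-free key
  s3.deleteObject(bucket, obj.getKey)                 // then remove the original
}

After the rename, the s3a://redactedbucketname/* glob should no longer trip the relative-URI check shown in the stack trace.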
using Convert function of sql in spark sql
Hi All, I want to use the Convert() SQL function in one of my Spark SQL queries. Can anyone tell me whether it is supported or not?
How to increase data scale in Spark SQL Perf
Hi, The spark sql perf package itself contains benchmark data generation. I am using spark-shell to run spark sql perf to generate the data, with 10G memory for both the driver and executor. When I increase the scale factor to 30 and run the job, I get the following error: When I jstack it to see the status of the thread, I see the following: it looks like it is waiting for the process that the Spark job kicks off.
Re: Local Spark talking to remote HDFS?
Tried adding 50010, 50020 and 50090. Still no difference. I can't imagine I'm the only person on the planet wanting to do this. Anyway, thanks for trying to help. Dino. On 25 August 2015 at 08:22, Roberto Congiu roberto.con...@gmail.com wrote: Port 8020 is not the only port you need tunnelled for HDFS to work. If you only list the contents of a directory, port 8020 is enough... for instance, using something like val p = new org.apache.hadoop.fs.Path("hdfs://localhost:8020/") val fs = p.getFileSystem(sc.hadoopConfiguration) fs.listStatus(p) you should see the file list. But then, when accessing a file, you need to actually get its blocks, so it has to connect to the data node. The error 'could not obtain block' means it can't get that block from the DataNode. Refer to http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-1.2.1/bk_reference/content/reference_chap2_1.html to see the complete list of ports that also need to be tunnelled. 2015-08-24 13:10 GMT-07:00 Dino Fancellu d...@felstar.com: Changing the IP to the guest IP address just never connects. The VM has port tunnelling, and it passes through all the main ports, 8020 included, to the host VM. You can tell that it was talking to the guest VM before, simply because it said so when the file was not found. The error is: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-452094660-10.0.2.15-1437494483194:blk_1073742905_2098 file=/tmp/people.txt but I have no idea what it means by that. It certainly can find the file and knows it exists. On 24 August 2015 at 20:43, Roberto Congiu roberto.con...@gmail.com wrote: When you launch your HDP guest VM, most likely it gets launched with NAT and an address on a private network (192.168.x.x), so on your Windows host you should use that address (you can find it out using ifconfig on the guest OS). I usually add an entry to my /etc/hosts for VMs that I use often. If you use Vagrant, there's also a Vagrant module that can do that automatically. Also, I am not sure how the default HDP VM is set up, that is, if it only binds HDFS to 127.0.0.1 or to all addresses. You can check that with netstat -a. R. 2015-08-24 11:46 GMT-07:00 Dino Fancellu d...@felstar.com: I have a file in HDFS inside my HortonWorks HDP 2.3_1 VirtualBox VM. If I go into the guest spark-shell and refer to the file thus, it works fine: val words=sc.textFile("hdfs:///tmp/people.txt") words.count However if I try to access it from a local Spark app on my Windows host, it doesn't work: val conf = new SparkConf().setMaster("local").setAppName("My App") val sc = new SparkContext(conf) val words=sc.textFile("hdfs://localhost:8020/tmp/people.txt") words.count Emits The port 8020 is open, and if I choose the wrong file name, it will tell me My pom has <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>1.4.1</version> <scope>provided</scope> </dependency> Am I doing something wrong? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Local-Spark-talking-to-remote-HDFS-tp24425.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
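For anyone hitting the same wall: when the namenode is reachable through a tunnel or NAT but block reads fail, the datanode is often advertising an address the host cannot reach. A small sketch below, with the guest IP and file path as placeholders; treat it as a hunch to try, not a confirmed fix for the HDP VM setup described above.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[*]").setAppName("Remote HDFS test")
val sc = new SparkContext(conf)
// Ask the HDFS client to connect to datanodes by hostname instead of the (possibly
// private) IP the namenode hands back; the hostname can then be mapped in /etc/hosts.
sc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
val words = sc.textFile("hdfs://192.168.56.101:8020/tmp/people.txt")  // placeholder guest address
println(words.count())

If the datanode hostname then fails to resolve on the Windows host, adding it to the hosts file, as Roberto suggests, is the remaining piece.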
Re: Exception when S3 path contains colons
Hello, We had the same problem. I've written a blog post with the detailed explanation and workaround: http://labs.totango.com/spark-read-file-with-colon/ Greetings, Romi K. On Tue, Aug 25, 2015 at 2:47 PM Gourav Sengupta gourav.sengu...@gmail.com wrote: I am not quite sure about this but should the notation not be s3n://redactedbucketname/* instead of s3a://redactedbucketname/* The best way is to use s3://bucketname/path/* Regards, Gourav On Tue, Aug 25, 2015 at 10:35 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can change the names, whatever program that is pushing the record must follow the naming conventions. Try to replace : with _ or something. Thanks Best Regards On Tue, Aug 18, 2015 at 10:20 AM, Brian Stempin brian.stem...@gmail.com wrote: Hi, I'm running Spark on Amazon EMR (Spark 1.4.1, Hadoop 2.6.0). I'm seeing the exception below when encountering file names that contain colons. Any idea on how to get around this? scala val files = sc.textFile(s3a://redactedbucketname/*) 2015-08-18 04:38:34,567 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(242224) called with curMem=669367, maxMem=285203496 2015-08-18 04:38:34,568 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_3 stored as values in memory (estimated size 236.5 KB, free 271.1 MB) 2015-08-18 04:38:34,663 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(21533) called with curMem=911591, maxMem=285203496 2015-08-18 04:38:34,664 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_3_piece0 stored as bytes in memory (estimated size 21.0 KB, free 271.1 MB) 2015-08-18 04:38:34,665 INFO [sparkDriver-akka.actor.default-dispatcher-19] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added broadcast_3_piece0 in memory on 10.182.184.26:60338 (size: 21.0 KB, free: 271.9 MB) 2015-08-18 04:38:34,667 INFO [main] spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 3 from textFile at console:21 files: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at textFile at console:21 scala files.count 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem (S3AFileSystem.java:listStatus(533)) - List status for path: s3a://redactedbucketname/ 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem (S3AFileSystem.java:getFileStatus(684)) - Getting path status for s3a://redactedbucketname/ () java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: [922-212-4438]-[119]-[1]-[2015-08-13T15:43:12.346193%5D-%5B2015-01-01T00:00:00%5D-redacted.csv at org.apache.hadoop.fs.Path.initialize(Path.java:206) at org.apache.hadoop.fs.Path.init(Path.java:172) at org.apache.hadoop.fs.Path.init(Path.java:94) at org.apache.hadoop.fs.Globber.glob(Globber.java:240) at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700) at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:229) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:200) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:279) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at 
org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781) at org.apache.spark.rdd.RDD.count(RDD.scala:1099) at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:24) at $iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:29) at $iwC$iwC$iwC$iwC$iwC$iwC.init(console:31) at $iwC$iwC$iwC$iwC$iwC.init(console:33) at $iwC$iwC$iwC$iwC.init(console:35) at $iwC$iwC$iwC.init(console:37) at $iwC$iwC.init(console:39) at $iwC.init(console:41) at init(console:43) at .init(console:47) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at
SparkSQL saveAsParquetFile does not preserve AVRO schema
Hi, I have serious problems with saving a DataFrame as a parquet file. I read the data from the parquet file like this: val df = sparkSqlCtx.parquetFile(inputFile.toString) and print the schema (you can see both fields are required) root |-- time: long (nullable = false) |-- time_ymdhms: long (nullable = false) ...omitted... Now I try to save the DataFrame as a parquet file like this: df.saveAsParquetFile(outputFile.toString) The code runs normally, but loading the file I saved in the previous step (outputFile) together with the same inputFile fails with this error: Caused by: parquet.schema.IncompatibleSchemaModificationException: repetition constraint is more restrictive: can not merge type required int64 time into optional int64 time The problem is that saveAsParquetFile does not preserve nullable flags! So once I load the outputFile parquet file and print the schema I get this: root |-- time: long (nullable = true) |-- time_ymdhms: long (nullable = true) ...omitted... I use Spark 1.3.0 with Parquet 1.6.0. Is it somehow possible to also keep these flags? Or is it a bug? Any help will be appreciated. Thanks in advance! Petr -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-saveAsParquetFile-does-not-preserve-AVRO-schema-tp2.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
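I don't know of a way to make saveAsParquetFile keep the required/optional flags in that version, but two things may work around the symptom (sketched below as guesses against the Spark 1.3-era API, reusing the column names from the post and assuming sparkSqlCtx is the SQLContext; not verified against the actual data): read the two files as separate DataFrames and unionAll them so Parquet never has to merge the conflicting footers, and, if the non-nullable schema matters downstream, stamp it back on with createDataFrame.

import org.apache.spark.sql.types.{StructField, StructType}

// 1) Avoid the Parquet-level schema merge by loading each path on its own and unioning.
val original = sparkSqlCtx.parquetFile(inputFile.toString)
val resaved  = sparkSqlCtx.parquetFile(outputFile.toString)
val combined = original.unionAll(resaved)

// 2) If code downstream relies on nullable = false, rebuild the schema explicitly.
val strictFields = combined.schema.fields.map {
  case f @ StructField("time" | "time_ymdhms", _, _, _) => f.copy(nullable = false)
  case f => f
}
val strictDf = sparkSqlCtx.createDataFrame(combined.rdd, StructType(strictFields))
strictDf.printSchema()  // time and time_ymdhms show nullable = false again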
Re: Protobuf error when streaming from Kafka
Do you think this binary would have an issue? Do I need to build Spark from source code? On Tue, Aug 25, 2015 at 1:06 PM, Cassa L lcas...@gmail.com wrote: I downloaded the below binary version of Spark: spark-1.4.1-bin-cdh4 On Tue, Aug 25, 2015 at 1:03 PM, java8964 java8...@hotmail.com wrote: Did you build your Spark with Hive? I met the same problem before because the hive-exec jar in Maven itself includes protobuf classes, which will be included in the Spark jar. Yong -- Date: Tue, 25 Aug 2015 12:39:46 -0700 Subject: Re: Protobuf error when streaming from Kafka From: lcas...@gmail.com To: yuzhih...@gmail.com CC: user@spark.apache.org Hi, I am using Spark-1.4 and Kafka-0.8.2.1. As per Google suggestions, I rebuilt all the classes with protobuf-2.5 dependencies. My new protobuf is compiled using 2.5. However now, my Spark job does not start. It's throwing a different error. Does Spark or any of its other dependencies use the old protobuf-2.4? Exception in thread main java.lang.VerifyError: class com.apple.ist.retail.xcardmq.serializers.SampleProtobufMessage$ProtoBuff overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet; at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:760) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at com.apple.ist.retail.spark.kafka.consumer.SparkMQProcessor.start(SparkProcessor.java:68) at com.apple.ist.retail.spark.kafka.consumer.SparkMQConsumer.main(SparkConsumer.java:43) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) On Mon, Aug 24, 2015 at 6:53 PM, Ted Yu yuzhih...@gmail.com wrote: Can you show the complete stack trace? Which Spark / Kafka release are you using? Thanks On Mon, Aug 24, 2015 at 4:58 PM, Cassa L lcas...@gmail.com wrote: Hi, I am storing messages in Kafka using protobuf and reading them into Spark. I upgraded the protobuf version from 2.4.1 to 2.5.0. I got java.lang.UnsupportedOperationException for older messages. However, even for new messages I get the same error. Spark does convert it though; I see my messages. How do I get rid of this error? java.lang.UnsupportedOperationException: This is supposed to be overridden by subclasses. 
at com.google.protobuf.GeneratedMessage.getUnknownFields(GeneratedMessage.java:180) at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$FsPermissionProto.getSerializedSize(HdfsProtos.java:5407) at com.google.protobuf.CodedOutputStream.computeMessageSizeNoTag(CodedOutputStream.java:749)
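One way to pin down where the conflicting protobuf classes come from at runtime is to ask the classloader. This is just a diagnostic sketch, run from spark-shell or early in the driver; it assumes the protobuf classes are on the classpath:

// Print the jar that actually supplies com.google.protobuf at runtime. If it points at a
// Spark or Hive assembly jar rather than the protobuf-java-2.5.0 jar you compiled against,
// a 2.4/2.5 version mix on the driver classpath is the likely cause of the errors above.
val codeSource = classOf[com.google.protobuf.Message].getProtectionDomain.getCodeSource
println(if (codeSource == null) "loaded from bootstrap/unknown" else codeSource.getLocation)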
Re: build spark 1.4.1 with JDK 1.6
A quick question regarding this: how come the artifacts (spark-core in particular) on Maven Central are built with JDK 1.6 (according to the manifest), if Java 7 is required? On Aug 21, 2015 5:32 PM, Sean Owen so...@cloudera.com wrote: Spark 1.4 requires Java 7. On Fri, Aug 21, 2015, 3:12 PM Chen Song chen.song...@gmail.com wrote: I tried to build Spark 1.4.1 on cdh 5.4.0. Because we need to support PySpark, I used JDK 1.6. I got the following error: [INFO] --- scala-maven-plugin:3.2.0:testCompile (scala-test-compile-first) @ spark-streaming_2.10 --- java.lang.UnsupportedClassVersionError: org/apache/hadoop/io/LongWritable : Unsupported major.minor version 51.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637) at java.lang.ClassLoader.defineClass(ClassLoader.java:621) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141) I know that is because the hadoop jar for cdh5.4.0 is built with JDK 7. Has anyone done this before? Thanks, -- Chen Song
Persisting sorted parquet tables for future sort merge joins
I want to persist a large _sorted_ table to Parquet on S3 and then read this in and join it using the Sorted Merge Join strategy against another large sorted table. The problem is: even though I sort these tables on the join key beforehand, once I persist them to Parquet, they lose the information about their sortedness. Is there any way to hint to Spark that they do not need to be resorted the next time I read them in? I've been trying this on 1.5 and I keep getting plans looking like:
[== Physical Plan ==]
[TungstenProject [pos#28400,workf...#28399]]
[ SortMergeJoin [CHROM#28403,pos#28400], [CHROM#28399,pos#28332]]
[ TungstenSort [CHROM#28403 ASC,pos#28400 ASC], false, 0]
[ TungstenExchange hashpartitioning(CHROM#28403,pos#28400)]
[ConvertToUnsafe]
[ Scan ParquetRelation[file:/sorted.parquet][pos#2848424]]
[ TungstenSort [CHROM#28399 ASC,pos#28332 ASC], false, 0]
[ TungstenExchange hashpartitioning(CHROM#28399,pos#28332)]
[ConvertToUnsafe]
[ Scan ParquetRelation[file:exploded_sorted.parquet][pos#2.399]]
Thanks, Jason
Spark thrift server on yarn
Hi, I am trying to start a spark thrift server using the following command on Spark 1.3.1 running on yarn: * ./sbin/start-thriftserver.sh --master yarn://resourcemanager.snc1:8032 --executor-memory 512m --hiveconf hive.server2.thrift.bind.host=test-host.sn1 --hiveconf hive.server2.thrift.port=10001 --queue public* It starts up fine and is able to connect to the hive metastore. I now need to view some temporary tables using this thrift server so I start up SparkSql and register a temp table. But the problem is that I am unable to view the temp table using the beeline client. I am pretty sure I am going wrong somewhere and the spark documentation does not clearly say how to run the thrift server in yarn mode or maybe I missed something. Could someone tell me how this is to be done or point me to some documentation? Thanks in advance, Udit
RE: Spark thrift server on yarn
Did you register the temp table via beeline or in a new Spark SQL CLI? As far as I know, a temp table cannot cross HiveContexts. Hao From: Udit Mehta [mailto:ume...@groupon.com] Sent: Wednesday, August 26, 2015 8:19 AM To: user Subject: Spark thrift server on yarn Hi, I am trying to start a spark thrift server using the following command on Spark 1.3.1 running on yarn: ./sbin/start-thriftserver.sh --master yarn://resourcemanager.snc1:8032 --executor-memory 512m --hiveconf hive.server2.thrift.bind.host=test-host.sn1 --hiveconf hive.server2.thrift.port=10001 --queue public It starts up fine and is able to connect to the hive metastore. I now need to view some temporary tables using this thrift server, so I start up Spark SQL and register a temp table. But the problem is that I am unable to view the temp table using the beeline client. I am pretty sure I am going wrong somewhere, and the Spark documentation does not clearly say how to run the thrift server in yarn mode, or maybe I missed something. Could someone tell me how this is to be done or point me to some documentation? Thanks in advance, Udit
Re: Spark thrift server on yarn
I registered it in a new Spark SQL CLI. Yeah I thought so too about how the temp tables were accessible across different applications without using a job-server. I see that running* HiveThriftServer2.startWithContext(hiveContext) *within the spark app starts up a thrift server. On Tue, Aug 25, 2015 at 5:32 PM, Cheng, Hao hao.ch...@intel.com wrote: Did you register temp table via the beeline or in a new Spark SQL CLI? As I know, the temp table cannot cross the HiveContext. Hao *From:* Udit Mehta [mailto:ume...@groupon.com] *Sent:* Wednesday, August 26, 2015 8:19 AM *To:* user *Subject:* Spark thrift server on yarn Hi, I am trying to start a spark thrift server using the following command on Spark 1.3.1 running on yarn: * ./sbin/start-thriftserver.sh --master yarn://resourcemanager.snc1:8032 --executor-memory 512m --hiveconf hive.server2.thrift.bind.host=test-host.sn1 --hiveconf hive.server2.thrift.port=10001 --queue public* It starts up fine and is able to connect to the hive metastore. I now need to view some temporary tables using this thrift server so I start up SparkSql and register a temp table. But the problem is that I am unable to view the temp table using the beeline client. I am pretty sure I am going wrong somewhere and the spark documentation does not clearly say how to run the thrift server in yarn mode or maybe I missed something. Could someone tell me how this is to be done or point me to some documentation? Thanks in advance, Udit
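For completeness, a minimal sketch of the startWithContext approach mentioned above (Spark 1.3/1.4-era APIs; the input path and table name are made up, and the spark-hive-thriftserver module needs to be on the compile classpath):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val sc = new SparkContext(new SparkConf().setAppName("thrift-with-temp-tables"))
val hiveContext = new HiveContext(sc)

// Register the temp table inside the same HiveContext the thrift server will use;
// otherwise beeline will not see it, since temp tables do not cross contexts.
val df = hiveContext.jsonFile("hdfs:///tmp/events.json")  // hypothetical input
df.registerTempTable("events")

// Start the JDBC/ODBC server in-process; beeline can now connect and query "events".
HiveThriftServer2.startWithContext(hiveContext)

Note that this starts the server inside the driver, so the host and port to point beeline at are wherever the driver happens to run.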
Re: build spark 1.4.1 with JDK 1.6
-cdh-user This suggests that Maven is still using Java 6. I think this is indeed controlled by JAVA_HOME. Use 'mvn -X ...' to see a lot more about what is being used and why. I still suspect JAVA_HOME is not visible to the Maven process. Or maybe you have JRE 7 installed but not JDK 7 and it's somehow still finding the Java 6 javac. On Tue, Aug 25, 2015 at 3:45 AM, Eric Friedman eric.d.fried...@gmail.com wrote: I'm trying to build Spark 1.4 with Java 7 and despite having that as my JAVA_HOME, I get [INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) @ spark-launcher_2.10 --- [INFO] Using zinc server for incremental compilation [info] Compiling 8 Java sources to /Users/eric/spark/spark/launcher/target/scala-2.10/classes... [error] javac: invalid source release: 1.7 [error] Usage: javac options source files [error] use -help for a list of possible options [error] Compile failed at Aug 24, 2015 7:44:40 PM [0.020s] [INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM ... SUCCESS [ 3.109 s] [INFO] Spark Project Launcher . FAILURE [ 4.493 s] On Fri, Aug 21, 2015 at 9:43 AM, Marcelo Vanzin van...@cloudera.com wrote: That was only true until Spark 1.3. Spark 1.4 can be built with JDK7 and pyspark will still work. On Fri, Aug 21, 2015 at 8:29 AM, Chen Song chen.song...@gmail.com wrote: Thanks Sean. So how PySpark is supported. I thought PySpark needs jdk 1.6. Chen On Fri, Aug 21, 2015 at 11:16 AM, Sean Owen so...@cloudera.com wrote: Spark 1.4 requires Java 7. On Fri, Aug 21, 2015, 3:12 PM Chen Song chen.song...@gmail.com wrote: I tried to build Spark 1.4.1 on cdh 5.4.0. Because we need to support PySpark, I used JDK 1.6. I got the following error, [INFO] --- scala-maven-plugin:3.2.0:testCompile (scala-test-compile-first) @ spark-streaming_2.10 --- java.lang.UnsupportedClassVersionError: org/apache/hadoop/io/LongWritable : Unsupported major.minor version 51.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637) at java.lang.ClassLoader.defineClass(ClassLoader.java:621) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141) I know that is due to the hadoop jar for cdh5.4.0 is built with JDK 7. Anyone has done this before? Thanks, -- Chen Song -- Chen Song -- --- You received this message because you are subscribed to the Google Groups CDH Users group. To unsubscribe from this group and stop receiving emails from it, send an email to cdh-user+unsubscr...@cloudera.org. For more options, visit https://groups.google.com/a/cloudera.org/d/optout. -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Scala: Overload method by its class type
Hi all, I have SomeClass[TYPE] { def some_method(args: fixed_type_args): TYPE } and at runtime I create instances of this class with different AnyVal + String types, but the return type of some_method varies. I know I could do this with an implicit object IF some_method received a type, but in this case I need to have the TYPE defined on the class instance, so for example: val int_instance = new SomeClass[Int] val str_instance = new SomeClass[String] val result: Boolean = int_instance.some_method(args) > 0 --- I expected an INT here val result2: Boolean = str_instance.some_method(args) contains "asdfg" --- I expected a STRING here, without compilation errors. Any ideas? I would like to implement something like this: class SomeClass[TYPE] { def some_method(args: Int): Int = { process_integer_overloaded_method } def some_method(args: Int): String = { process_string_overloaded_method } } and so on. Any ideas? Maybe store the class's TYPE in a constructor as a value somehow? Thanks Saif
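A hedged sketch of the type class pattern hinted at above ("implicit object"), since it can be wired to the class's type parameter rather than to a method parameter; the names and the bodies of the two behaviours are made up for illustration:

trait Handler[T] {
  def handle(args: Int): T  // the per-type behaviour lives here
}

object Handler {
  // Implicit instances are found automatically because they sit in Handler's companion object.
  implicit val intHandler: Handler[Int] = new Handler[Int] {
    def handle(args: Int): Int = args * 2                // stand-in for process_integer_overloaded_method
  }
  implicit val strHandler: Handler[String] = new Handler[String] {
    def handle(args: Int): String = "value-" + args      // stand-in for process_string_overloaded_method
  }
}

class SomeClass[TYPE](implicit h: Handler[TYPE]) {
  def some_method(args: Int): TYPE = h.handle(args)      // return type follows the class's TYPE
}

val int_instance = new SomeClass[Int]
val str_instance = new SomeClass[String]
val result: Boolean  = int_instance.some_method(21) > 0            // an Int comes back
val result2: Boolean = str_instance.some_method(21) contains "42"  // a String comes back

The same idea can be written with a context bound, class SomeClass[TYPE: Handler], if the implicit parameter list feels noisy.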