Re: build spark 1.4.1 with JDK 1.6
Hm... off the cuff I wonder if this is because somehow the build process ran Maven with Java 6 but forked the Java/Scala compilers, and those used JDK 7. Or some later repackaging process ran on the artifacts and used Java 6. I do see Build-Jdk: 1.6.0_45 in the manifest, but I don't think 1.4.x can compile with Java 6.

On Tue, Aug 25, 2015 at 9:59 PM, Rick Moritz rah...@gmail.com wrote:
A quick question regarding this: how come the artifacts (spark-core in particular) on Maven Central are built with JDK 1.6 (according to the manifest), if Java 7 is required?

On Aug 21, 2015 5:32 PM, Sean Owen so...@cloudera.com wrote:
Spark 1.4 requires Java 7.

On Fri, Aug 21, 2015, 3:12 PM Chen Song chen.song...@gmail.com wrote:
I tried to build Spark 1.4.1 on CDH 5.4.0. Because we need to support PySpark, I used JDK 1.6. I got the following error:

[INFO] --- scala-maven-plugin:3.2.0:testCompile (scala-test-compile-first) @ spark-streaming_2.10 ---
java.lang.UnsupportedClassVersionError: org/apache/hadoop/io/LongWritable : Unsupported major.minor version 51.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637)
at java.lang.ClassLoader.defineClass(ClassLoader.java:621)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)

I know that is because the Hadoop jar for CDH 5.4.0 is built with JDK 7. Has anyone done this before? Thanks,
--
Chen Song

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
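For context on the error itself: "Unsupported major.minor version 51.0" means a class compiled for Java 7 (class-file major version 51) was loaded on a Java 6 JVM (which supports at most major version 50). A small illustrative sketch (my own code, not from the thread) that reads the version bytes directly from a class file's header:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustrative: reads the bytecode version of a class on the classpath.
// major 50 = Java 6, 51 = Java 7, 52 = Java 8. So "Unsupported major.minor
// version 51.0" means Java 7 bytecode hit a Java 6 JVM.
public class ClassVersion {
    static int majorVersionOf(Class<?> cls) throws IOException {
        String resource = "/" + cls.getName().replace('.', '/') + ".class";
        try (InputStream in = cls.getResourceAsStream(resource);
             DataInputStream data = new DataInputStream(in)) {
            data.readInt();                  // magic number 0xCAFEBABE
            data.readUnsignedShort();        // minor version
            return data.readUnsignedShort(); // major version
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("major = " + majorVersionOf(ClassVersion.class));
    }
}
```

Running this against the CDH Hadoop classes (rather than against itself) would confirm they are major version 51, i.e. Java 7 bytecode.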
Re: spark and scala-2.11
The property scala-2.11 triggers the profile scala-2.11 -- and additionally disables the scala-2.10 profile -- so that's the way to do it. But yes, you also need to run the script beforehand to set up the build for Scala 2.11 as well.

On Mon, Aug 24, 2015 at 8:48 PM, Lanny Ripple la...@spotright.com wrote:
Hello, The instructions for building Spark against Scala 2.11 indicate using -Dspark-2.11. When I look in the pom.xml I find a profile named 'spark-2.11' but nothing that would indicate I should set a property. The sbt build seems to need the -Dscala-2.11 property set. Finally, build/mvn does a simple grep of scala.version (which doesn't change after running dev/change-version-to-2.11.sh), so the build seems to be grabbing the 2.10.4 Scala library. Anyone know (from having done it and used it in production) if the build instructions for spark-1.4.1 against Scala 2.11 are correct? Thanks. -Lanny
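Putting the reply together, the sequence for a 1.4.x Scala 2.11 build is roughly the following (a sketch based on the build documentation for that era; add your usual -P profiles as needed):

```shell
# Rewrite the POMs for Scala 2.11 first, then build with the scala-2.11 property set.
./dev/change-version-to-2.11.sh
./build/mvn -Dscala-2.11 -DskipTests clean package
```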
Re: DAG related query
No. The third line creates a third RDD whose reference simply replaces the reference to the first RDD in your local driver program. The first RDD still exists.

On Thu, Aug 20, 2015 at 2:15 PM, Bahubali Jain bahub...@gmail.com wrote:
Hi, How would the DAG look like for the below code?

JavaRDD<String> rdd1 = context.textFile("SOMEPATH");
JavaRDD<String> rdd2 = rdd1.map(/* do something */);
rdd1 = rdd2.map(/* do something */);

Does this lead to any kind of cycle? Thanks, Baahu
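The reply can be sketched in plain Java without Spark. Node, map, and lineage below are hypothetical names of mine, standing in for RDDs and their parent references: each transformation creates a new object pointing at its parent, and reassigning the local variable never mutates the objects themselves, so the DAG stays an acyclic chain.

```java
// Illustrative stand-in for RDD lineage (not a Spark API): each node keeps an
// immutable reference to its parent, so reassigning a driver-side variable
// cannot create a cycle -- it only changes which node the variable points at.
public class LineageDemo {
    static final class Node {
        final Node parent;
        final String op;
        Node(Node parent, String op) { this.parent = parent; this.op = op; }
        Node map(String op) { return new Node(this, op); }
        String lineage() {
            return parent == null ? op : parent.lineage() + " -> " + op;
        }
    }

    public static void main(String[] args) {
        Node rdd1 = new Node(null, "textFile"); // first "RDD"
        Node rdd2 = rdd1.map("map1");           // second, points at the first
        rdd1 = rdd2.map("map2");                // third; the variable is reassigned,
                                                // but the original object survives via rdd2.parent
        System.out.println(rdd1.lineage());     // textFile -> map1 -> map2
    }
}
```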
Re: Java 8 lambdas
Yes, it should Just Work. Lambdas can be used for any method that takes an instance of an interface with one method, and that describes Function, PairFunction, etc.

On Tue, Aug 18, 2015 at 3:23 PM, Kristoffer Sjögren sto...@gmail.com wrote:
Hi, Is there a way to execute Spark jobs with Java 8 lambdas instead of using anonymous inner classes as seen in the examples? I think I remember seeing real lambdas in the examples before and in articles [1]? Cheers, -Kristoffer

[1] http://blog.cloudera.com/blog/2014/04/making-apache-spark-easier-to-use-in-java-with-java-8
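A minimal stand-alone sketch of the point being made. FunctionLike here is a hypothetical local copy shaped like Spark's org.apache.spark.api.java.function.Function, so the example compiles without Spark on the classpath; a lambda and an anonymous inner class are interchangeable for any such single-abstract-method interface.

```java
// Illustrative: a lambda works wherever an interface with exactly one
// abstract method is expected. FunctionLike mimics Spark's Function
// (assumption: Spark's interface has a single call(T) method, as here).
public class LambdaDemo {
    interface FunctionLike<T, R> {
        R call(T t) throws Exception;
    }

    public static void main(String[] args) throws Exception {
        // Older anonymous-inner-class form, as in the pre-Java-8 examples:
        FunctionLike<Integer, Integer> anon = new FunctionLike<Integer, Integer>() {
            @Override public Integer call(Integer x) { return x + 1; }
        };
        // Java 8 lambda form -- same interface, same call site:
        FunctionLike<Integer, Integer> lambda = x -> x + 1;

        System.out.println(anon.call(1) + " " + lambda.call(41)); // 2 42
    }
}
```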
Re: Input size increasing every iteration of gradient boosted trees [1.4]
Not that I have any answer at this point, but I was discussing this exact same problem with Johannes today. An input size of ~20K records was growing each iteration by ~15M records. I could not see why on a first look. @jkbradley I know it's not much info, but does that ring any bells? I think Johannes even has an instance of this up and running for examination.

On Thu, Aug 13, 2015 at 10:04 PM, Matt Forbes mfor...@twitter.com.invalid wrote:
I am training a boosted trees model on a couple million input samples (with around 300 features) and am noticing that the input size of each stage is increasing each iteration. For each new tree, the first step seems to be building the decision tree metadata, which does a .count() on the input data, so this is the step I've been using to track the input size changing. Here is what I'm seeing:

count at DecisionTreeMetadata.scala:111
1. Input Size / Records: 726.1 MB / 1295620
2. Input Size / Records: 106.9 GB / 64780816
3. Input Size / Records: 160.3 GB / 97171224
4. Input Size / Records: 214.8 GB / 129680959
5. Input Size / Records: 268.5 GB / 162533424
Input Size / Records: 1912.6 GB / 1382017686

This step goes from taking less than 10s up to 5 minutes by the 15th or so iteration. I'm not quite sure what could be causing this. I am passing a memory-only cached RDD[LabeledPoint] to GradientBoostedTrees.train. Does anybody have some insight? Is this a bug or could it be an error on my part?
Re: ClosureCleaner does not work for java code
The difference is really that Java and Scala work differently. In Java, your anonymous subclass of Ops defined in (a method of) AbstractTest captures a reference to it. That much is 'correct' in that it's how Java is supposed to work, and AbstractTest is indeed not serializable since you didn't declare it so. However, the reference isn't actually used, and Spark tries to remove such references for you where possible. IIRC it can't always do so (e.g. nulling some fields would mutate objects in unpredictable ways), and I think that's what happens here. In the first place you want to avoid having this hidden reference, by making, for instance, a static inner class or something. There are probably a lot of ways to rewrite this. Scala just works differently in the code that's generated.

On Mon, Aug 10, 2015 at 4:32 PM, Hao Ren inv...@gmail.com wrote:
Consider the two following code snippets:

// Java code:
abstract class Ops implements Serializable {
    public abstract Integer apply(Integer x);

    public void doSomething(JavaRDD<Integer> rdd) {
        rdd.map(x -> x + apply(x))
           .collect()
           .forEach(System.out::println);
    }
}

public class AbstractTest {
    public static void main(String[] args) {
        new AbstractTest().job();
    }

    public void job() {
        SparkConf conf = new SparkConf()
            .setAppName(AbstractTest.class.getName())
            .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
        Ops ops = new Ops() {
            @Override
            public Integer apply(Integer x) {
                return x + 1;
            }
        };
        ops.doSomething(rdd);
    }
}

// Scala code:
abstract class Ops extends Serializable {
  def apply(x: Int): Int

  def doSomething(rdd: RDD[Int]): Unit = {
    rdd.map(x => apply(x)).collect foreach println
  }
}

class AbstractTest {
  def job(): Unit = {
    val conf = new SparkConf()
      .setAppName(this.getClass.getName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7))
    val ops = new Ops() {
      override def apply(x: Int): Int = x + 1
    }
    ops.doSomething(rdd)
  }
}

object AbstractTest {
  def main(args: Array[String]): Unit = {
    new AbstractTest().job()
  }
}

They are actually very similar, doing the same thing, yet the Scala one works fine and the Java one does not. A Task not serializable exception is encountered when the Java code is executed; here is the stack trace:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.map(RDD.scala:293)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:90)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:47)
at fr.leboncoin.etl.jobs.test.Ops.doSomething(AbstractTest.java:24)
at fr.leboncoin.etl.jobs.test.AbstractTest.job(AbstractTest.java:52)
at fr.leboncoin.etl.jobs.test.AbstractTest.main(AbstractTest.java:33)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException: fr.leboncoin.etl.jobs.test.AbstractTest
Serialization stack:
- object not serializable (class: test.AbstractTest, value: test.AbstractTest@61d84e08)
- field (class: test.AbstractTest$1, name: this$0, type: class test.AbstractTest)
- object (class test.AbstractTest$1, test.AbstractTest$1@476e8796)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class fr.leboncoin.etl.jobs.test.Ops, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial
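The capture mechanism in the reply can be demonstrated with plain Java serialization, no Spark involved. The class and method names below are mine: an anonymous class created inside an instance method gets a hidden this$0 field pointing at the enclosing instance, and if that instance is not serializable, serializing the anonymous object fails with the same NotSerializableException seen in the stack trace, while a static nested class captures nothing and serializes fine.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Illustrative sketch of the hidden-capture problem discussed above.
public class CaptureDemo {                        // note: NOT Serializable
    interface Op extends Serializable { int apply(int x); }

    Op anonymousOp() {
        // Anonymous class: carries a hidden this$0 reference to this CaptureDemo.
        return new Op() {
            @Override public int apply(int x) { return x + 1; }
        };
    }

    // Static nested class: no reference to any enclosing instance.
    static class StaticOp implements Op {
        @Override public int apply(int x) { return x + 1; }
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;                         // the hidden capture sank it
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new CaptureDemo().anonymousOp())); // false
        System.out.println(serializes(new StaticOp()));                  // true
    }
}
```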
Re: Unable to load native-hadoop library for your platform
You can ignore it entirely. It just means you haven't installed and configured native libraries for things like accelerated compression, but it has no negative impact otherwise.

On Tue, Aug 4, 2015 at 8:11 AM, Deepesh Maheshwari deepesh.maheshwar...@gmail.com wrote:
Hi, when I run Spark locally on Windows it gives the Hadoop library warning below. I am using the following Spark version:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.1</version>
</dependency>

2015-08-04 12:22:23,463 WARN (org.apache.hadoop.util.NativeCodeLoader:62) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

I tried to find it on the internet but was not able to find the exact root cause. Please let me know what it is, why it gives this warning, and how I can resolve it. Thanks, Deepesh
Re: Unable to load native-hadoop library for your platform
It won't affect you if you're not actually running Hadoop. But it's mainly things like Snappy/LZO compression which are implemented as native libraries under the hood. Spark doesn't necessarily use these anyway; they come from the Hadoop libs.

On Tue, Aug 4, 2015 at 8:30 AM, Deepesh Maheshwari deepesh.maheshwar...@gmail.com wrote:
Can you elaborate on the things this native library covers? One you mentioned is accelerated compression. It would be very helpful if you can give any useful link to read more about it.

On Tue, Aug 4, 2015 at 12:56 PM, Sean Owen so...@cloudera.com wrote:
You can ignore it entirely. It just means you haven't installed and configured native libraries for things like accelerated compression, but it has no negative impact otherwise.

On Tue, Aug 4, 2015 at 8:11 AM, Deepesh Maheshwari deepesh.maheshwar...@gmail.com wrote:
Hi, when I run Spark locally on Windows it gives the Hadoop library warning below. I am using the following Spark version:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.1</version>
</dependency>

2015-08-04 12:22:23,463 WARN (org.apache.hadoop.util.NativeCodeLoader:62) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

I tried to find it on the internet but was not able to find the exact root cause. Please let me know what it is, why it gives this warning, and how I can resolve it. Thanks, Deepesh
Re: Unable to load native-hadoop library for your platform
Oh, good point -- does the Windows integration need native libs for POSIX-y file system access? I know there are some binaries shipped for this purpose but wasn't sure if that's part of what's covered in the native-libs message.

On Tue, Aug 4, 2015 at 6:01 PM, Steve Loughran ste...@hortonworks.com wrote:
Think it may be needed on Windows, certainly if you start trying to work with local files.

On 4 Aug 2015, at 00:34, Sean Owen so...@cloudera.com wrote:
It won't affect you if you're not actually running Hadoop. But it's mainly things like Snappy/LZO compression which are implemented as native libraries under the hood.

There's a lot more in those native libs, primarily to bypass bits missing from the Java APIs (FS permissions) and to add new features (encryption, soon erasure coding). The Hadoop file:// FS uses it on Windows, at least for now.
Re: Checkpointing doesn't appear to be working for direct streaming from Kafka
If you've set the checkpoint dir, it seems like indeed the intent is to use a default checkpoint interval, in DStream:

private[streaming] def initialize(time: Time) {
  ...
  // Set the checkpoint interval to be slideDuration or 10 seconds, whichever is larger
  if (mustCheckpoint && checkpointDuration == null) {
    checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
    logInfo("Checkpoint interval automatically set to " + checkpointDuration)
  }
  ...
}

Do you see that log message? What's the interval? That could at least explain why it's not doing anything, if it's quite long. It sort of seems wrong though, since https://spark.apache.org/docs/latest/streaming-programming-guide.html suggests it was intended to be a multiple of the batch interval. The slide duration wouldn't always be relevant anyway.

On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote:
I've instrumented checkpointing per the programming guide, and I can tell that Spark Streaming is creating the checkpoint directories, but I'm not seeing any content being created in those directories, nor am I seeing the effects I'd expect from checkpointing. I'd expect any data that comes into Kafka while the consumers are down to get picked up when the consumers are restarted; I'm not seeing that.

For now my checkpoint directory is set to the local file system, with the directory URI being in this form: file:///mnt/dir1/dir2. I see a subdirectory named with a UUID being created under there, but no files.

I'm using a custom JavaStreamingContextFactory which creates a JavaStreamingContext with the directory set into it via the checkpoint(String) method. I'm currently not invoking the checkpoint(Duration) method on the DStream, since I want to first rely on Spark's default checkpointing interval. My streaming batch duration millis is set to 1 second.

Anyone have any idea what might be going wrong? Also, at which point does Spark delete files from checkpointing? Thanks.
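The quoted rule reduces to simple arithmetic; a sketch under the assumption that the snippet above is the whole story (defaultCheckpointMs is my name, not Spark's):

```java
// Illustrative: checkpointDuration = slideDuration * ceil(10s / slideDuration),
// i.e. the smallest multiple of the slide duration that is >= 10 seconds.
public class CheckpointInterval {
    static long defaultCheckpointMs(long slideMs) {
        return slideMs * (long) Math.ceil(10_000.0 / slideMs);
    }

    public static void main(String[] args) {
        System.out.println(defaultCheckpointMs(1_000));  // 10000: a 1 s batch checkpoints every 10 s
        System.out.println(defaultCheckpointMs(3_000));  // 12000: rounded up to a multiple of 3 s
        System.out.println(defaultCheckpointMs(15_000)); // 15000: already past 10 s, left alone
    }
}
```

So for the poster's 1-second batch duration, the default interval would be 10 seconds, which by itself would not explain seeing no checkpoint files at all.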
Re: PermGen Space Error
Yes, I think this was asked because you didn't say what flags you set before, and it's worth verifying they're the correct ones. Although I'd be kind of surprised if 512m isn't enough -- did you try more? You could also try -XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled. Also verify your executor/driver actually started with this option, to rule out a config problem.

On Wed, Jul 29, 2015 at 10:45 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote:
Yes. As mentioned at the end of my mail, I tried with both the 256 and 512 options, but the issue persists. I'm giving the following parameters to the Spark configuration:

spark.core.connection.ack.wait.timeout=600
spark.akka.timeout=1000
spark.akka.framesize=50
spark.executor.memory=2g
spark.task.cpus=2
spark.scheduler.mode=fair
spark.driver.extraJavaOptions=-XX:MaxPermSize=256m
spark.executor.extraJavaOptions=-XX:MaxPermSize=256m

The jars being included are about 21MB, and the data being processed by the job is around 1000 rows with 25 columns. I'm running on a single-node Mesos cluster on my laptop, which has 4 CPUs and 12GB RAM.

On Wed, Jul 29, 2015 at 2:49 PM, fightf...@163.com fightf...@163.com wrote:
Hi, Sarath, did you try to use and increase spark.executor.extraJavaOptions -XX:PermSize= -XX:MaxPermSize= ?

From: Sarath Chandra
Date: 2015-07-29 17:39
To: user@spark.apache.org
Subject: PermGen Space Error

Dear All, I'm using:
= Spark 1.2.0
= Hive 0.13.1
= Mesos 0.18.1
= Spring
= JDK 1.7

I've written a Scala program which
= instantiates a Spark and Hive context
= parses an XML file which provides the where clauses for queries
= generates full-fledged Hive queries to be run on Hive tables
= registers the obtained SchemaRDDs as temp tables to get reduced data sets to be queried further
= prints the count of the finally obtained data set

I'm running this Scala program programmatically through a java command (the command invokes a controller program that creates some useful value objects from input parameters and properties files, and then calls the above Scala program). I'm getting a PermGen space error when it hits the last line, which prints the count. I'm printing the generated Hive queries from the Scala program to the console; when I run the same from a spark shell it works fine.

As mentioned in some posts and blogs, I tried using the option spark.driver.extraJavaOptions to increase the size, with both 256 and 512, but still no luck. Please help me in resolving the space issue. Thanks and regards, Sarath.
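One way to make sure both JVMs actually receive the flags, per the verification suggested above, is to pass them on the spark-submit command line and then confirm them in the Environment tab of the web UI. This is a hypothetical invocation (the class and jar names are placeholders; sizes are a starting point to tune, not a recommendation):

```shell
# Pass the PermGen flags to the driver and the executors explicitly.
spark-submit \
  --driver-java-options "-XX:MaxPermSize=512m -XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled" \
  --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=512m" \
  --class com.example.MyJob \
  myjob.jar
```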
Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0
Does adding back the Cygwin detection and this clause make it work?

if $cygwin; then
  CLASSPATH=`cygpath -wp $CLASSPATH`
fi

If so, I imagine that's fine to bring back, if it's still needed.

On Tue, Jul 28, 2015 at 9:49 AM, Proust GZ Feng pf...@cn.ibm.com wrote:
Thanks Owen, the problem under Cygwin is that running spark-submit under 1.4.0 simply reports:

Error: Could not find or load main class org.apache.spark.launcher.Main

This is because under Cygwin spark-class makes the LAUNCH_CLASSPATH
/c/spark-1.4.0-bin-hadoop2.3/lib/spark-assembly-1.4.0-hadoop2.3.0.jar
but java on Windows cannot recognize that classpath, so the command below simply errors out:

java -cp /c/spark-1.4.0-bin-hadoop2.3/lib/spark-assembly-1.4.0-hadoop2.3.0.jar org.apache.spark.launcher.Main
Error: Could not find or load main class org.apache.spark.launcher.Main

Thanks, Proust

From: Sean Owen so...@cloudera.com
To: Proust GZ Feng/China/IBM@IBMCN
Cc: user user@spark.apache.org
Date: 07/28/2015 02:20 PM
Subject: Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0

It wasn't removed, but rewritten. Cygwin is just a distribution of POSIX-related utilities, so you should be able to use the normal .sh scripts. In any event, you didn't say what the problem is.

On Tue, Jul 28, 2015 at 5:19 AM, Proust GZ Feng pf...@cn.ibm.com wrote:
Hi, Spark users, it looks like Spark 1.4.0 cannot work with Cygwin due to the removal of Cygwin support in bin/spark-class. The changeset is https://github.com/apache/spark/commit/517975d89d40a77c7186f488547eed11f79c1e97#diff-fdf4d3e600042c63ffa17b692c4372a3. The changeset said "Add a library for launching Spark jobs programmatically", but how to use it in Cygwin? I'm wondering if any solutions are available to make it work on Windows. Thanks, Proust
Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0
That's for the Windows interpreter, rather than bash running under Cygwin. I don't know that it's worth doing a lot of legwork for Cygwin, but if it's really just a few lines of classpath translation in one script, it seems reasonable.

On Tue, Jul 28, 2015 at 9:13 PM, Steve Loughran ste...@hortonworks.com wrote:
There's a spark-submit.cmd file for Windows. Does that work?

On 27 Jul 2015, at 21:19, Proust GZ Feng pf...@cn.ibm.com wrote:
Hi, Spark users, it looks like Spark 1.4.0 cannot work with Cygwin due to the removal of Cygwin support in bin/spark-class. The changeset is https://github.com/apache/spark/commit/517975d89d40a77c7186f488547eed11f79c1e97#diff-fdf4d3e600042c63ffa17b692c4372a3. The changeset said "Add a library for launching Spark jobs programmatically", but how to use it in Cygwin? I'm wondering if any solutions are available to make it work on Windows. Thanks, Proust
Re: What else is need to setup native support of BLAS/LAPACK with Spark?
Great, and that file exists on HDFS and is world-readable? Just double-checking. What classpath is this -- your driver or executor? This is the driver, no? I assume so just because it looks like it references the assembly you built locally and from which you're launching the driver. I think we're concerned with the executors and what they have on the classpath. I suspect there is still a problem somewhere in there.

On Mon, Jul 20, 2015 at 4:59 PM, Arun Ahuja aahuj...@gmail.com wrote:
Cool, I tried that as well, and it doesn't seem different: spark.yarn.jar seems set [image: Inline image 1]. This actually doesn't change the classpath; not sure if it should: [image: Inline image 3]. But same netlib warning. Thanks for the help! - Arun

On Fri, Jul 17, 2015 at 3:18 PM, Sandy Ryza sandy.r...@cloudera.com wrote:
Can you try setting the spark.yarn.jar property to make sure it points to the jar you're thinking of? -Sandy

On Fri, Jul 17, 2015 at 11:32 AM, Arun Ahuja aahuj...@gmail.com wrote:
Yes, it's a YARN cluster, and I'm using spark-submit to run. I have SPARK_HOME set to the directory above and am using the spark-submit script from there:

bin/spark-submit --master yarn-client --executor-memory 10g --driver-memory 8g --num-executors 400 --executor-cores 1 --class org.hammerlab.guacamole.Guacamole --conf spark.default.parallelism=4000 --conf spark.storage.memoryFraction=0.15

libgfortran.so.3 is also there:

ls /usr/lib64/libgfortran.so.3
/usr/lib64/libgfortran.so.3

These are the jniloader files in the jar:

jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep jniloader
META-INF/maven/com.github.fommil/jniloader/
META-INF/maven/com.github.fommil/jniloader/pom.xml
META-INF/maven/com.github.fommil/jniloader/pom.properties

Thanks, Arun

On Fri, Jul 17, 2015 at 1:30 PM, Sean Owen so...@cloudera.com wrote:
Make sure /usr/lib64 contains libgfortran.so.3; that's really the issue. I'm pretty sure the answer is 'yes', but make sure the assembly has jniloader too. I don't see why it wouldn't, but it's needed. What is your env like -- local, standalone, YARN? How are you running? Just want to make sure you are using this assembly across your cluster.

On Fri, Jul 17, 2015 at 6:26 PM, Arun Ahuja aahuj...@gmail.com wrote:
Hi Sean, thanks for the reply! I did double-check that the jar is the one I think I am running: [image: Inline image 2]

jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native
com/github/fommil/netlib/NativeRefARPACK.class
com/github/fommil/netlib/NativeRefBLAS.class
com/github/fommil/netlib/NativeRefLAPACK.class
com/github/fommil/netlib/NativeSystemARPACK.class
com/github/fommil/netlib/NativeSystemBLAS.class
com/github/fommil/netlib/NativeSystemLAPACK.class

Also, I checked the gfortran version on the cluster nodes; it is available and is 5.1:

$ gfortran --version
GNU Fortran (GCC) 5.1.0
Copyright (C) 2015 Free Software Foundation, Inc.

and I still see:

15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK

Does anything need to be adjusted in my application POM? Thanks, Arun

On Thu, Jul 16, 2015 at 5:26 PM, Sean Owen so...@cloudera.com wrote:
Yes, that's most of the work: just getting the native libs into the assembly. netlib can find them from there even if you don't have BLAS libs on your OS, since it includes a reference implementation as a fallback. One common reason it won't load is not having libgfortran installed on your OSes though. It has to be 4.6+ too. That can't be shipped even in netlib and has to exist on your hosts. The other thing I'd double-check is whether you are really using this assembly you built for your job -- that it's actually the assembly the executors are using.

On Tue, Jul 7, 2015 at 8:47 PM, Arun Ahuja aahuj...@gmail.com wrote:
Is there more documentation on what is needed to set up native BLAS/LAPACK support with Spark? I've built Spark with the -Pnetlib-lgpl flag and see that the netlib classes are in the assembly jar:

jar tvf spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native
6625 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefARPACK.class
21123 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefBLAS.class
178334 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefLAPACK.class
6640 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib
Re: ALS run method versus ALS train versus ALS fit and transform
Yes -- just have a look at the method in the source code. It calls new ALS().run(). It's a convenience wrapper only.

On Fri, Jul 17, 2015 at 4:59 PM, Carol McDonald cmcdon...@maprtech.com wrote:
"the new ALS()...run() form is underneath both of the first two" -- I am not sure what you mean by underneath; so basically the mllib ALS()...run() does the same thing as the mllib ALS.train()?
Re: What else is need to setup native support of BLAS/LAPACK with Spark?
Make sure /usr/lib64 contains libgfortran.so.3; that's really the issue. I'm pretty sure the answer is 'yes', but make sure the assembly has jniloader too. I don't see why it wouldn't, but it's needed. What is your env like -- local, standalone, YARN? How are you running? Just want to make sure you are using this assembly across your cluster.

On Fri, Jul 17, 2015 at 6:26 PM, Arun Ahuja aahuj...@gmail.com wrote:
Hi Sean, thanks for the reply! I did double-check that the jar is the one I think I am running: [image: Inline image 2]

jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native
com/github/fommil/netlib/NativeRefARPACK.class
com/github/fommil/netlib/NativeRefBLAS.class
com/github/fommil/netlib/NativeRefLAPACK.class
com/github/fommil/netlib/NativeSystemARPACK.class
com/github/fommil/netlib/NativeSystemBLAS.class
com/github/fommil/netlib/NativeSystemLAPACK.class

Also, I checked the gfortran version on the cluster nodes; it is available and is 5.1:

$ gfortran --version
GNU Fortran (GCC) 5.1.0
Copyright (C) 2015 Free Software Foundation, Inc.

and I still see:

15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK

Does anything need to be adjusted in my application POM? Thanks, Arun

On Thu, Jul 16, 2015 at 5:26 PM, Sean Owen so...@cloudera.com wrote:
Yes, that's most of the work: just getting the native libs into the assembly. netlib can find them from there even if you don't have BLAS libs on your OS, since it includes a reference implementation as a fallback. One common reason it won't load is not having libgfortran installed on your OSes though. It has to be 4.6+ too. That can't be shipped even in netlib and has to exist on your hosts. The other thing I'd double-check is whether you are really using this assembly you built for your job -- that it's actually the assembly the executors are using.

On Tue, Jul 7, 2015 at 8:47 PM, Arun Ahuja aahuj...@gmail.com wrote:
Is there more documentation on what is needed to set up native BLAS/LAPACK support with Spark? I've built Spark with the -Pnetlib-lgpl flag and see that the netlib classes are in the assembly jar:

jar tvf spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native
6625 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefARPACK.class
21123 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefBLAS.class
178334 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefLAPACK.class
6640 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemARPACK.class
21138 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemBLAS.class
178349 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemLAPACK.class

Also I see the following in /usr/lib64:

ls /usr/lib64/libblas*
libblas.a libblas.so libblas.so.3 libblas.so.3.2 libblas.so.3.2.1
ls /usr/lib64/liblapack*
liblapack.a liblapack_pic.a liblapack.so liblapack.so.3 liblapack.so.3.2 liblapack.so.3.2.1

But I still see the following in the Spark logs:

15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
15/07/07 15:36:26 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
15/07/07 15:36:26 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK

Anything in this process I missed? Thanks, Arun
Re: Getting not implemented by the TFS FileSystem implementation
See also https://issues.apache.org/jira/browse/SPARK-8385 (apologies if someone already mentioned that -- just saw this thread) On Thu, Jul 16, 2015 at 7:19 PM, Jerrick Hoang jerrickho...@gmail.com wrote: So, this has to do with the fact that 1.4 has a new way to interact with HiveMetastore, still investigating. Would really appreciate if anybody has any insights :) On Tue, Jul 14, 2015 at 4:28 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I'm upgrading from Spark 1.3 to Spark 1.4, and when trying to run the spark-sql CLI it gave a ```java.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem implementation``` exception. I did not get this error with 1.3 and I don't use any TFS FileSystem. Full stack trace is ```Exception in thread main java.lang.RuntimeException: java.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem implementation at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346) at org.apache.spark.sql.hive.client.ClientWrapper.init(ClientWrapper.scala:105) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:408) at org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:170) at org.apache.spark.sql.hive.client.IsolatedClientLoader.init(IsolatedClientLoader.scala:166) at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:212) at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:175) at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:358) at org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:205) at org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:204) at 
scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.spark.sql.SQLContext.init(SQLContext.scala:204) at org.apache.spark.sql.hive.HiveContext.init(HiveContext.scala:71) at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:53) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.init(SparkSQLCLIDriver.scala:248) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:136) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem implementation at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:214) at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2365) at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2375) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431) at 
org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:167) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:342) ... 31 more``` Thanks all
Re: What else is need to setup native support of BLAS/LAPACK with Spark?
Yes, that's most of the work, just getting the native libs into the assembly. netlib can find them from there even if you don't have BLAS libs on your OS, since it includes a reference implementation as a fallback. One common reason it won't load is not having libgfortran installed on your OSes though. It has to be 4.6+ too. That can't be shipped even in netlib and has to exist on your hosts. The other thing I'd double-check is whether you are really using this assembly you built for your job -- like, it's actually the assembly the executors are using. On Tue, Jul 7, 2015 at 8:47 PM, Arun Ahuja aahuj...@gmail.com wrote: Is there more documentation on what is needed to set up BLAS/LAPACK native support with Spark? I’ve built spark with the -Pnetlib-lgpl flag and see that the netlib classes are in the assembly jar. jar tvf spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native 6625 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefARPACK.class 21123 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefBLAS.class 178334 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefLAPACK.class 6640 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemARPACK.class 21138 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemBLAS.class 178349 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemLAPACK.class Also I see the following in /usr/lib64 ls /usr/lib64/libblas. 
libblas.a libblas.so libblas.so.3 libblas.so.3.2 libblas.so.3.2.1 ls /usr/lib64/liblapack liblapack.a liblapack_pic.a liblapack.so liblapack.so.3 liblapack.so.3.2 liblapack.so.3.2.1 But I still see the following in the Spark logs: 15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/07/07 15:36:26 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK 15/07/07 15:36:26 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK Anything in this process I missed? Thanks, Arun
Re: ALS run method versus ALS train versus ALS fit and transform
The first two examples are from the .mllib API. Really, the new ALS()...run() form is underneath both of the first two. In the second case, you're calling a convenience method that calls something similar to the first example. The third example is from the new .ml pipelines API. Similar ideas, but a different API. On Wed, Jul 15, 2015 at 9:55 PM, Carol McDonald cmcdon...@maprtech.com wrote: In the Spark mllib examples MovieLensALS.scala ALS run is used, however in the movie recommendation with mllib tutorial ALS train is used. What is the difference? When should you use one versus the other?

val model = new ALS()
  .setRank(params.rank)
  .setIterations(params.numIterations)
  .setLambda(params.lambda)
  .setImplicitPrefs(params.implicitPrefs)
  .setUserBlocks(params.numUserBlocks)
  .setProductBlocks(params.numProductBlocks)
  .run(training)

val model = ALS.train(training, rank, numIter, lambda)

Also in org.apache.spark.examples.ml, fit and transform are used. Which one do you recommend using?

val als = new ALS()
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRank(params.rank)
  .setMaxIter(params.maxIter)
  .setRegParam(params.regParam)
  .setNumBlocks(params.numBlocks)

val model = als.fit(training.toDF())
val predictions = model.transform(test.toDF()).cache()
Re: MovieALS Implicit Error
Is the data set synthetic, or does it have very few items? Or is it indeed very sparse? Those could be reasons. However usually this kind of thing happens with very small data sets. I could be wrong about what's going on, but it's a decent guess at the immediate cause given the error messages. On Mon, Jul 13, 2015 at 12:12 PM, Benedict Liang bli...@thecarousell.com wrote: Hi Sean, Thank you for your quick response. By very little data, do you mean that the matrix is too sparse? Or are there too few data points? There are 3856988 ratings in my dataset currently. Regards, Benedict
Re: MovieALS Implicit Error
I interpret this to mean that the input to the Cholesky decomposition wasn't positive definite. I think this can happen if the input matrix is singular or very near singular -- maybe, very little data? Ben that might at least address why this is happening; different input may work fine. Xiangrui I think we might have discussed this a while ago but I am not sure positive definite is a good assumption here, so I don't know that Cholesky can be used reliably. I have always used the QR decomposition for this reason. Then again there is always this 10% chance I'm missing a subtlety there. On Mon, Jul 13, 2015 at 11:55 AM, bliang bli...@thecarousell.com wrote: Hi, I am trying to run the MovieALS example with an implicit dataset and am receiving this error: Got 3856988 ratings from 144250 users on 378937 movies. Training: 3085522, test: 771466. 15/07/13 10:43:07 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/07/13 10:43:07 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/07/13 10:43:10 WARN TaskSetManager: Lost task 3.0 in stage 29.0 (TID 192, 10.162.45.33): java.lang.AssertionError: assertion failed: lapack.dppsv returned 1. 
at scala.Predef$.assert(Predef.scala:179) at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:386) at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1163) at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1124) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/07/13 10:43:10 ERROR TaskSetManager: Task 12 in stage 29.0 failed 4 times; aborting job Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 29.0 failed 4 times, most recent failure: Lost task 12.3 in stage 29.0 (TID 249, 10.162.45.33): java.lang.AssertionError: assertion failed: lapack.dppsv returned 1. 
at scala.Predef$.assert(Predef.scala:179) at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:386) at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1163) at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1124) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
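The failure mode described above can be reproduced in miniature without Spark or LAPACK: a naive pure-Python Cholesky factorization (a toy stand-in for lapack.dppsv, not Spark's actual solver) fails as soon as the matrix is singular rather than positive definite, which is what very sparse or degenerate input can produce in the ALS normal equations:

```python
import math

def cholesky(a):
    """Naive Cholesky factorization of a symmetric matrix; raises
    ValueError when the matrix is not positive definite, much as
    lapack.dppsv signals failure with a nonzero return code."""
    n = len(a)
    l = [[0.0] * n for _ in range(n)]
    for i in range(n):
        for j in range(i + 1):
            s = sum(l[i][k] * l[j][k] for k in range(j))
            if i == j:
                d = a[i][i] - s
                if d <= 0:
                    raise ValueError("matrix is not positive definite")
                l[i][j] = math.sqrt(d)
            else:
                l[i][j] = (a[i][j] - s) / l[j][j]
    return l

ok = cholesky([[4.0, 2.0], [2.0, 3.0]])  # positive definite: succeeds
try:
    cholesky([[1.0, 1.0], [1.0, 1.0]])   # singular: not positive definite
    failed = False
except ValueError:
    failed = True
```

This is also why a QR-based solve, as Sean suggests, is more robust: it does not require positive definiteness.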
Re: How to upgrade Spark version in CDH 5.4
Yeah, it won't technically be supported, and you shouldn't go modifying the actual installation, but if you just make your own build of 1.4 for CDH 5.4 and use that build to launch YARN-based apps, I imagine it will Just Work for most any use case. On Sun, Jul 12, 2015 at 7:34 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote: Good question. I'd like to know the same. Although I think you'll lose supportability. -- Ruslan Dautkhanov On Wed, Jul 8, 2015 at 2:03 AM, Ashish Dutt ashish.du...@gmail.com wrote: Hi, I need to upgrade spark version 1.3 to version 1.4 on CDH 5.4. I checked the documentation here but I do not see anything relevant. Any suggestions directing to a solution are welcome. Thanks, Ashish
Re: How can the RegressionMetrics produce negative R2 and explained variance?
In general, a negative R2 means the line that was fit is a very poor fit -- the mean would give a smaller squared error. But it can also mean you are applying R2 where it doesn't apply. Here, you're not performing a linear regression; why are you using R2? On Sun, Jul 12, 2015 at 4:22 PM, afarahat ayman.fara...@yahoo.com wrote: Hello; I am using the ALS recommendation MLlib. To select the optimal rank, I have a number of users who used multiple items as my test. I then get the prediction on these users and compare it to the observed. I use the RegressionMetrics to estimate the R^2. I keep getting a negative value. r2 = -1.18966999676 explained var = -1.18955347415 count = 11620309 Here is my PySpark code:

train1.cache()
test1.cache()
numIterations = 10
for i in range(10):
    rank = int(40+i*10)
    als = ALS(rank=rank, maxIter=numIterations, implicitPrefs=False)
    model = als.fit(train1)
    predobs = model.transform(test1).select("prediction", "rating").map(lambda p: (p.prediction, p.rating)).filter(lambda p: (math.isnan(p[0]) == False))
    metrics = RegressionMetrics(predobs)
    mycount = predobs.count()
    myr2 = metrics.r2
    myvar = metrics.explainedVariance
    print "hooo", rank, " r2 = ", myr2, " explained var = ", myvar, " count = ", mycount

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-the-RegressionMetrics-produce-negative-R2-and-explained-variance-tp23779.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
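Independent of ALS, the sign of R2 follows directly from its definition, R2 = 1 - SS_res/SS_tot: any predictor whose squared error exceeds that of the constant mean predictor scores below zero. A small pure-Python check (no Spark involved):

```python
def r2_score(y_true, y_pred):
    """R2 = 1 - SS_res / SS_tot; negative whenever the predictor's
    squared error is worse than just predicting the mean."""
    mean = sum(y_true) / len(y_true)
    ss_tot = sum((y - mean) ** 2 for y in y_true)
    ss_res = sum((y - p) ** 2 for y, p in zip(y_true, y_pred))
    return 1.0 - ss_res / ss_tot

y = [1.0, 2.0, 3.0]
good = r2_score(y, [1.1, 2.0, 2.9])  # close fit: R2 near 1
bad = r2_score(y, [3.0, 3.0, 0.0])   # worse than the mean: R2 below 0
```

So an R2 of -1.19 simply says the ALS predictions have a larger squared error than a constant predictor would, not that the metric computation is broken.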
Re: foreachRDD vs. forearchPartition ?
These are quite different operations. One operates on RDDs in DStream and one operates on partitions of an RDD. They are not alternatives. On Wed, Jul 8, 2015, 2:43 PM dgoldenberg dgoldenberg...@gmail.com wrote: Is there a set of best practices for when to use foreachPartition vs. foreachRDD? Is it generally true that using foreachPartition avoids some of the over-network data shuffling overhead? When would I definitely want to use one method vs. the other? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/foreachRDD-vs-forearchPartition-tp23714.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: foreachRDD vs. forearchPartition ?
@Evo There is no foreachRDD operation on RDDs; it is a method of DStream. It gives each RDD in the stream. RDD has a foreach, and foreachPartition. These give elements of an RDD. What do you mean it 'works' to call foreachRDD on an RDD? @Dmitry are you asking about foreach vs foreachPartition? That's quite different. foreachPartition does not give more parallelism but lets you operate on a whole batch of data at once, which is nice if you need to allocate some expensive resource to do the processing. On Wed, Jul 8, 2015 at 3:18 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: These are quite different operations. One operates on RDDs in DStream and one operates on partitions of an RDD. They are not alternatives. Sean, different operations as they are, they can certainly be used on the same data set. In that sense, they are alternatives. Code can be written using one or the other which reaches the same effect - likely at a different efficiency cost. The question is, what are the effects of applying one vs. the other? My specific scenario is, I'm streaming data out of Kafka. I want to perform a few transformations then apply an action which results in e.g. writing this data to Solr. According to Evo, my best bet is foreachPartition because of increased parallelism (which I'd need to grok to understand the details of what that means). Another scenario is, I've done a few transformations and send a result somewhere, e.g. I write a message into a socket. Let's say I have one socket per client of my streaming app and I get a host:port of that socket as part of the message and want to send the response via that socket. Is foreachPartition still a better choice? On Wed, Jul 8, 2015 at 9:51 AM, Sean Owen so...@cloudera.com wrote: These are quite different operations. One operates on RDDs in DStream and one operates on partitions of an RDD. They are not alternatives. 
On Wed, Jul 8, 2015, 2:43 PM dgoldenberg dgoldenberg...@gmail.com wrote: Is there a set of best practices for when to use foreachPartition vs. foreachRDD? Is it generally true that using foreachPartition avoids some of the over-network data shuffling overhead? When would I definitely want to use one method vs. the other? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/foreachRDD-vs-forearchPartition-tp23714.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
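The pattern Sean describes -- paying for an expensive resource once per partition rather than once per record -- can be sketched in plain Python (the Connection class is hypothetical, standing in for e.g. a Solr client or a socket):

```python
class Connection:
    """Hypothetical expensive resource, e.g. a Solr or socket
    connection; opening one per record would be wasteful."""
    opened = 0

    def __init__(self):
        Connection.opened += 1
        self.sent = []

    def send(self, record):
        self.sent.append(record)

    def close(self):
        pass

def process_partition(records):
    # the foreachPartition pattern: one connection per partition,
    # reused for every record in that partition
    conn = Connection()
    try:
        for r in records:
            conn.send(r)
    finally:
        conn.close()

# three partitions -> three connections, regardless of record count
partitions = [[1, 2, 3], [4, 5], [6]]
for part in partitions:
    process_partition(iter(part))
```

With a plain per-element foreach, the connection setup cost would instead be paid once per record, which is the main practical difference rather than any change in parallelism.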
Re: Futures timed out after 10000 milliseconds
Usually this message means that the test was starting some process like a Spark master and it didn't ever start. The eventual error is timeout. You have to try to dig in to the test and logs to catch the real reason. On Sun, Jul 5, 2015 at 9:23 PM, SamRoberts samueli.robe...@yahoo.com wrote: Also, it's not clear where the 10000 millisecond timeout is coming from. Can someone explain -- and if it's a legitimate timeout problem, where would one set this timeout? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Futures-timed-out-after-1-milliseconds-tp23622p23629.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Recent spark sc.textFile needs hadoop for folders?!?
Yes, Spark Core depends on Hadoop libs, and there is this unfortunate twist on Windows. You'll still need HADOOP_HOME set appropriately since Hadoop needs some special binaries to work on Windows. On Fri, Jun 26, 2015 at 11:06 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You just need to set your HADOOP_HOME, which appears to be null in the stack trace. If you don't have winutils.exe, you can download it and put it there. Thanks Best Regards On Thu, Jun 25, 2015 at 11:30 PM, Ashic Mahtab as...@live.com wrote: Hello, Just trying out spark 1.4 (we're using 1.1 at present). On Windows, I've noticed the following: * On 1.4, sc.textFile(D:\\folder\\).collect() fails from both spark-shell.cmd and when running a scala application referencing the spark-core package from maven. * sc.textFile(D:\\folder\\file.txt).collect() succeeds. * On 1.1, both succeed. * When referencing the binaries in the scala application, this is the error: 15/06/25 18:30:13 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278) at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300) at org.apache.hadoop.util.Shell.clinit(Shell.java:293) at org.apache.hadoop.util.StringUtils.clinit(StringUtils.java:76) at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362) at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$32.apply(SparkContext.scala:978) at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$32.apply(SparkContext.scala:978) at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176) at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176) This seems quite strange...is this a known issue? Worse, is this a feature? I don't have to be using hadoop at all... 
just want to process some files and data in Cassandra. Regards, Ashic.
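The `null\bin\winutils.exe` in the stack trace is itself the clue: Hadoop builds the winutils path from HADOOP_HOME, and an unset variable turns into the literal string "null". A simplified pure-Python sketch of that behavior (illustrative only, not Hadoop's actual code):

```python
def winutils_path(hadoop_home):
    """Mimic how the winutils.exe path is assembled on Windows: when
    HADOOP_HOME is unset (None here), the Java code ends up
    concatenating the string "null" into the path, producing the
    'Could not locate executable null\\bin\\winutils.exe' error."""
    home = hadoop_home if hadoop_home is not None else "null"
    return home + r"\bin\winutils.exe"

broken = winutils_path(None)         # the path seen in the error message
fixed = winutils_path(r"C:\hadoop")  # with HADOOP_HOME set properly
```

Setting HADOOP_HOME to a directory containing `bin\winutils.exe` is therefore the fix, as Akhil suggests.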
Re: bugs in Spark PageRank implementation
#2 is not a bug. Have a search through JIRA. It is merely unnormalized. I think that is how (one of?) the original PageRank papers does it. On Thu, Jun 25, 2015, 7:39 AM Kelly, Terence P (HP Labs Researcher) terence.p.ke...@hp.com wrote: Hi, Colleagues and I have found that the PageRank implementation bundled with Spark is incorrect in several ways. The code in question is in the Apache Spark 1.2 distribution's examples directory, called SparkPageRank.scala. Consider the example graph presented in the colorful figure on the Wikipedia page for PageRank; below is an edge list representation, where vertex A is 1, B is 2, etc.:

- - - - - begin
2 3
3 2
4 1
4 2
5 2
5 4
5 6
6 2
6 5
7 2
7 5
8 2
8 5
9 2
9 5
10 5
11 5
- - - - - end

Here's the output we get from Spark's PageRank after 100 iterations: B has rank: 1.9184837009011475. C has rank: 1.7807113697064196. E has rank: 0.24301279014684984. A has rank: 0.24301279014684984. D has rank: 0.21885362387494078. F has rank: 0.21885362387494078. There are three problems with the output: 1. Only six of the eleven vertices are represented in the output; by definition, PageRank assigns a value to each vertex. 2. The values do not sum to 1.0; by definition, PageRank is a probability vector with one element per vertex and the sum of the elements of the vector must be 1.0. 3. Vertices E and A receive the same PageRank, whereas other means of computing PageRank, e.g., our own homebrew code and the method used by Wikipedia, assign different values to these vertices. Our own code has been compared against the PageRank implementation in the NetworkX package and it agrees. It looks like bug #1 is due to the Spark implementation of PageRank not emitting output for vertices with no incoming edges and bug #3 is due to the code not correctly handling vertices with no outgoing edges. Once #1 and #3 are fixed, normalization might be all that's required to fix #2 (maybe). 
We currently rely on the Spark PageRank for tests we're conducting; when do you think a fix might be ready? Thanks. -- Terence Kelly, HP Labs
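For comparison, a minimal pure-Python PageRank (illustrative; unrelated to Spark's example code) that addresses issues #1 and #3 by iterating over every vertex and redistributing the mass of dangling vertices, so each of the 11 vertices gets a rank and the ranks sum to 1.0:

```python
# Edge list from the thread (vertex A is 1, B is 2, ...)
EDGES = [(2, 3), (3, 2), (4, 1), (4, 2), (5, 2), (5, 4), (5, 6),
         (6, 2), (6, 5), (7, 2), (7, 5), (8, 2), (8, 5), (9, 2),
         (9, 5), (10, 5), (11, 5)]

def pagerank(n, edges, d=0.85, iters=100):
    """PageRank over all n vertices: vertices with no in-edges still
    receive the teleport term, and the mass of dangling vertices
    (no out-edges) is redistributed uniformly, so ranks sum to 1.0."""
    out = {v: [] for v in range(1, n + 1)}
    for src, dst in edges:
        out[src].append(dst)
    rank = {v: 1.0 / n for v in out}
    for _ in range(iters):
        dangling = sum(rank[v] for v in out if not out[v])
        new = {v: (1 - d) / n + d * dangling / n for v in out}
        for v, targets in out.items():
            for t in targets:
                new[t] += d * rank[v] / len(targets)
        rank = new
    return rank

ranks = pagerank(11, EDGES)  # all 11 vertices, ranks summing to 1.0
```

This version gives B the highest rank, as in the Wikipedia example, and distinguishes vertices such as A and E that differ only through dangling-vertex handling.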
Re: Compiling Spark 1.4 (and/or Spark 1.4.1-rc1) with CDH 5.4.1/2
to execute goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile (scala-compile-first) on project spark-sql_2.10: Execution scala-compile-first of goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile failed. CompileFailed - [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException [ERROR] [ERROR] After correcting the problems, you can resume the build with the command [ERROR] mvn <goals> -rf :spark-sql_2.10 Ahh... ok, so it's Hive 1.1 and Spark 1.4. Even using the standard Hive 0.13 version, I still get the above error. Granted (it's CDH's Hadoop JARs, and Apache's Hive). On Wed, Jun 24, 2015 at 9:30 PM, Sean Owen so...@cloudera.com wrote: You didn't provide any error? You're compiling vs Hive 1.1 here and that is the problem. It is nothing to do with CDH. On Wed, Jun 24, 2015, 10:15 PM Aaron aarongm...@gmail.com wrote: I was curious if anyone was able to get CDH 5.4.1 or 5.4.2 compiling with the v1.4.0 tag out of git? SparkSQL keeps dying on me and I'm not 100% sure why. 
I modified the pom.xml to make a simple profile to help:

<profile>
  <id>cdh542</id>
  <properties>
    <java.version>1.7</java.version>
    <flume.version>1.5.0-cdh5.4.2</flume.version>
    <hadoop.version>2.6.0-cdh5.4.2</hadoop.version>
    <yarn.version>${hadoop.version}</yarn.version>
    <hive.version>1.1.0-cdh5.4.2</hive.version>
    <hive.version.short>1.1.0-cdh5.4.2</hive.version.short>
    <hbase.version>1.0.0-cdh5.4.2</hbase.version>
    <zookeeper.version>3.4.5-cdh5.4.2</zookeeper.version>
    <avro.version>1.7.6-cdh5.4.2</avro.version>
    <parquet.version>1.5.0-cdh5.4.2</parquet.version>
  </properties>
  <modules>
    <module>yarn</module>
    <module>network/yarn</module>
    <module>sql/hive-thriftserver</module>
  </modules>
</profile>

I have tried removing the hive properties, and letting it use the default 0.13, but it fails in the same place. mvn clean package -DskipTests -Pcdh542 Using the standard mvn clean package -Phadoop-2.6 -Pyarn -Phive-thriftserver works great... so it's got to be something with CDH's JARs, just not sure what. And doing a mvn -X didn't lead me anywhere... thoughts? help? URLs to read? Thanks in advance. Cheers, Aaron
Re: Problem with version compatibility
-dev +user That all sounds fine except are you packaging Spark classes with your app? That's the bit I'm wondering about. You would mark it as a 'provided' dependency in Maven. On Thu, Jun 25, 2015 at 5:12 AM, jimfcarroll jimfcarr...@gmail.com wrote: Hi Sean, I'm running a Mesos cluster. My driver app is built using maven against the maven 1.4.0 dependency. The Mesos slave machines have the spark distribution installed from the distribution link. I have a hard time understanding how this isn't a standard app deployment but maybe I'm missing something. If you build a driver app against 1.4.0 using maven and run it against a mesos cluster that has the 1.4.0 binary distribution installed, your driver won't run right. I meant to publish this question on the user list so my apologies if it's in the wrong place. Jim -- View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Problem-with-version-compatibility-tp12861p12876.html Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
Re: map vs mapPartitions
No, or at least, it depends on how the source of the partitions was implemented. On Thu, Jun 25, 2015 at 12:16 PM, Shushant Arora shushantaror...@gmail.com wrote: Does mapPartitions keep complete partitions in memory of the executor as an iterable?

JavaRDD<String> rdd = jsc.textFile(path);
JavaRDD<Integer> output = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
  public Iterable<Integer> call(Iterator<String> input) throws Exception {
    List<Integer> output = new ArrayList<Integer>();
    while (input.hasNext()) {
      output.add(input.next().length());
    }
    return output;
  }
});

Here, is input present in memory, and can it contain a complete partition of GBs? Will this function call(Iterator<String> input) be called only as many times as there are partitions (say 10 in this example), not as many times as there are lines (say 1000)? And what's the use of mapPartitionsWithIndex? Thanks
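Whether the whole partition sits in memory depends on what the passed function does with the iterator, which a plain-Python sketch (no Spark) makes concrete: building up a list, as the Java snippet's ArrayList does, materializes one output element per input element, while returning a generator stays lazy. Either way, the function runs once per partition, not once per line:

```python
def count_lengths_eager(lines):
    # builds the whole output in memory, like the ArrayList in the
    # Java snippet -- one element per input line
    out = []
    for line in lines:
        out.append(len(line))
    return out

def count_lengths_lazy(lines):
    # a generator: pulls one element at a time from the partition's
    # iterator, so nothing is materialized up front
    return (len(line) for line in lines)

data = ["spark", "mapPartitions", "iterator"]
eager = count_lengths_eager(iter(data))
lazy = list(count_lengths_lazy(iter(data)))
```

As for mapPartitionsWithIndex, it is the same operation but also hands the function the partition's index, which is useful when behavior should depend on which partition is being processed.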
Re: Velox Model Server
On Wed, Jun 24, 2015 at 12:02 PM, Nick Pentreath nick.pentre...@gmail.com wrote: Oryx does almost the same but Oryx1 kept all user and item vectors in memory (though I am not sure about whether Oryx2 still stores all user and item vectors in memory or partitions in some way). (Yes, this is a weakness, but makes things fast and easy to manage. My rule of thumb is 1M user/item vectors ~= 1GB RAM, comfortably, even with necessary ancillary structures. If you can afford N serving machines with a bunch of RAM, you can get away with this for a long while, but that's an if) Scoring in memory is just the first step if it needs to be real-time -- scoring also probably needs to be even sub-linear in the number of items (i.e. don't even score all items) but this is a tangent relative to the Spark-related question.
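As a back-of-envelope check on that rule of thumb (the rank of 50 and 4-byte floats below are assumptions for illustration, not figures from the thread):

```python
def factor_ram_bytes(n_vectors, rank, bytes_per_value=4):
    """Raw storage for n factor vectors of the given rank in
    single-precision floats; JVM object headers, boxing, and index
    structures add a sizable multiple on top, which is why
    1M vectors ~= 1GB is the comfortable, not the minimal, figure."""
    return n_vectors * rank * bytes_per_value

raw = factor_ram_bytes(1_000_000, 50)  # raw float storage for 1M rank-50 vectors
```

The raw floats alone are a few hundred megabytes at typical ranks, so the ~1GB rule of thumb leaves a realistic cushion for the ancillary structures mentioned.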
Re: Compiling Spark 1.4 (and/or Spark 1.4.1-rc1) with CDH 5.4.1/2
You didn't provide any error? You're compiling vs Hive 1.1 here and that is the problem. It is nothing to do with CDH. On Wed, Jun 24, 2015, 10:15 PM Aaron aarongm...@gmail.com wrote: I was curious if anyone was able to get CDH 5.4.1 or 5.4.2 compiling with the v1.4.0 tag out of git? SparkSQL keeps dying on me and I'm not 100% sure why. I modified the pom.xml to make a simple profile to help:

<profile>
  <id>cdh542</id>
  <properties>
    <java.version>1.7</java.version>
    <flume.version>1.5.0-cdh5.4.2</flume.version>
    <hadoop.version>2.6.0-cdh5.4.2</hadoop.version>
    <yarn.version>${hadoop.version}</yarn.version>
    <hive.version>1.1.0-cdh5.4.2</hive.version>
    <hive.version.short>1.1.0-cdh5.4.2</hive.version.short>
    <hbase.version>1.0.0-cdh5.4.2</hbase.version>
    <zookeeper.version>3.4.5-cdh5.4.2</zookeeper.version>
    <avro.version>1.7.6-cdh5.4.2</avro.version>
    <parquet.version>1.5.0-cdh5.4.2</parquet.version>
  </properties>
  <modules>
    <module>yarn</module>
    <module>network/yarn</module>
    <module>sql/hive-thriftserver</module>
  </modules>
</profile>

I have tried removing the hive properties, and letting it use the default 0.13, but it fails in the same place. mvn clean package -DskipTests -Pcdh542 Using the standard mvn clean package -Phadoop-2.6 -Pyarn -Phive-thriftserver works great... so it's got to be something with CDH's JARs, just not sure what. And doing a mvn -X didn't lead me anywhere... thoughts? help? URLs to read? Thanks in advance. Cheers, Aaron
Re: Velox Model Server
Yes, and typically needs are <100ms. Now imagine even 10 concurrent requests. My experience has been that this approach won't nearly scale. The best you could probably do is async mini-batch near-real-time scoring, pushing results to some store for retrieval, which could be entirely suitable for your use case. On Tue, Jun 23, 2015 at 8:52 AM, Nick Pentreath nick.pentre...@gmail.com wrote: If your recommendation needs are real-time (<1s) I am not sure the job server and computing the recs with Spark will do the trick (though those new BLAS-based methods may have given a sufficient speed-up).
Re: Velox Model Server
Out of curiosity why netty? What model are you serving? Velox doesn't look like it is optimized for cases like ALS recs, if that's what you mean. I think scoring ALS at scale in real time takes a fairly different approach. The servlet engine probably doesn't matter at all in comparison. On Sat, Jun 20, 2015, 9:40 PM Debasish Das debasish.da...@gmail.com wrote: After getting used to Scala, writing Java is too much work :-) I am looking for scala based project that's using netty at its core (spray is one example). prediction.io is an option but that also looks quite complicated and not using all the ML features that got added in 1.3/1.4 Velox built on top of ML / Keystone ML pipeline API and that's useful but it is still using javax servlets which is not netty based. On Sat, Jun 20, 2015 at 10:25 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Oops, that link was for Oryx 1. Here's the repo for Oryx 2: https://github.com/OryxProject/oryx On Sat, Jun 20, 2015 at 10:20 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Debasish, The Oryx project (https://github.com/cloudera/oryx), which is Apache 2 licensed, contains a model server that can serve models built with MLlib. -Sandy On Sat, Jun 20, 2015 at 8:00 AM, Charles Earl charles.ce...@gmail.com wrote: Is velox NOT open source? On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote: Hi, The demo of end-to-end ML pipeline including the model server component at Spark Summit was really cool. I was wondering if the Model Server component is based upon Velox or it uses a completely different architecture. https://github.com/amplab/velox-modelserver We are looking for an open source version of model server to build upon. Thanks. Deb -- - Charles
Re: [Spark-1.4.0]jackson-databind conflict?
I see the same thing in an app that uses Jackson 2.5. Downgrading to 2.4 made it work. I meant to go back and figure out if there's something that can be done to work around this in Spark or elsewhere, but for now, harmonize your Jackson version at 2.4.x if you can. On Fri, Jun 12, 2015 at 4:20 PM, Earthson earthson...@gmail.com wrote: I'm using Play-2.4 with play-json-2.4. It works fine with spark-1.3.1, but it failed after I upgraded Spark to spark-1.4.0 :( sc.parallelize(1 to 1).count

[info] com.fasterxml.jackson.databind.JsonMappingException: Could not find creator property with name 'id' (in class org.apache.spark.rdd.RDDOperationScope)
[info]   at [Source: {id:0,name:parallelize}; line: 1, column: 1]
[info]   at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
[info]   at com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)
[info]   at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409)
[info]   at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358)
[info]   at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265)
[info]   at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245)
[info]   at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)

-- View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-jackson-databind-conflict-tp23295.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Java API and minimum set of 3rd party dependencies
You don't add dependencies to your app -- you mark Spark as 'provided' in the build and you rely on the deployed Spark environment to provide it. On Fri, Jun 12, 2015 at 7:14 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Hi all, We want to integrate Spark in our Java application using the Spark Java API and then run it on YARN clusters. If I want to run Spark on YARN, which dependencies are a must to include? I looked at the Spark POM, which lists 50+ 3rd-party dependencies that Spark requires. Is there a minimum set of Spark dependencies necessary for the Spark Java API (for a Spark client run on a YARN cluster)? Thanks in advance.
Re: Spark distinct() returns incorrect results for some types?
Guess: it has something to do with the Text object being reused by Hadoop? You can't in general keep around refs to them since they change. So you may have a bunch of copies of one object at the end that become just one in each partition. On Thu, Jun 11, 2015, 8:36 PM Crystal Xing crystalxin...@gmail.com wrote: I load a list of ids from a text file as NLineInputFormat, and when I do distinct(), it returns an incorrect number. JavaRDD<Text> idListData = jvc.hadoopFile(idList, NLineInputFormat.class, LongWritable.class, Text.class).values().distinct() I should have 7000K distinct values; however it only returns 7000 values, which is the same as the number of tasks. The type I am using is import org.apache.hadoop.io.Text; However, if I switch to use String instead of Text, it works correctly. I think the Text class should have a correct implementation of the equals() and hashCode() functions since it is a Hadoop class. Does anyone have a clue what is going on? I am using Spark 1.2. Zheng zheng
Re: Spark distinct() returns incorrect results for some types?
Yep you need to use a transformation of the raw value; use toString for example. On Thu, Jun 11, 2015, 8:54 PM Crystal Xing crystalxin...@gmail.com wrote: That is a little scary. So you mean in general, we shouldn't use Hadoop's Writable as a key in an RDD? Zheng zheng
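The object-reuse problem described in this thread can be reproduced without Hadoop at all. A minimal Python sketch, where `Text` is a stand-in for a reused Hadoop Writable, not the real class:

```python
class Text:
    """Stand-in for a Hadoop Writable: one buffer, mutated for each record."""
    def __init__(self):
        self.buf = None
    def set(self, s):
        self.buf = s

def read_records(lines):
    t = Text()          # Hadoop-style readers reuse one object per split
    for line in lines:
        t.set(line)
        yield t         # caller must copy the value before keeping a reference

# Keeping the raw references: every element is the same object, so the last
# value read "wins" and distinct() collapses to one value per reader/task.
kept = list(read_records(["id1", "id2", "id3"]))
assert len({t.buf for t in kept}) == 1

# Copying the value out (Text.toString() in the Java case) preserves them all.
copied = [t.buf for t in read_records(["id1", "id2", "id3"])]
assert len(set(copied)) == 3
```

This is why the buggy count came back as roughly one value per task: each partition's reader reused a single Text, so every retained record in a partition ended up equal after the fact.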
Re: Split RDD based on criteria
No, but you can write a couple lines of code that do this. It's not optimized of course. This is actually a long and interesting side discussion, but I'm not sure how much it could be optimized, given that the computation is pull rather than push; there is no concept of one pass over the data resulting in many RDDs. However you can cache / persist the source RDD to at least make sure it is not recomputed. I don't think groupByKey is quite a solution since it means one RDD for which all values for one key must fit in memory, and because the desired output is an RDD, I am not sure that is suitable. On Wed, Jun 10, 2015 at 1:56 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, I'm gathering that the typical approach for splitting an RDD is to apply several filters to it. rdd1 = rdd.filter(func1); rdd2 = rdd.filter(func2); ... Is there/should there be a way to create 'buckets' like these in one go? List<RDD> rddList = rdd.filter(func1, func2, ..., funcN) Another angle here is, when applying a filter(func), is there a way to get two RDDs back, one for which func returned true for all elements of the original RDD (the one being filtered), and the other one for which func returned false for all the elements? PairRDD pair = rdd.filterTrueFalse(func); Right now I'm doing RDD x = rdd.filter(func); RDD y = rdd.filter(reverseOfFunc); This seems a bit tautological to me, though Spark must be optimizing this out (?) Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Split-RDD-based-on-criteria-tp23254.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
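The two-filter idiom in the message above scans the data twice. Outside Spark, the "buckets in one go" idea is just a single-pass partition; a plain-Python sketch of what the hypothetical rdd.filterTrueFalse(func) would do (the helper name here is made up for illustration):

```python
def partition(pred, xs):
    """Split xs into (matches, non_matches) in a single pass, instead of
    filtering the same collection twice with pred and its negation."""
    trues, falses = [], []
    for x in xs:
        (trues if pred(x) else falses).append(x)
    return trues, falses

evens, odds = partition(lambda n: n % 2 == 0, range(10))
# evens == [0, 2, 4, 6, 8], odds == [1, 3, 5, 7, 9]
```

In Spark the equivalent single-pass version isn't expressible as two lazy RDDs, which is exactly the pull-vs-push point made above; caching the source RDD before the two filters is the practical workaround.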
Re: Filter operation to return two RDDs at once.
In the sense here, Spark actually does have operations that make multiple RDDs, like randomSplit. However there is not an equivalent of the partition operation which gives the elements that matched and did not match at once. On Wed, Jun 3, 2015, 8:32 AM Jeff Zhang zjf...@gmail.com wrote: As far as I know, Spark doesn't support multiple outputs. On Wed, Jun 3, 2015 at 2:15 PM, ayan guha guha.a...@gmail.com wrote: Why do you need to do that if the filter and content of the resulting RDD are exactly the same? You may as well declare them as 1 RDD. On 3 Jun 2015 15:28, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I want to do this: val qtSessionsWithQt = rawQtSession.filter(_._2.qualifiedTreatmentId != NULL_VALUE) val guidUidMapSessions = rawQtSession.filter(_._2.qualifiedTreatmentId == NULL_VALUE) This will run two different stages. Can this be done in one stage? val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession.*magicFilter*(_._2.qualifiedTreatmentId != NULL_VALUE) -- Deepak -- Best Regards Jeff Zhang
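For reference, randomSplit assigns each element to exactly one output by drawing a random number against cumulative weights. A plain-Python sketch of that idea (not Spark's actual implementation, which samples per-partition with seeded generators):

```python
import random

def random_split(items, weights, seed=0):
    """Assign each item to one of len(weights) parts with probability
    proportional to its weight. Every item lands in exactly one part."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:               # cumulative upper bounds ending at 1.0
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    parts = [[] for _ in weights]
    for x in items:
        r = rng.random()
        for i, b in enumerate(bounds):
            if r < b:
                parts[i].append(x)
                break
        else:                       # guard against float rounding at 1.0
            parts[-1].append(x)
    return parts

a, b = random_split(range(1000), [0.5, 0.5], seed=42)
assert sorted(a + b) == list(range(1000))  # a true partition of the input
```

Note this is still one pass producing disjoint outputs, which is exactly what a general filter-based `partition` would need and what two separate filter() calls cannot give you lazily.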
Re: Example Page Java Function2
Yes, I think you're right. Since this is a change to the ASF-hosted site, I can make this change to the .md / .html directly rather than go through the usual PR. On Wed, Jun 3, 2015 at 6:23 PM, linkstar350 . tweicomepan...@gmail.com wrote: Hi, I'm Taira. I notice that this example page may have a mistake. https://spark.apache.org/examples.html Word Count (Java)

JavaRDD<String> textFile = spark.textFile("hdfs://...");
JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
});
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
  public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
});
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
counts.saveAsTextFile("hdfs://...");

Function2 should have three generic type arguments, but there are only two. I hope for your consideration. Taira
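The fix Taira points out is that the combiner passed to reduceByKey takes two values and returns one, so its Java type is Function2<Integer, Integer, Integer> -- two input types plus a return type, three generic arguments in total. The same reduce step in plain Python, for illustration:

```python
def combine(a, b):
    # two Integer inputs, one Integer output: the three type slots in
    # Java's Function2<Integer, Integer, Integer>
    return a + b

def word_count(lines):
    """Word count built around the two-argument combiner above."""
    counts = {}
    for line in lines:
        for w in line.split():
            counts[w] = combine(counts.get(w, 0), 1)
    return counts

assert word_count(["a b a", "b"]) == {"a": 2, "b": 2}
```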
Re: Re: spark 1.3.1 jars in repo1.maven.org
We are having a separate discussion about this, but I don't understand why this is a problem. You're supposed to build Spark for Hadoop 1 if you run it on Hadoop 1, and I am not sure that is happening here, given the error. I do not think this should change, as I do not see what it fixes. Let's please concentrate the follow-up on the JIRA since you already made one. On Wed, Jun 3, 2015 at 2:26 AM, Shixiong Zhu zsxw...@gmail.com wrote: Ryan - I sent a PR to fix your issue: https://github.com/apache/spark/pull/6599 Edward - I have no idea why the following error happened. ContextCleaner doesn't use any Hadoop API. Could you try Spark 1.3.0? It's supposed to support both hadoop 1 and hadoop 2. * Exception in thread Spark Context Cleaner java.lang.NoClassDefFoundError: 0 at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149) Best Regards, Shixiong Zhu 2015-06-03 0:08 GMT+08:00 Ryan Williams ryan.blake.willi...@gmail.com: I think this is causing issues upgrading ADAM https://github.com/bigdatagenomics/adam to Spark 1.3.1 (cf.
adam#690 https://github.com/bigdatagenomics/adam/pull/690#issuecomment-107769383); attempting to build against Hadoop 1.0.4 yields errors like: 2015-06-02 15:57:44 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0) *java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected* at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95) at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2015-06-02 15:57:44 WARN TaskSetManager:71 - Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95) at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) TaskAttemptContext is a class in Hadoop 1.0.4, but an interface in Hadoop 2; Spark 1.3.1 expects the interface but is getting the class. It sounds like, while I *can* depend on Spark 1.3.1 and Hadoop 1.0.4, I then need to hope that I don't exercise certain Spark code paths that run afoul of differences between Hadoop 1 and 2; does that seem correct? Thanks!
Re: rdd.sample() methods very slow
I guess the fundamental issue is that these aren't stored in a way that allows random access to a Document. Underneath, Hadoop has a concept of a MapFile, which is like a SequenceFile with an index of offsets into the file where records begin. Although Spark doesn't use it, you could maybe create some custom RDD that takes advantage of this format to grab random elements efficiently. Other things come to mind but I think they're all slower -- like hashing all the docs and taking the smallest n in each of k partitions to get a pretty uniform random sample of kn docs. On Thu, May 21, 2015 at 4:04 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Is there any other way to solve the problem? Let me state the use case: I have an RDD[Document] that contains over 7 million items. The RDD needs to be saved on persistent storage (currently I save it as an object file on disk). Then I need to get a small random sample of Document objects (e.g. 10,000 documents). How can I do this quickly? The rdd.sample() method does not help because it needs to read the entire RDD of 7 million Documents from disk, which takes a very long time. Ningjun From: Sean Owen [mailto:so...@cloudera.com] Sent: Tuesday, May 19, 2015 4:51 PM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: rdd.sample() methods very slow The way these files are accessed is inherently sequential-access. There isn't a way to in general know where record N is in a file like this and jump to it. So they must be read to be sampled. On Tue, May 19, 2015 at 9:44 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Hi I have an RDD[Document] that contains 7 million objects and it is saved in the file system as an object file. I want to get a random sample of about 70 objects from it using the rdd.sample() method.
It is very slow: val rdd : RDD[Document] = sc.objectFile[Document]("C:/temp/docs.obj").sample(false, 0.1D, 0L).cache() val count = rdd.count() From the Spark UI, I see Spark is trying to read the entire object file at the folder “C:/temp/docs.obj”, which is about 29.7 GB. Of course this is very slow. Why does Spark try to read the entire 7 million objects when I only need to return a random sample of 70 objects? Is there any efficient way to get a random sample of 70 objects without reading through the entire object file? Ningjun
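The "hash all the docs and take the smallest n" alternative Sean mentions earlier in the thread still reads everything once, but it needs no random state per element and is deterministic. A small Python sketch of that idea:

```python
import hashlib
import heapq

def hash_sample(docs, n):
    """Roughly uniform sample: order docs by a hash of their content and
    keep the n smallest. One sequential pass, fully deterministic."""
    def key(doc):
        return hashlib.sha1(doc.encode("utf-8")).digest()
    return heapq.nsmallest(n, docs, key=key)

docs = ["doc-%d" % i for i in range(1000)]
sample = hash_sample(docs, 10)
assert len(sample) == 10
assert hash_sample(docs, 10) == sample  # same input, same sample every run
```

Per partition this is a bounded-memory operation (heapq keeps only n candidates), which is what makes "smallest n in each of k partitions" workable on large data.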
Re: rdd.sample() methods very slow
If sampling whole partitions is sufficient (or a part of a partition), sure: you could mapPartitionsWithIndex and decide whether to process a partition at all based on its #, and skip the rest. That's much faster. On Thu, May 21, 2015 at 7:07 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I don't need it to be 100% random. How about randomly picking a few partitions and returning all docs in those partitions? Is rdd.mapPartitionsWithIndex() the right method to use to just process a small portion of the partitions? Ningjun
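The mapPartitionsWithIndex approach can be sketched outside Spark by modeling an RDD as a list of partitions: pick k partition indices at random and emit only those elements, never touching the other partitions at all:

```python
import random

def sample_partitions(partitions, k, seed=0):
    """Keep all elements of k randomly chosen partitions and skip the rest.
    Fast, but only 'random' at partition granularity, matching the
    trade-off discussed above."""
    rng = random.Random(seed)
    chosen = set(rng.sample(range(len(partitions)), k))
    return [x for i, part in enumerate(partitions) if i in chosen for x in part]

parts = [[0, 1], [2, 3], [4, 5], [6, 7]]
out = sample_partitions(parts, 2, seed=7)
assert len(out) == 4               # 2 partitions x 2 elements each
assert set(out) <= set(range(8))
```

In real Spark code the filter on the partition index would live inside the mapPartitionsWithIndex function, so the skipped partitions are still opened but their iterators are discarded immediately without deserializing every record's downstream work.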
Re: Hive on Spark VS Spark SQL
I don't think that's quite the difference. Any SQL engine has a query planner and an execution engine. Both of these use Spark for execution. HoS uses Hive for query planning. Although it's not optimized for execution on Spark per se, it's got a lot of language support and is stable/mature. Spark SQL's query planner is less developed at this point but purpose-built for Spark as an execution engine. Spark SQL is also how you put SQL-like operations in a Spark program -- programmatic SQL, if you will -- which isn't what Hive or therefore HoS does. HoS is good if you're already using Hive and need its language features and need it as it works today, and want a faster batch execution version of it. On Wed, May 20, 2015 at 7:18 AM, Debasish Das debasish.da...@gmail.com wrote: SparkSQL was built to improve upon the Hive on Spark runtime further... On Tue, May 19, 2015 at 10:37 PM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Hive on Spark vs. Spark SQL: which is better, and what are the key characteristics, advantages, and disadvantages of each? -- guoqing0...@yahoo.com.hk
Re: spark 1.3.1 jars in repo1.maven.org
Yes, the published artifacts can only refer to one version of anything (OK, modulo publishing a large number of variants under classifiers). You aren't intended to rely on Spark's transitive dependencies for anything. Compiling against the Spark API has no relation to what version of Hadoop it binds against because it's not part of any API. You mark the Spark dependency even as provided in your build and get all the Spark/Hadoop bindings at runtime from our cluster. What problem are you experiencing? On Wed, May 20, 2015 at 2:17 AM, Edward Sargisson esa...@pobox.com wrote: Hi, I'd like to confirm an observation I've just made. Specifically that spark is only available in repo1.maven.org for one Hadoop variant. The Spark source can be compiled against a number of different Hadoops using profiles. Yay. However, the spark jars in repo1.maven.org appear to be compiled against one specific Hadoop and no other differentiation is made. (I can see a difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in the version I compiled locally). The implication here is that if you have a pom file asking for spark-core_2.10 version 1.3.1 then Maven will only give you an Hadoop 2 version. Maven assumes that non-snapshot artifacts never change so trying to load an Hadoop 1 version will end in tears. This then means that if you compile code against spark-core then there will probably be classpath NoClassDefFound issues unless the Hadoop 2 version is exactly the one you want. Have I gotten this correct? It happens that our little app is using a Spark context directly from a Jetty webapp and the classpath differences were/are causing some confusion. We are currently installing a Hadoop 1 spark master and worker. Thanks a lot! Edward - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Re: spark 1.3.1 jars in repo1.maven.org
I don't think any of those problems are related to Hadoop. Have you looked at userClassPathFirst settings? On Wed, May 20, 2015 at 6:46 PM, Edward Sargisson ejsa...@gmail.com wrote: Hi Sean and Ted, Thanks for your replies. I don't have our current problems nicely written up as good questions yet. I'm still sorting out classpath issues, etc. In case it is of help, I'm seeing: * Exception in thread Spark Context Cleaner java.lang.NoClassDefFoundError: 0 at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149) * We've been having clashing dependencies between a colleague and I because of the aforementioned classpath issue * The clashing dependencies are also causing issues with what jetty libraries are available in the classloader from Spark and don't clash with existing libraries we have. More anon, Cheers, Edward Original Message Subject: Re: spark 1.3.1 jars in repo1.maven.org Date: 2015-05-20 00:38 From: Sean Owen so...@cloudera.com To: Edward Sargisson esa...@pobox.com Cc: user user@spark.apache.org Yes, the published artifacts can only refer to one version of anything (OK, modulo publishing a large number of variants under classifiers). You aren't intended to rely on Spark's transitive dependencies for anything. Compiling against the Spark API has no relation to what version of Hadoop it binds against because it's not part of any API. You mark the Spark dependency even as provided in your build and get all the Spark/Hadoop bindings at runtime from our cluster. What problem are you experiencing? On Wed, May 20, 2015 at 2:17 AM, Edward Sargisson esa...@pobox.com wrote: Hi, I'd like to confirm an observation I've just made. Specifically that spark is only available in repo1.maven.org for one Hadoop variant. The Spark source can be compiled against a number of different Hadoops using profiles. Yay. 
However, the spark jars in repo1.maven.org appear to be compiled against one specific Hadoop and no other differentiation is made. (I can see a difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in the version I compiled locally). The implication here is that if you have a pom file asking for spark-core_2.10 version 1.3.1 then Maven will only give you an Hadoop 2 version. Maven assumes that non-snapshot artifacts never change so trying to load an Hadoop 1 version will end in tears. This then means that if you compile code against spark-core then there will probably be classpath NoClassDefFound issues unless the Hadoop 2 version is exactly the one you want. Have I gotten this correct? It happens that our little app is using a Spark context directly from a Jetty webapp and the classpath differences were/are causing some confusion. We are currently installing a Hadoop 1 spark master and worker. Thanks a lot! Edward
Re: Spark Streaming graceful shutdown in Spark 1.4
I don't think you should rely on a shutdown hook. Ideally you try to stop it in the main exit path of your program, even in case of an exception. On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: You mean to say within Runtime.getRuntime().addShutdownHook I call ssc.stop(stopSparkContext = true, stopGracefully = true)? This won't work anymore in 1.4. The SparkContext gets stopped before the Receiver has processed all received blocks and I see the below exception in the logs. But if I add the Utils.addShutdownHook with the priority as I mentioned, then graceful shutdown works. In that case shutdown hooks run in priority order.
Re: rdd.sample() methods very slow
The way these files are accessed is inherently sequential-access. There isn't a way to in general know where record N is in a file like this and jump to it. So they must be read to be sampled. On Tue, May 19, 2015 at 9:44 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Hi I have an RDD[Document] that contains 7 million objects and it is saved in file system as object file. I want to get a random sample of about 70 objects from it using rdd.sample() method. It is ver slow val rdd : RDD[Document] = sc.objectFile[Document](C:/temp/docs.obj).sample(false, 0.1D, 0L).cache() val count = rdd.count() From Spark UI, I see spark is try to read the entire object files at the folder “C:/temp/docs.obj” which is about 29.7 GB. Of course this is very slow. Why does Spark try to read entire 7 million objects while I only need to return a random sample of 70 objects? Is there any efficient way to get a random sample of 70 objects without reading through the entire object files? Ningjun
Re: SPARK-4412 regressed?
(I made you a Contributor in JIRA -- your yahoo-related account of the two -- so maybe that will let you do so.) On Fri, May 15, 2015 at 4:19 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi, two questions: 1. Can regular JIRA users reopen bugs? I can open a new issue but it does not appear that I can reopen issues. What is the proper protocol to follow if we discover regressions? 2. I believe SPARK-4412 regressed in Spark 1.3.1, according to this SO thread, possibly even in 1.3.0: http://stackoverflow.com/questions/30052889/how-to-suppress-parquet-log-messages-in-spark
Build change PSA: Hadoop 2.2 default; -Phadoop-x.y profile recommended for builds
This change will be merged shortly for Spark 1.4, and has a minor implication for those creating their own Spark builds: https://issues.apache.org/jira/browse/SPARK-7249 https://github.com/apache/spark/pull/5786 The default Hadoop dependency has actually been Hadoop 2.2 for some time, but the defaults weren't fully consistent as a Hadoop 2.2 build. That is what this resolves. The discussion highlights that it's actually not great to rely on the Hadoop defaults, if you care at all about the Hadoop binding, and that it's good practice to set some -Phadoop-x.y profile in any build. The net changes are: If you don't care about Hadoop at all, you could ignore this. You will get a consistent Hadoop 2.2 binding by default now. Still, you may wish to set a Hadoop profile. If you build for Hadoop 1, you need to set -Phadoop-1 now. If you build for Hadoop 2.2, you should still set -Phadoop-2.2 even though this is the default and is a no-op profile now. You can continue to set other Hadoop profiles and override hadoop.version; these are unaffected. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
The question is really whether all the third-party integrations should be built into Spark's main assembly. I think reasonable people could disagree, but I think the current state (not built in) is reasonable. It means you have to bring the integration with you. That is, no, third-party queue integrations aren't built in out of the box. the way you got it to work is one way, but not the preferred way: build this into your app and your packaging tool would have resolved the dependencies. I agree with resolving this as basically working-as-intended. On Tue, May 12, 2015 at 3:19 AM, Lee McFadden splee...@gmail.com wrote: I opened a ticket on this (without posting here first - bad etiquette, apologies) which was closed as 'fixed'. https://issues.apache.org/jira/browse/SPARK-7538 I don't believe that because I have my script running means this is fixed, I think it is still an issue. I downloaded the spark source, ran `mvn -DskipTests clean package `, then simply launched my python script (which shouldn't be introducing additional *java* dependencies itself?). Doesn't this mean these dependencies are missing from the spark build, since I didn't modify any files within the distribution and my application itself can't be introducing java dependency clashes? On Mon, May 11, 2015, 4:34 PM Lee McFadden splee...@gmail.com wrote: Ted, many thanks. I'm not used to Java dependencies so this was a real head-scratcher for me. Downloading the two metrics packages from the maven repository (metrics-core, metrics-annotation) and supplying it on the spark-submit command line worked. 
My final spark-submit for a python project using Kafka as an input source: /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \ --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \ --jars /home/ubuntu/jars/metrics-core-2.2.0.jar,/home/ubuntu/jars/metrics-annotation-2.2.0.jar \ --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \ --master spark://127.0.0.1:7077 \ affected_hosts.py Now we're seeing data from the stream. Thanks again! On Mon, May 11, 2015 at 2:43 PM Sean Owen so...@cloudera.com wrote: Ah yes, the Kafka + streaming code isn't in the assembly, is it? you'd have to provide it and all its dependencies with your app. You could also build this into your own app jar. Tools like Maven will add in the transitive dependencies. On Mon, May 11, 2015 at 10:04 PM, Lee McFadden splee...@gmail.com wrote: Thanks Ted, The issue is that I'm using packages (see spark-submit definition) and I do not know how to add com.yammer.metrics:metrics-core to my classpath so Spark can see it. Should metrics-core not be part of the org.apache.spark:spark-streaming-kafka_2.10:1.3.1 package so it can work correctly? If not, any clues as to how I can add metrics-core to my project (bearing in mind that I'm using Python, not a JVM language) would be much appreciated. Thanks, and apologies for my newbness with Java/Scala. On Mon, May 11, 2015 at 1:42 PM Ted Yu yuzhih...@gmail.com wrote: com.yammer.metrics.core.Gauge is in metrics-core jar e.g., in master branch: [INFO] | \- org.apache.kafka:kafka_2.10:jar:0.8.1.1:compile [INFO] | +- com.yammer.metrics:metrics-core:jar:2.2.0:compile Please make sure metrics-core jar is on the classpath. On Mon, May 11, 2015 at 1:32 PM, Lee McFadden splee...@gmail.com wrote: Hi, We've been having some issues getting spark streaming running correctly using a Kafka stream, and we've been going around in circles trying to resolve this dependency. 
Details of our environment and the error below, if anyone can help resolve this it would be much appreciated. Submit command line: /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \ --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \ --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \ --master spark://127.0.0.1:7077 \ affected_hosts.py When we run the streaming job everything starts just fine, then we see the following in the logs: 15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70, ip-10-10-102-53.us-west-2.compute.internal): java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151) at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:115) at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:128) at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
I think Java-land users will understand to look for an assembly jar in general, but it's not as obvious outside the Java ecosystem. Assembly = this thing, plus all its transitive dependencies. No, there is nothing wrong with Kafka at all. You need to bring everything it needs for it to work at runtime. The only piece of Spark you commonly need to bring with you is the third-party streaming deps, and I agree that the docs should tell Python users to attach the assembly JAR. Java/Scala users would, I think, be better served building this into their app, where they would already be making an assembly JAR. On Tue, May 12, 2015 at 5:39 PM, Lee McFadden splee...@gmail.com wrote: Thanks again for all the help, folks. I can confirm that simply switching to `--packages org.apache.spark:spark-streaming-kafka-assembly_2.10:1.3.1` makes everything work as intended. I'm not sure what the difference is between the two packages honestly, or why one should be used over the other, but the documentation is currently not intuitive in this matter. If you follow the instructions, initially it will seem broken. Is there any reason why the docs for Python users (or, in fact, all users - Java/Scala users will run into this too, except they are armed with the ability to build their own jar with the dependencies included) should not be changed to using the assembly package by default? Additionally, after a few Google searches yesterday combined with your help, I'm wondering if the core issue is upstream in Kafka's dependency chain? On Tue, May 12, 2015 at 8:53 AM Ted Yu yuzhih...@gmail.com wrote: bq. it is already in the assembly Yes. Verified: $ jar tvf ~/Downloads/spark-streaming-kafka-assembly_2.10-1.3.1.jar | grep yammer | grep Gauge 1329 Sat Apr 11 04:25:50 PDT 2015 com/yammer/metrics/core/Gauge.class On Tue, May 12, 2015 at 8:05 AM, Sean Owen so...@cloudera.com wrote: It doesn't depend directly on yammer metrics; Kafka does. 
It wouldn't be correct to declare that it does; it is already in the assembly anyway. On Tue, May 12, 2015 at 3:50 PM, Ted Yu yuzhih...@gmail.com wrote: Currently external/kafka/pom.xml doesn't cite yammer metrics as dependency. $ ls -l ~/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar -rw-r--r-- 1 tyu staff 82123 Dec 17 2013 /Users/tyu/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar Including the metrics-core jar would not increase the size of the final release artifact much. My two cents. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
Yeah, fair point about Python. spark-streaming-kafka should not contain third-party dependencies. However, there's nothing stopping the build from producing an assembly jar from these modules. I think there is an assembly target already though? On Tue, May 12, 2015 at 3:37 PM, Lee McFadden splee...@gmail.com wrote: Sorry to flog this dead horse, but this is something every Python user is going to run into, as we *cannot* build the dependencies onto our app. There is no way to do that with a Python script. As I see it, this is not a third-party integration. The package missing its dependencies is built by the Spark team, I believe? org.apache.spark:spark-streaming-kafka_2.10:1.3.1 is the problem package; if I remove the Cassandra package I still run into the same error. Is there really no way to add these dependencies to the Kafka package? I tried to add these dependencies in a number of ways, but the maze of pom.xml files makes that difficult for those not familiar with Java. Thanks again. I really don't want other Python users to run into the same brick wall I did, as I nearly gave up on Spark over what turns out to be a relatively simple thing. On Tue, May 12, 2015, 1:11 AM Sean Owen so...@cloudera.com wrote: The question is really whether all the third-party integrations should be built into Spark's main assembly. I think reasonable people could disagree, but I think the current state (not built in) is reasonable. It means you have to bring the integration with you. That is, no, third-party queue integrations aren't built in out of the box. The way you got it to work is one way, but not the preferred way: build this into your app, and your packaging tool will resolve the dependencies. I agree with resolving this as basically working-as-intended. On Tue, May 12, 2015 at 3:19 AM, Lee McFadden splee...@gmail.com wrote: I opened a ticket on this (without posting here first - bad etiquette, apologies) which was closed as 'fixed'. 
https://issues.apache.org/jira/browse/SPARK-7538 I don't believe that because I have my script running means this is fixed, I think it is still an issue. I downloaded the spark source, ran `mvn -DskipTests clean package `, then simply launched my python script (which shouldn't be introducing additional *java* dependencies itself?). Doesn't this mean these dependencies are missing from the spark build, since I didn't modify any files within the distribution and my application itself can't be introducing java dependency clashes? On Mon, May 11, 2015, 4:34 PM Lee McFadden splee...@gmail.com wrote: Ted, many thanks. I'm not used to Java dependencies so this was a real head-scratcher for me. Downloading the two metrics packages from the maven repository (metrics-core, metrics-annotation) and supplying it on the spark-submit command line worked. My final spark-submit for a python project using Kafka as an input source: /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \ --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \ --jars /home/ubuntu/jars/metrics-core-2.2.0.jar,/home/ubuntu/jars/metrics-annotation-2.2.0.jar \ --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \ --master spark://127.0.0.1:7077 \ affected_hosts.py Now we're seeing data from the stream. Thanks again! On Mon, May 11, 2015 at 2:43 PM Sean Owen so...@cloudera.com wrote: Ah yes, the Kafka + streaming code isn't in the assembly, is it? you'd have to provide it and all its dependencies with your app. You could also build this into your own app jar. Tools like Maven will add in the transitive dependencies. On Mon, May 11, 2015 at 10:04 PM, Lee McFadden splee...@gmail.com wrote: Thanks Ted, The issue is that I'm using packages (see spark-submit definition) and I do not know how to add com.yammer.metrics:metrics-core to my classpath so Spark can see it. 
Should metrics-core not be part of the org.apache.spark:spark-streaming-kafka_2.10:1.3.1 package so it can work correctly? If not, any clues as to how I can add metrics-core to my project (bearing in mind that I'm using Python, not a JVM language) would be much appreciated. Thanks, and apologies for my newbness with Java/Scala. On Mon, May 11, 2015 at 1:42 PM Ted Yu yuzhih...@gmail.com wrote: com.yammer.metrics.core.Gauge is in metrics-core jar e.g., in master branch: [INFO] | \- org.apache.kafka:kafka_2.10:jar:0.8.1.1:compile [INFO] | +- com.yammer.metrics:metrics-core:jar:2.2.0:compile Please make sure metrics-core jar is on the classpath. On Mon, May 11, 2015 at 1:32 PM, Lee McFadden splee...@gmail.com wrote: Hi, We've been having some issues getting spark streaming running correctly using a Kafka
Re: Running Spark in local mode seems to ignore local[N]
BTW, I think my comment was wrong, as Marcelo demonstrated. In standalone mode you'd have one worker, and you do have one executor, but his explanation is right. But you certainly have execution slots for each core. Are you talking about your own user code? You can make threads, but that's nothing to do with Spark then. If you run code on your driver, it's not distributed. If you run Spark over an RDD with 1 partition, only one task works on it. On Mon, May 11, 2015 at 10:16 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Sean, How does this model actually work? Let's say we want to run one job as N threads executing one particular task, e.g. streaming data out of Kafka into a search engine. How do we configure our Spark job execution? Right now, I'm seeing this job running as a single thread. And it's quite a bit slower than just running a simple utility with a thread executor with a thread pool of N threads doing the same task. The Kafka-Spark Streaming job runs 7 times slower than the utility. What's pulling Spark back? Thanks. On Mon, May 11, 2015 at 4:55 PM, Sean Owen so...@cloudera.com wrote: You have one worker with one executor with 32 execution slots. On Mon, May 11, 2015 at 9:52 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, Is there anything special one must do, running locally and submitting a job like so: spark-submit \ --class com.myco.Driver \ --master local[*] \ ./lib/myco.jar In my logs, I'm only seeing log messages with the thread identifier of Executor task launch worker-0. There are 4 cores on the machine so I expected 4 threads to be at play. Running with local[32] did not yield 32 worker threads. Any recommendations? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-in-local-mode-seems-to-ignore-local-N-tp22851.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
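Sean's point, that execution slots only help when there are enough tasks (partitions) to occupy them, can be illustrated outside Spark with a plain thread pool. This is an analogy, not Spark code: the pool size stands in for the N in local[N], and the number of submitted tasks stands in for the number of RDD partitions (class and method names here are illustrative):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SlotsVsTasks {
    // Submit taskCount tasks to a pool of poolSize threads and report how many
    // distinct worker threads actually ran anything. A barrier forces the
    // tasks in each batch to overlap, so the count is deterministic.
    static int threadsUsed(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CyclicBarrier batch = new CyclicBarrier(Math.min(poolSize, taskCount));
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            pool.submit(() -> {
                names.add(Thread.currentThread().getName());
                try {
                    batch.await(5, TimeUnit.SECONDS); // wait for a full batch
                } catch (Exception ignored) {
                }
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return names.size();
    }

    public static void main(String[] args) {
        // One task ("one partition") uses one thread, however many slots exist.
        System.out.println(threadsUsed(32, 1)); // 1
        // All four slots are busy only when there are at least four tasks.
        System.out.println(threadsUsed(4, 8));  // 4
    }
}
```

With a single task, a 32-slot pool still does all the work on one thread, which matches seeing only "Executor task launch worker-0" in the logs when a local[32] job processes a 1-partition RDD.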
Re: Getting error running MLlib example with new cluster
That is mostly the YARN overhead. You're starting up a container for the AM and executors, at least. That still sounds pretty slow, but the defaults aren't tuned for fast startup. On May 11, 2015 7:00 PM, Su She suhsheka...@gmail.com wrote: Got it to work on the cluster by changing the master to yarn-cluster instead of local! I do have a couple follow up questions... This is the example I was trying to run: https://github.com/holdenk/learning-spark-examples/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala 1) The example still takes about 1 min 15 seconds to run (my cluster has 3 m3.large nodes). This seems really long for building a model based off data that is about 10 lines long. Is this normal? 2) Any guesses as to why it was able to run in the cluster, but not locally? Thanks for the help! On Mon, Apr 27, 2015 at 11:48 AM, Su She suhsheka...@gmail.com wrote: Hello Xiangrui, I am using this spark-submit command (as I do for all other jobs): /opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/spark/bin/spark-submit --class MLlib --master local[2] --jars $(echo /home/ec2-user/sparkApps/learning-spark/lib/*.jar | tr ' ' ',') /home/ec2-user/sparkApps/learning-spark/target/simple-project-1.1.jar Thank you for the help! Best, Su On Mon, Apr 27, 2015 at 9:58 AM, Xiangrui Meng men...@gmail.com wrote: How did you run the example app? Did you use spark-submit? -Xiangrui On Thu, Apr 23, 2015 at 2:27 PM, Su She suhsheka...@gmail.com wrote: Sorry, accidentally sent the last email before finishing. I had asked this question before, but wanted to ask again as I think it is now related to my pom file or project setup. Really appreciate the help! I have been trying on/off for the past month to try to run this MLlib example: https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala I am able to build the project successfully. 
When I run it, it returns: features in spam: 8 features in ham: 7 and then freezes. According to the UI, the description of the job is count at DataValidators.scala:38. This corresponds to this line in the code: val model = lrLearner.run(trainingData) I've tried just about everything I can think of... changed numFeatures from 1 to 10,000, set executor memory to 1g, set up a new cluster; at this point I think I might have missed dependencies, as that has usually been the problem in other Spark apps I have tried to run. This is my pom file, which I have used for other successful Spark apps. Please let me know if you think I need any additional dependencies, or there are incompatibility issues, or a pom.xml that is better to use. Thank you! Cluster information: Spark version: 1.2.0-SNAPSHOT (in my older cluster it is 1.2.0) java version 1.7.0_25 Scala version: 2.10.4 hadoop version: hadoop 2.5.0-cdh5.3.3 (older cluster was 5.3.0)
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <groupId>edu.berkely</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>
  <build>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
Re: Running Spark in local mode seems to ignore local[N]
You have one worker with one executor with 32 execution slots. On Mon, May 11, 2015 at 9:52 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, Is there anything special one must do, running locally and submitting a job like so: spark-submit \ --class com.myco.Driver \ --master local[*] \ ./lib/myco.jar In my logs, I'm only seeing log messages with the thread identifier of Executor task launch worker-0. There are 4 cores on the machine so I expected 4 threads to be at play. Running with local[32] did not yield 32 worker threads. Any recommendations? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-in-local-mode-seems-to-ignore-local-N-tp22851.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
Ah yes, the Kafka + streaming code isn't in the assembly, is it? you'd have to provide it and all its dependencies with your app. You could also build this into your own app jar. Tools like Maven will add in the transitive dependencies. On Mon, May 11, 2015 at 10:04 PM, Lee McFadden splee...@gmail.com wrote: Thanks Ted, The issue is that I'm using packages (see spark-submit definition) and I do not know how to add com.yammer.metrics:metrics-core to my classpath so Spark can see it. Should metrics-core not be part of the org.apache.spark:spark-streaming-kafka_2.10:1.3.1 package so it can work correctly? If not, any clues as to how I can add metrics-core to my project (bearing in mind that I'm using Python, not a JVM language) would be much appreciated. Thanks, and apologies for my newbness with Java/Scala. On Mon, May 11, 2015 at 1:42 PM Ted Yu yuzhih...@gmail.com wrote: com.yammer.metrics.core.Gauge is in metrics-core jar e.g., in master branch: [INFO] | \- org.apache.kafka:kafka_2.10:jar:0.8.1.1:compile [INFO] | +- com.yammer.metrics:metrics-core:jar:2.2.0:compile Please make sure metrics-core jar is on the classpath. On Mon, May 11, 2015 at 1:32 PM, Lee McFadden splee...@gmail.com wrote: Hi, We've been having some issues getting spark streaming running correctly using a Kafka stream, and we've been going around in circles trying to resolve this dependency. Details of our environment and the error below, if anyone can help resolve this it would be much appreciated. 
Submit command line: /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \ --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \ --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \ --master spark://127.0.0.1:7077 \ affected_hosts.py When we run the streaming job everything starts just fine, then we see the following in the logs: 15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70, ip-10-10-102-53.us-west-2.compute.internal): java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151) at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:115) at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:128) at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at 
java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.Gauge at java.net.URLClassLoader$1.run(URLClassLoader.java:372) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:360) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 17 more
Re: dependencies on java-netlib and jblas
Yes, at this point I believe you'll find jblas used for historical reasons, to not change some APIs. I don't believe it's used for much if any computation in 1.4. On May 8, 2015 5:04 PM, John Niekrasz john.niekr...@gmail.com wrote: Newbie question... Can I use any of the main ML capabilities of MLlib in a Java-only environment, without any native library dependencies? According to the documentation, java-netlib provides a JVM fallback. This suggests that native netlib libraries are not required. It appears that such a fallback is not available for jblas. However, a quick look at the MLlib source suggests that MLlib's dependencies on jblas are rather isolated: grep -R jblas main/scala/org/apache/spark/ml/recommendation/ALS.scala:import org.jblas.DoubleMatrix main/scala/org/apache/spark/mllib/optimization/NNLS.scala:import org.jblas.{DoubleMatrix, SimpleBlas} main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala:import org.jblas.DoubleMatrix main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala:import org.jblas.DoubleMatrix main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala: org.jblas.util.Random.seed(42) main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala:import org.jblas.DoubleMatrix main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala:import org.jblas.DoubleMatrix Is it true or false that many of MLlib's capabilities will work perfectly fine without any native (non-Java) libraries installed at all? Thanks for the help, John -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/dependencies-on-java-netlib-and-jblas-tp22818.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark does not delete temporary directories
You're referring to a comment in the generic utility method, not the specific calls to it. The comment just says that the generic method doesn't mark the directory for deletion. Individual uses of it might need to. One or more of these might be delete-able on exit, but in any event it's just a directory. I think 'spark files' might intentionally stay around since it outlives one JVM and might be shared across executors. On Fri, May 8, 2015 at 3:53 AM, Taeyun Kim taeyun@innowireless.com wrote: It seems that they are always empty. I've traced the spark source code. The module methods that create the 3 'temp' directories are as follows: - DiskBlockManager.createLocalDirs - HttpFileServer.initialize - SparkEnv.sparkFilesDir They (eventually) call Utils.getOrCreateLocalRootDirs and then Utils.createDirectory, which intentionally does NOT mark the directory for automatic deletion. The comment of createDirectory method says: The directory is guaranteed to be newly created, and is not marked for automatic deletion. I don't know why they are not marked. Is this really intentional? From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Friday, May 08, 2015 11:37 AM To: Taeyun Kim; Ted Yu; Todd Nist; user@spark.apache.org Subject: RE: Spark does not delete temporary directories I think the temporary folders are used to store blocks and shuffles. That doesn't depend on the cluster manager. Ideally they should be removed after the application has been terminated. Can you check if there are contents under those folders? From: Taeyun Kim [mailto:taeyun@innowireless.com] Sent: Friday, May 08, 2015 9:42 AM To: 'Ted Yu'; 'Todd Nist'; user@spark.apache.org Subject: RE: Spark does not delete temporary directories Thanks, but it seems that the option is for Spark standalone mode only. I’ve (lightly) tested the options with local mode and yarn-client mode, the ‘temp’ directories were not deleted. 
From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Thursday, May 07, 2015 10:47 PM To: Todd Nist Cc: Taeyun Kim; user@spark.apache.org Subject: Re: Spark does not delete temporary directories Default value for spark.worker.cleanup.enabled is false: private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false) I wonder if the default should be set as true. Cheers On Thu, May 7, 2015 at 6:19 AM, Todd Nist tsind...@gmail.com wrote: Have you tried to set the following? spark.worker.cleanup.enabled=true spark.worker.cleanup.appDataTtl=<seconds> On Thu, May 7, 2015 at 2:39 AM, Taeyun Kim taeyun@innowireless.com wrote: Hi, After a Spark program completes, 3 temporary directories remain in the temp directory. The file names are like this: spark-2e389487-40cc-4a82-a5c7-353c0feefbb7 And when the Spark program runs on Windows, a snappy DLL file also remains in the temp directory. The file name is like this: snappy-1.0.4.1-6e117df4-97b6-4d69-bf9d-71c4a627940c-snappyjava They are created every time the Spark program runs, so the number of files and directories keeps growing. How can I get them deleted? Spark version is 1.3.1 with Hadoop 2.6. Thanks.
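For reference, "marking a directory for automatic deletion" in the JVM usually means registering a shutdown hook that removes the tree recursively when the process exits. The sketch below is illustrative only, not Spark's actual code (Spark has its own utilities for this), and the class and method names are invented for the example:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class ManagedTempDir {
    // Create a temp directory and register a shutdown hook so the whole tree
    // is removed when the JVM exits normally -- the "mark for automatic
    // deletion" the thread is talking about.
    static Path create(String prefix) {
        try {
            Path dir = Files.createTempDirectory(prefix);
            Runtime.getRuntime().addShutdownHook(
                new Thread(() -> deleteRecursively(dir)));
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Small helper to create a file without checked exceptions.
    static Path touch(Path file) {
        try {
            return Files.createFile(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Delete children before parents by walking the tree in reverse order.
    static void deleteRecursively(Path root) {
        if (!Files.exists(root)) return;
        try (Stream<Path> walk = Files.walk(root)) {
            walk.sorted(Comparator.reverseOrder())
                .forEach(p -> p.toFile().delete());
        } catch (IOException ignored) {
        }
    }

    public static void main(String[] args) {
        Path dir = create("spark-demo-");
        touch(dir.resolve("block.dat"));
        deleteRecursively(dir); // delete eagerly instead of waiting for exit
        System.out.println(Files.exists(dir)); // false
    }
}
```

Note that the spark.worker.cleanup.* settings discussed above only apply to standalone workers, which is consistent with Taeyun's observation that local and yarn-client runs were unaffected by them.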
Re: Selecting download for 'hadoop 2.4 and later'
See https://issues.apache.org/jira/browse/SPARK-5492, but I think you'll need to share the stack trace, as I'm not sure how this can happen: a NoSuchMethodError (not NoSuchMethodException) indicates a call in the bytecode failed to link, but there is only a call by reflection. On Fri, May 1, 2015 at 9:30 PM, Stephen Boesch java...@gmail.com wrote: What is the correct procedure for downloading a spark 1.2.X release for use with hadoop 2.4? The existing download page has a link for hadoop 2.4+. However, when using that with hadoop 2.4, a NoSuchMethodError is thrown for Statistics.getThreadStatistics. Upon brief investigation: the SparkHadoopUtil class invokes the hadoop FileSystem.Statistics.getThreadStatistics, which exists in hadoop 2.5+
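The distinction Sean draws matters for diagnosis: reflection reports a missing method as a checked NoSuchMethodException at lookup time, which code can catch and handle, while NoSuchMethodError is a linkage error raised when already-compiled bytecode calls a method that isn't present at runtime (here, getThreadStatistics compiled against Hadoop 2.5+ but run against 2.4). A small sketch of the reflection side, with an illustrative helper name:

```java
import java.lang.reflect.Method;

public class LookupVsLinkage {
    // Reflection reports a missing method as a checked exception you can
    // catch, e.g. to fall back gracefully on an older library version.
    static boolean hasMethod(Class<?> cls, String name, Class<?>... params) {
        try {
            Method m = cls.getMethod(name, params);
            return m != null;
        } catch (NoSuchMethodException e) {
            return false; // method absent in this version of the library
        }
    }

    public static void main(String[] args) {
        System.out.println(hasMethod(String.class, "isEmpty"));      // true
        System.out.println(hasMethod(String.class, "noSuchMethod")); // false
        // By contrast, NoSuchMethodError cannot be probed for like this: it
        // is thrown only when compiled bytecode calls the missing method
        // directly, which matches Sean's note that SPARK-5492's call site
        // uses reflection instead of a direct call.
    }
}
```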
Re: Spark pre-built for Hadoop 2.6
Yes, there is now such a profile, though it is essentially redundant: it doesn't configure things differently from 2.4, besides the Hadoop version of course. Which is why it hadn't existed before; the 2.4 profile covers 2.4+. People just kept filing bugs to add it, but the docs are correct: you don't actually need this profile for 2.6, but you can set it if you want. On Apr 30, 2015 11:01 AM, Christophe Préaud christophe.pre...@kelkoo.com wrote: Hi, I can see that there is now a pre-built Spark package for hadoop-2.6: http://apache.mirrors.ovh.net/ftp.apache.org/dist/spark/spark-1.3.1/ Does this mean that there is now a hadoop-2.6 profile? It does not appear in the building-spark page: http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version Thanks, Christophe. Kelkoo SAS Société par Actions Simplifiée Au capital de € 4.168.964,30 Siège social : 158 Ter Rue du Temple 75003 Paris 425 093 069 RCS Paris This message and its attachments are confidential and intended exclusively for their addressees. If you are not the intended recipient of this message, please destroy it and notify the sender.
Re: JavaRDD<List<Tuple2>> flatMap Lexicographical Permutations - Java Heap Error
You fundamentally want (half of) the Cartesian product, so I don't think it gets a lot faster to form this. You could implement this on cogroup directly and maybe avoid forming the tuples you will filter out. I'd think more about whether you really need to do this thing, or whether there is anything else about the real problem to exploit. On Apr 30, 2015 6:36 PM, Dan DeCapria, CivicScience dan.decap...@civicscience.com wrote: Thought about it some more, and simplified the problem space for discussion: Given: JavaPairRDD<String, Integer> c1; // c1.count() == 8000. Goal: JavaPairRDD<Tuple2<String, Integer>, Tuple2<String, Integer>> c2; // all lexicographical pairs Where: all lexicographic permutations on c1 :: (c1_i._1().compareTo(c1_j._1()) < 0) -> new Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>(c1_i, c1_j); // for all c1_i < c1_j \in c1 Not sure how to efficiently generate c2. c1.cartesian(c1).filter(... (c1_i._1().compareTo(c1_j._1()) < 0) ...) was just terrible performance-wise. Thanks, -Dan On Thu, Apr 30, 2015 at 11:58 AM, Dan DeCapria, CivicScience dan.decap...@civicscience.com wrote: I am trying to generate all (N-1)(N)/2 lexicographical 2-tuples from a glom() JavaRDD<List<Tuple2>>. The construction of these initial Tuple2s' JavaPairRDD<AQ, Integer> space is well formed from case classes I provide it (AQ, AQV, AQQ, CT) and is performant; minimized code: SparkConf conf = new SparkConf() .setAppName("JavaTestMain") .set("spark.driver.maxResultSize", "0") .set("spark.akka.frameSize", "512"); ... 
JavaRDD<AQV> aqv = sc.textFile(data_account_question_value_file).map((String line) -> {
    String[] s = line.trim().split(",");
    AQV key = new AQV(Integer.parseInt(s[0].trim()), s[1].trim(), s[2].trim()); // (0,1,2)
    return key;
});
JavaPairRDD<AQ, Integer> c0 = aqv.distinct().mapToPair((AQV c) -> {
    return new Tuple2<AQ, Integer>(new AQ(c.getAccount(), c.getQuestion()), 1);
});
JavaPairRDD<AQ, Integer> c1 = c0.reduceByKey((Integer i1, Integer i2) -> {
    return (i1 + i2);
});
logger.info(c1.count());
This code snippet above works well and returns a value JavaTestMain: 8010 in a few seconds, which is perfect. When I try to generate the iterative lexicographic permutation space (32,076,045 elements), it is not performant and results in a thrown java.lang.OutOfMemoryError: Java heap space; minimized code continued:
JavaRDD<List<Tuple2<AQ, Integer>>> c2 = c1.glom();
JavaRDD<CT> c3 = c2.flatMap((List<Tuple2<AQ, Integer>> cl) -> {
    List<CT> output = new ArrayList<>((cl.size() - 1) * (cl.size()) / 2);
    Tuple2<AQ, Integer> key1, key2;
    for (ListIterator<Tuple2<AQ, Integer>> cloit = cl.listIterator(); cloit.hasNext(); ) { // outer loop
        key1 = cloit.next();
        for (ListIterator<Tuple2<AQ, Integer>> cliit = cl.listIterator(cloit.nextIndex()); cliit.hasNext(); ) { // inner loop, if applicable
            key2 = cliit.next();
            output.add(new CT(new AQQ(key1._1(), key2._1()), key1._2(), key2._2()));
        }
    }
    return output;
});
c3.collect().stream().forEach(System.out::println);
15/04/30 11:29:59 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 4) java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:3236) at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:82) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at 
java.lang.Thread.run(Thread.java:745) 15/04/30 11:29:59 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main] java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:3236) at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:82) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/04/30 11:29:59 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 4, localhost): java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:3236) at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:82) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
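The heap pressure above comes from materializing all ~32 million CT objects in one ArrayList per partition (and then collect()ing them to the driver besides). One way to reduce it, sketched here with plain String keys standing in for the thread's AQ/AQQ/CT classes (which aren't shown, so this is a simplified stand-in), is to enumerate the i < j pairs lazily from an iterator instead of building the whole list up front; in Spark 1.x you could wrap such an iterator in an Iterable and return it from flatMap so pairs are streamed rather than held in memory at once:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

public class LexPairs {
    // Lazily enumerate every (items[i], items[j]) with i < j, i.e. the
    // (N-1)N/2 lexicographic pairs of a sorted list, without materializing
    // the full result in memory.
    static <T> Iterator<Map.Entry<T, T>> pairs(List<T> items) {
        return new Iterator<Map.Entry<T, T>>() {
            private int i = 0, j = 1;

            public boolean hasNext() {
                return i < items.size() - 1;
            }

            public Map.Entry<T, T> next() {
                if (!hasNext()) throw new NoSuchElementException();
                Map.Entry<T, T> e =
                    new SimpleImmutableEntry<>(items.get(i), items.get(j));
                if (++j == items.size()) { i++; j = i + 1; } // advance row
                return e;
            }
        };
    }

    // Consume the iterator just to count the pairs it yields.
    static long countPairs(List<String> items) {
        long n = 0;
        for (Iterator<?> it = pairs(items); it.hasNext(); it.next()) n++;
        return n;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d");
        pairs(keys).forEachRemaining(System.out::println); // a=b a=c a=d b=c b=d c=d
        System.out.println(countPairs(keys)); // 6
        // 8010 distinct keys, as in the thread, give 8010*8009/2 = 32,076,045
        // pairs -- the element count the poster reports.
    }
}
```

Whether this fits depends on what happens downstream: if the pairs must still all be collected to the driver, the driver will hit the same wall, which is in line with Sean's suggestion to reconsider whether the full pair set is needed at all.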
Re: Driver memory leak?
Please use user@, not dev@. This message does not appear to be from your driver, and it doesn't say you ran out of memory: it says you didn't tell YARN to let the container use the memory you want. Look at the memory overhead parameter, and please search first for related discussions. On Apr 29, 2015 11:43 AM, wyphao.2007 wyphao.2...@163.com wrote: Hi, dear developers, I am using Spark Streaming to read data from Kafka. The program has already run for about 120 hours, but today it failed because of the driver's OOM, as follows: Container [pid=49133,containerID=container_1429773909253_0050_02_01] is running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB physical memory used; 3.2 GB of 50 GB virtual memory used. Killing container. I set --driver-memory to 2g. In my mind, the driver is responsible for job scheduling and job monitoring (please correct me if I'm wrong), so why is it using so much memory? I used jmap to monitor another instance of the program (already running for about 48 hours): sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256. The result: the java.util.HashMap$Entry and java.lang.Long objects use about 600 MB of memory! I also used jmap on another instance (running for about 1 hour), where those objects don't use nearly as much memory. But I found that, as time goes by, the java.util.HashMap$Entry and java.lang.Long objects occupy more and more memory. Is it a driver memory leak, or some other reason? Thanks Best Regards
Re: Driver memory leak?
Not sure what you mean. It's already in CDH since 5.4, which ships Spark 1.3.0. (This isn't the place to ask about CDH.) I also don't think that's the problem: the process did not run out of memory. On Wed, Apr 29, 2015 at 2:08 PM, Serega Sheypak serega.shey...@gmail.com wrote: The memory leak could be related to this https://issues.apache.org/jira/browse/SPARK-5967 defect that was resolved in Spark 1.2.2 and 1.3.0. @Sean Will it be backported to CDH? I didn't find that bug in the CDH 5.4 release notes. 2015-04-29 14:51 GMT+02:00 Conor Fennell conor.fenn...@altocloud.com: The memory leak could be related to this https://issues.apache.org/jira/browse/SPARK-5967 defect that was resolved in Spark 1.2.2 and 1.3.0. It also was a HashMap causing the issue. -Conor On Wed, Apr 29, 2015 at 12:01 PM, Sean Owen so...@cloudera.com wrote: [earlier reply and original message, quoted in full above, trimmed]
Re: Spark 1.3.1 Hadoop 2.4 Prebuilt package broken ?
Works fine for me. Make sure you're not downloading the HTML redirector page and thinking it's the archive. On Mon, Apr 27, 2015 at 11:43 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I downloaded the 1.3.1 Hadoop 2.4 prebuilt package (tar) from multiple mirrors and the direct link. Each time I untar it I get the error below: spark-1.3.1-bin-hadoop2.4/lib/spark-assembly-1.3.1-hadoop2.4.0.jar: (Empty error message) tar: Error exit delayed from previous errors Is it broken? -- Deepak - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark RDD sortByKey triggering a new job
Yes, I think this is a known issue: sortByKey actually runs a job to assess the distribution of the data. https://issues.apache.org/jira/browse/SPARK-1021 I think further eyes on it would be welcome, as it's not desirable. On Fri, Apr 24, 2015 at 9:57 AM, Spico Florin spicoflo...@gmail.com wrote: I have tested the sortByKey method with the following code and observed that it triggers a new job when called. I could find this neither in the API docs nor in the code. Is this intended behavior? For example, the RDD zipWithIndex method API specifies that it will trigger a new job, but what about sortByKey?

val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))
val l = sc.parallelize(List((5,'c'),(2,'d'),(1,'a'),(7,'e')), 3)
l.sortByKey()

Thanks for your answers. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
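For context on why a sort needs an extra job: to range-partition the output, sortByKey first samples the keys to choose partition boundaries, and that sampling pass is the job you see. A rough Python sketch of the idea (illustrative only - Spark's actual RangePartitioner uses per-partition reservoir sampling):

```python
import random

def range_boundaries(keys, num_partitions, sample_size=20, seed=0):
    # The sampling pass: this is the extra work that surfaces as a job.
    rng = random.Random(seed)
    sample = sorted(rng.sample(keys, min(sample_size, len(keys))))
    step = len(sample) / num_partitions
    return [sample[int(step * i)] for i in range(1, num_partitions)]

def partition_for(key, boundaries):
    # Keys below the first boundary land in partition 0, and so on,
    # so each partition holds a contiguous range of the key space.
    for i, b in enumerate(boundaries):
        if key < b:
            return i
    return len(boundaries)

keys = [5, 2, 1, 7, 9, 3, 8, 4, 6, 0]
bounds = range_boundaries(keys, num_partitions=3)
```

Once the boundaries exist, the shuffle can send each key to the right partition and sort locally, which is why the sampling has to happen up front.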
Re: Convert DStream[Long] to Long
The sum? You just need to use an accumulator to sum the counts, or something similar. On Fri, Apr 24, 2015 at 2:14 PM, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: Sorry for my explanation, my English is bad. I just need to obtain the Long contained in the DStream created by messages.count(). Thanks for all. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Convert DStream[Long] to Long
No, it prints each Long in that stream, forever. Have a look at the DStream API. On Fri, Apr 24, 2015 at 2:24 PM, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: But if I use messages.count().print, this shows a single number :/ - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?
The order of elements in an RDD is in general not guaranteed unless you sort. You shouldn't expect to encounter the partitions of an RDD in any particular order. In practice, you'll probably find the partitions come up in the order Hadoop presents them in this case, and within a partition, in this case, I don't see why you'd encounter items in any order except that in which they exist on HDFS. However, I'm not sure that's the issue. Are you expecting the unique ID to be sequential? It's not, and it is also not intended to be sequential within a partition: items in the kth partition will get ids k, n+k, 2*n+k, ..., where n is the number of partitions. That is, this result may be the correct result of encountering the underlying RDD in order; I don't know, since I don't know the data. It might give what you expect in the case of 1 partition, but this is not a way to get sequential IDs to begin with - that's zipWithIndex. On Fri, Apr 24, 2015 at 10:28 AM, Michal Michalski michal.michal...@boxever.com wrote: I did a quick test as I was curious about it too. I created a file with the numbers from 0 to 999, in order, line by line. Then I did:

scala> val numbers = sc.textFile("./numbers.txt")
scala> val zipped = numbers.zipWithUniqueId
scala> zipped.foreach(i => println(i))

The expected result, if the order were preserved, would be something like (0,0), (1,1), etc. Unfortunately, the output looks like this:

(126,1)
(223,2)
(320,3)
(1,0)
(127,11)
(2,10)
(...)

The workaround I found that works for my specific use case (relatively small input files) is setting the number of partitions explicitly to 1 when reading a single *text* file:

scala> val numbers_sp = sc.textFile("./numbers.txt", 1)

Then the output is exactly as I would expect. I didn't dive into the code too much, but I took a very quick look at it and figured out - correct me if I missed something, it's Friday afternoon! ;-) - that this workaround will work fine for all input formats inheriting from org.apache.hadoop.mapred.FileInputFormat, including TextInputFormat, of course - see the implementation of the getSplits() method there ( http://grepcode.com/file/repo1.maven.org/maven2/org.jvnet.hudson.hadoop/hadoop-core/0.19.1-hudson-2/org/apache/hadoop/mapred/FileInputFormat.java#FileInputFormat.getSplits%28org.apache.hadoop.mapred.JobConf%2Cint%29 ). The numSplits variable passed there is exactly the same value you provide as the second argument to textFile, which is minPartitions. However, while *min* suggests that we can only define a minimal number of partitions with no control over the max, from what I can see in the code, that value specifies the *exact* number of partitions per the FileInputFormat.getSplits implementation. Of course it can differ for other input formats, but in this case it should work just fine. Kind regards, Michał Michalski, michal.michal...@boxever.com On 24 April 2015 at 14:05, Spico Florin spicoflo...@gmail.com wrote: Hello! I know that HadoopRDD partitions are built based on the number of splits in HDFS. I'm wondering whether these partitions preserve the initial order of the data in the file. As an example, if I have an HDFS file (myTextFile) with these splits:

split 0 -> line 1, ..., line k
split 1 -> line k+1, ..., line k+n
split 2 -> line k+n+1, ..., line k+n+m

and the code

val lines = sc.textFile("hdfs://myTextFile")
lines.zipWithIndex()

will the order of lines be preserved? (line 1, zipIndex 1), ..., (line k, zipIndex k), and so on. I found this question on Stack Overflow (http://stackoverflow.com/questions/26046410/how-can-i-obtain-an-element-position-in-sparks-rdd) whose answer intrigued me: "Essentially, RDD's zipWithIndex() method seems to do this, but it won't preserve the original ordering of the data the RDD was created from". Can you please confirm that this is the correct answer? Thanks.
Florin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
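To make the id scheme in the answer above concrete: with zipWithUniqueId, the item at position i of partition k gets id k + i*n, where n is the number of partitions - unique, but neither sequential nor order-revealing. A plain-Python model of that assignment (a sketch of the documented formula, not Spark's code):

```python
def zip_with_unique_id(partitions):
    # partitions: a list of lists; partition k's items get ids k, n+k, 2n+k, ...
    n = len(partitions)
    return [(item, k + i * n)
            for k, part in enumerate(partitions)
            for i, item in enumerate(part)]

# A file containing 0..8 read, in order, into three partitions:
parts = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
ids = dict(zip_with_unique_id(parts))
# Element 1 sits at position 1 of partition 0, so its id is 0 + 1*3 = 3:
# the ids are unique but look shuffled even when the data is in order.
```

This is why the (126,1), (1,0), (127,11) output above can be entirely consistent with an in-order read, and why zipWithIndex (which runs an extra job to count partition sizes) is the one to use when sequential ids are actually needed.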
Re: contributing code - how to test
The standard incantation - which is a little different from standard Maven practice - is:

mvn -DskipTests [your options] clean package
mvn [your options] test

Some tests require the assembly, so you have to do it this way. I don't know what the test failures were - you didn't post them - but I'm guessing this is the cause, since the failure came very early, on the launcher module, and not on some module that you changed. Sean On Fri, Apr 24, 2015 at 7:35 PM, Deborah Siegel deborah.sie...@gmail.com wrote: Hi, I selected a starter task in JIRA and made changes to my GitHub fork of the current code. I assumed I would be able to build and test. % mvn clean compile was fine, but % mvn package failed: [ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.18:test (default-test) on project spark-launcher_2.10: There are test failures. I then reverted my changes, but same story. Any advice is appreciated! Deb - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Convert DStream[Long] to Long
foreachRDD is an action and doesn't return anything. It seems like you want one final count, but that's not possible with a stream, since there is conceptually no end to a stream of data. You can get a stream of counts, which is what you have already, and you can sum those counts in another data structure to get a current total. See the streaming word count examples. On Fri, Apr 24, 2015 at 1:50 PM, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: Hi, I need to check whether the count of messages received is 0 or not, but messages.count() returns a DStream[Long]. I tried this solution:

val cuenta = messages.count().foreachRDD { rdd =>
  rdd.first()
}

But this returns type Unit, not Long. Any suggestion? Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
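A minimal model of "sum those counts in another data structure": each batch's count is folded into a mutable running total, which is roughly what a foreachRDD that updates an accumulator (or a driver-side variable) does. Plain Python standing in for the streaming machinery; the batch counts are made up:

```python
class RunningTotal:
    # Stands in for a Spark accumulator updated from foreachRDD.
    def __init__(self):
        self.value = 0

    def add(self, n):
        self.value += n

total = RunningTotal()
batch_counts = [3, 0, 5, 2]   # what messages.count() emits, batch by batch
for c in batch_counts:        # the per-batch callback foreachRDD would drive
    total.add(c)
# total.value now holds the count so far; a stream never yields one final Long.
```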
Re: Tasks run only on one machine
Where are the file splits? Meaning, is it possible they were also (only) available on one node, and that was also your driver? On Thu, Apr 23, 2015 at 1:21 PM, Pat Ferrel p...@occamsmachete.com wrote: Sure:

var columns = mc.textFile(source).map { line => line.split(delimiter) }

Here “source” is a comma-delimited list of files or directories. Both the textFile and .map tasks happen only on the machine they were launched from. Later, other distributed operations happen, but I suspect that if I can figure out why the first line runs only on the client machine, the rest will clear up too. Here are some subsequent lines:

if (filterColumn != -1) {
  columns = columns.filter { tokens => tokens(filterColumn) == filterBy }
}
val interactions = columns.map { tokens =>
  tokens(rowIDColumn) -> tokens(columnIDPosition)
}
interactions.cache()

On Apr 23, 2015, at 10:14 AM, Jeetendra Gangele gangele...@gmail.com wrote: Will you be able to paste code here? On 23 April 2015 at 22:21, Pat Ferrel p...@occamsmachete.com wrote: Using Spark Streaming to create a large volume of small nano-batch input files, ~4k per file, thousands of ‘part-x’ files. When reading the nano-batch files and doing a distributed calculation, my tasks run only on the machine where the job was launched. I’m launching in “yarn-client” mode. The RDD is created using sc.textFile(“list of thousand files”). What would cause the read to occur only on the machine that launched the driver? Do I need to do something to the RDD after reading? Has some partition factor been applied to all derived RDDs? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Contributors, read me! Updated Contributing to Spark wiki
Following several discussions about how to improve the contribution process in Spark, I've overhauled the guide to contributing. Anyone who is going to contribute needs to read it, as it has more formal guidance about the process: https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark We may push back harder now on pull requests and JIRAs that don't follow this guidance. It will help everyone spend less time getting changes in, and less time on duplicated effort or on changes that won't make it in. A summary of key points is found in CONTRIBUTING.md, a prompt presented before opening pull requests (https://github.com/apache/spark/blob/master/CONTRIBUTING.md): - Is the change important and ready enough to ask the community to spend time reviewing? - Have you searched for existing, related JIRAs and pull requests? - Is this a new feature that can stand alone as a package on http://spark-packages.org ? - Is the change being proposed clearly explained and motivated? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Multiple HA spark clusters managed by 1 ZK cluster?
Not that I've tried it, but why couldn't you use one ZK cluster? I don't see a reason. On Wed, Apr 22, 2015 at 7:40 AM, Akhil Das ak...@sigmoidanalytics.com wrote: It isn't mentioned anywhere in the docs, but you will probably need a separate ZK cluster for each of your HA clusters. Thanks Best Regards On Wed, Apr 22, 2015 at 12:02 AM, Michal Klos michal.klo...@gmail.com wrote: Hi, I'm trying to set up multiple Spark clusters with high availability, and I was wondering if I can re-use a single ZK cluster to manage them? It's not very clear in the docs, and it seems like the answer may be that I need a separate ZK cluster for each Spark cluster? thanks, M - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
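If you do share one ensemble, it would presumably be wise to give each cluster its own znode path so their recovery state doesn't collide - the standalone HA docs expose this as spark.deploy.zookeeper.dir. A sketch (hostnames and paths are made up):

```
# Cluster A's masters:
spark.deploy.recoveryMode=ZOOKEEPER
spark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181
spark.deploy.zookeeper.dir=/spark-cluster-a

# Cluster B's masters, same ensemble, different directory:
spark.deploy.recoveryMode=ZOOKEEPER
spark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181
spark.deploy.zookeeper.dir=/spark-cluster-b
```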
Re: MLlib - Collaborative Filtering - trainImplicit task size
I think maybe you need more partitions in your input, which might make for smaller tasks? On Tue, Apr 21, 2015 at 2:56 AM, Christian S. Perone christian.per...@gmail.com wrote: I keep seeing these warnings when using trainImplicit: WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB). The maximum recommended task size is 100 KB. And then the task size starts to increase. Is this a known issue ? Thanks ! -- Blog | Github | Twitter Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big joke on me. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: writing to hdfs on master node much faster
What machines are HDFS data nodes - just your master? That would explain it. Otherwise, is it actually the write that's slow, or is something else you're doing much faster on the master for other reasons? For example, are you actually shipping data via the master first in some local computation, so that the master's executor has the result much sooner? On Mon, Apr 20, 2015 at 12:21 PM, jamborta jambo...@gmail.com wrote: Hi all, I have a three-node cluster with identical hardware. I am trying a workflow that reads data from HDFS, repartitions it, runs a few map operations, then writes the results back to HDFS. It looks like all the computation, including the repartitioning and the maps, completes within similar time intervals on all the nodes, except for the write back to HDFS, where the master node does the job much faster than the slaves (15s for each block as opposed to 1.2 min for the slaves). Any suggestion what the reason might be? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/writing-to-hdfs-on-master-node-much-faster-tp22570.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [STREAMING KAFKA - Direct Approach] JavaPairRDD cannot be cast to HasOffsetRanges
You need to access the underlying RDD with .rdd() and cast that. That works for me. On Mon, Apr 20, 2015 at 4:41 AM, RimBerry truonghoanglinhk55b...@gmail.com wrote: Hi everyone, I am trying to use the direct approach in streaming-kafka-integration http://spark.apache.org/docs/latest/streaming-kafka-integration.html, pulling data from Kafka as follows:

JavaPairInputDStream<String, String> messages =
    KafkaUtils.createDirectStream(jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

messages.foreachRDD(
    new Function<JavaPairRDD<String, String>, Void>() {
      @Override
      public Void call(JavaPairRDD<String, String> rdd) throws IOException {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
        // ...
        return null;
      }
    }
);

Then I get an error when running it: *java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges* at the line OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges(); I am using version 1.3.1. Is it a bug in this version? Thank you for spending time with me. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/STREAMING-KAFKA-Direct-Approach-JavaPairRDD-cannot-be-cast-to-HasOffsetRanges-tp22568.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: compliation error
Brahma, since you can see the continuous integration builds are passing, it's got to be something specific to your environment, right? This is not even an error from Spark, but from Maven plugins. On Mon, Apr 20, 2015 at 4:42 AM, Ted Yu yuzhih...@gmail.com wrote: bq. -Dhadoop.version=V100R001C00 First time I've seen the above Hadoop version; it doesn't look like an Apache release. I checked my local Maven repo but didn't find impl under ~/.m2/repository/com/ibm/icu FYI On Sun, Apr 19, 2015 at 8:04 PM, Brahma Reddy Battula brahmareddy.batt...@huawei.com wrote: Hey Todd, thanks a lot for your reply... Kindly check the following details: spark version: 1.1.0; jdk: jdk1.7.0_60; command: mvn -Pbigtop-dist -Phive -Pyarn -Phadoop-2.4 -Dhadoop.version=V100R001C00 -DskipTests package Thanks & Regards Brahma Reddy Battula From: Ted Yu [yuzhih...@gmail.com] Sent: Monday, April 20, 2015 8:07 AM To: Brahma Reddy Battula Cc: user@spark.apache.org Subject: Re: compliation error What JDK release are you using? Can you give the complete command you used? Which Spark branch are you working with? Cheers On Sun, Apr 19, 2015 at 7:25 PM, Brahma Reddy Battula brahmareddy.batt...@huawei.com wrote: Hi All, I'm getting the following error when compiling Spark. What did I miss? Even googled and did not find the exact solution for this...
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-shade-plugin:2.2:shade (default) on project spark-assembly_2.10: Error creating shaded jar: Error in ASM processing class com/ibm/icu/impl/data/LocaleElements_zh__PINYIN.class: 48188 -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.apache.maven.plugins:maven-shade-plugin:2.2:shade (default) on project spark-assembly_2.10: Error creating shaded jar: Error in ASM processing class com/ibm/icu/impl/data/LocaleElements_zh__PINYIN.class
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:217)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:84)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:59)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.singleThreadedBuild(LifecycleStarter.java:183)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:161)
    at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:320)
Thanks & Regards Brahma Reddy Battula - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Does reduceByKey only work properly for numeric keys?
Do these datetime objects implement the notion of equality you'd expect? (This may be a dumb question; I'm thinking of the equivalent of equals()/hashCode() from the Java world.) On Sat, Apr 18, 2015 at 4:17 PM, SecondDatke lovejay-lovemu...@outlook.com wrote: I'm trying to solve a word-count-like problem; the difference is that I need the count of a specific word within a specific timespan in a social message stream. My data is in the format (time, message), and I transformed (flatMap etc.) it into a series of (time, word_id); the time is represented with the Python datetime.datetime class. I then transformed it to ((time, word_id), 1) and used reduceByKey for the result. But the dataset returned is a little weird, like the following:

format: ((timespan as datetime.datetime, wordid), freq)
((datetime.datetime(2009, 10, 6, 2, 0), 0), 8)
((datetime.datetime(2009, 10, 6, 3, 0), 0), 3)
((datetime.datetime(2009, 10, 6, 3, 0), 0), 14)

As you can see, there are DUPLICATED keys, but as a result of reduceByKey, all keys SHOULD BE UNIQUE. I tried converting the key to a string (like '2006-12-02 21:00:00-000') and running reduceByKey again; the problem stays. It seems the only way left is to convert the date to a timestamp, and that way it works. Is this expected behavior of reduceByKey (and all other transformations that work with keys)? Currently I'm still working on it. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
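To illustrate the question being asked: reduceByKey can only merge two records whose keys compare equal (and, for hash-based grouping, hash equal). A plain-Python sketch of a dict-based reduce-by-key showing the failure mode with identity-equality keys - note that CPython's datetime does define value equality and hashing, so if that really is the key type here, the cause likely lies elsewhere:

```python
from datetime import datetime

def reduce_by_key(pairs, op):
    # Minimal stand-in for reduceByKey; relies on the keys' __eq__/__hash__.
    out = {}
    for k, v in pairs:
        out[k] = op(out[k], v) if k in out else v
    return out

class BadKey:
    # No __eq__/__hash__: falls back to identity, so two BadKey(5)s
    # are distinct keys and never merge.
    def __init__(self, x):
        self.x = x

t = datetime(2009, 10, 6, 3, 0)
good = reduce_by_key([((t, 0), 3), ((t, 0), 14)], lambda a, b: a + b)
bad = reduce_by_key([(BadKey(5), 1), (BadKey(5), 1)], lambda a, b: a + b)
```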
Re: External JARs not loading Spark Shell Scala 2.11
Doesn't this reduce to "Scala isn't compatible with itself across maintenance releases"? Meaning, if this were fixed, then Scala 2.11.{x < 6} would have similar failures. It's not not-ready; it's just not the Scala 2.11.6 REPL. Still, sure, I'd favor breaking the unofficial support to at least make the latest Scala 2.11 the unbroken one. On Fri, Apr 17, 2015 at 7:58 AM, Michael Allman mich...@videoamp.com wrote: FWIW, this is an essential feature to our use of Spark, and I'm surprised it's not advertised clearly as a limitation in the documentation. All I've found about running Spark 1.3 on 2.11 is here: http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211 Also, I'm experiencing some serious stability problems simply trying to run the Spark 1.3 Scala 2.11 REPL. Most of the time it fails to load and spews a torrent of compiler assertion failures, etc. See attached. Unfortunately, it appears the Spark 1.3 Scala 2.11 REPL is simply not ready for production use. I was going to file a bug, but it seems clear that the current implementation is going to need to be forward-ported to Scala 2.11.6 anyway. We already have an issue for that: https://issues.apache.org/jira/browse/SPARK-6155 Michael On Apr 9, 2015, at 10:29 PM, Prashant Sharma scrapco...@gmail.com wrote: You will have to go to commit ID 191d7cf2a655d032f160b9fa181730364681d0e7 in Apache Spark. [1] Once you are at that commit, you need to review the changes done to the repl code, look for the relevant occurrences of the same code in the Scala 2.11 repl source, and somehow make it all work. Thanks, 1. http://githowto.com/getting_old_versions Prashant Sharma On Thu, Apr 9, 2015 at 4:40 PM, Alex Nakos ana...@gmail.com wrote: Ok, what do I need to do in order to migrate the patch? Thanks Alex On Thu, Apr 9, 2015 at 11:54 AM, Prashant Sharma scrapco...@gmail.com wrote: This is the jira I referred to: https://issues.apache.org/jira/browse/SPARK-3256.
Another reason for not working on it is evaluating priority between upgrading to scala 2.11.5(it is non trivial I suppose because repl has changed a bit) or migrating that patch is much simpler. Prashant Sharma On Thu, Apr 9, 2015 at 4:16 PM, Alex Nakos ana...@gmail.com wrote: Hi- Was this the JIRA issue? https://issues.apache.org/jira/browse/SPARK-2988 Any help in getting this working would be much appreciated! Thanks Alex On Thu, Apr 9, 2015 at 11:32 AM, Prashant Sharma scrapco...@gmail.com wrote: You are right this needs to be done. I can work on it soon, I was not sure if there is any one even using scala 2.11 spark repl. Actually there is a patch in scala 2.10 shell to support adding jars (Lost the JIRA ID), which has to be ported for scala 2.11 too. If however, you(or anyone else) are planning to work, I can help you ? Prashant Sharma On Thu, Apr 9, 2015 at 3:08 PM, anakos ana...@gmail.com wrote: Hi- I am having difficulty getting the 1.3.0 Spark shell to find an external jar. I have build Spark locally for Scala 2.11 and I am starting the REPL as follows: bin/spark-shell --master yarn --jars data-api-es-data-export-4.0.0.jar I see the following line in the console output: 15/04/09 09:52:15 INFO spark.SparkContext: Added JAR file:/opt/spark/spark-1.3.0_2.11-hadoop2.3/data-api-es-data-export-4.0.0.jar at http://192.168.115.31:54421/jars/data-api-es-data-export-4.0.0.jar with timestamp 1428569535904 but when i try to import anything from this jar, it's simply not available. When I try to add the jar manually using the command :cp /path/to/jar the classes in the jar are still unavailable. I understand that 2.11 is not officially supported, but has anyone been able to get an external jar loaded in the 1.3.0 release? Is this a known issue? I have tried searching around for answers but the only thing I've found that may be related is this: https://issues.apache.org/jira/browse/SPARK-3257 Any/all help is much appreciated. 
Thanks Alex -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/External-JARs-not-loading-Spark-Shell-Scala-2-11-tp22434.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Executor memory in web UI
This is the fraction available for caching, which is 60% * 90% of the total by default. On Fri, Apr 17, 2015 at 11:30 AM, podioss grega...@hotmail.com wrote: Hi, I am a bit confused by the executor-memory option. I am running applications with the standalone cluster manager, with 8 workers with 4gb memory and 2 cores each, and when I submit my application with spark-submit I use --executor-memory 1g. In the web UI's completed applications table I see that my application was correctly submitted with 1g of memory per node, as expected, but when I check the application's executors tab I see that every executor launched with 530mb, which is about half the memory of the configuration. I would really appreciate an explanation if anyone has one. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executor-memory-in-web-UI-tp22538.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
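The arithmetic behind that, assuming the Spark 1.x defaults spark.storage.memoryFraction=0.6 and spark.storage.safetyFraction=0.9 (a sketch; the UI shows roughly 530 MB rather than exactly 0.54 × 1024 MB because the JVM's Runtime.maxMemory() reports somewhat less than -Xmx):

```python
def storage_memory_mb(executor_memory_mb,
                      memory_fraction=0.6,   # spark.storage.memoryFraction
                      safety_fraction=0.9):  # spark.storage.safetyFraction
    # Portion of the executor heap reserved for caching blocks (Spark 1.x model).
    return executor_memory_mb * memory_fraction * safety_fraction

approx = storage_memory_mb(1024)  # close to, and a bit above, the ~530 MB in the UI
```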
Re: External JARs not loading Spark Shell Scala 2.11
You are running on 2.11.6, right? of course, it seems like that should all work, but it doesn't work for you. My point is that the shell you are saying doesn't work is Scala's 2.11.2 shell -- with some light modification. It's possible that the delta is the problem. I can't entirely make out whether the errors are Spark-specific; they involve Spark classes in some cases but they're assertion errors from Scala libraries. I don't know if this shell is supposed to work even across maintenance releases as-is, though that would be very nice. It's not an API for Scala. A good test of whether this idea has any merit would be to run with Scala 2.11.2. I'll copy this to JIRA for continuation. On Fri, Apr 17, 2015 at 10:31 PM, Michael Allman mich...@videoamp.com wrote: H... I don't follow. The 2.11.x series is supposed to be binary compatible against user code. Anyway, I was building Spark against 2.11.2 and still saw the problems with the REPL. I've created a bug report: https://issues.apache.org/jira/browse/SPARK-6989 I hope this helps. Cheers, Michael On Apr 17, 2015, at 1:41 AM, Sean Owen so...@cloudera.com wrote: Doesn't this reduce to Scala isn't compatible with itself across maintenance releases? Meaning, if this were fixed then Scala 2.11.{x 6} would have similar failures. It's not not-ready; it's just not the Scala 2.11.6 REPL. Still, sure I'd favor breaking the unofficial support to at least make the latest Scala 2.11 the unbroken one. On Fri, Apr 17, 2015 at 7:58 AM, Michael Allman mich...@videoamp.com wrote: FWIW, this is an essential feature to our use of Spark, and I'm surprised it's not advertised clearly as a limitation in the documentation. All I've found about running Spark 1.3 on 2.11 is here: http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211 Also, I'm experiencing some serious stability problems simply trying to run the Spark 1.3 Scala 2.11 REPL. 
Most of the time it fails to load and spews a torrent of compiler assertion failures, etc. See attached. Unfortunately, it appears the Spark 1.3 Scala 2.11 REPL is simply not ready for production use. I was going to file a bug, but it seems clear that the current implementation is going to need to be forward-ported to Scala 2.11.6 anyway. We already have an issue for that: https://issues.apache.org/jira/browse/SPARK-6155 Michael On Apr 9, 2015, at 10:29 PM, Prashant Sharma scrapco...@gmail.com wrote: You will have to go to this commit ID 191d7cf2a655d032f160b9fa181730364681d0e7 in Apache spark. [1] Once you are at that commit, you need to review the changes done to the repl code and look for the relevant occurrences of the same code in scala 2.11 repl source and somehow make it all work. Thanks, 1. http://githowto.com/getting_old_versions Prashant Sharma On Thu, Apr 9, 2015 at 4:40 PM, Alex Nakos ana...@gmail.com wrote: Ok, what do i need to do in order to migrate the patch? Thanks Alex On Thu, Apr 9, 2015 at 11:54 AM, Prashant Sharma scrapco...@gmail.com wrote: This is the jira I referred to https://issues.apache.org/jira/browse/SPARK-3256. Another reason for not working on it is evaluating priority between upgrading to scala 2.11.5(it is non trivial I suppose because repl has changed a bit) or migrating that patch is much simpler. Prashant Sharma On Thu, Apr 9, 2015 at 4:16 PM, Alex Nakos ana...@gmail.com wrote: Hi- Was this the JIRA issue? https://issues.apache.org/jira/browse/SPARK-2988 Any help in getting this working would be much appreciated! Thanks Alex On Thu, Apr 9, 2015 at 11:32 AM, Prashant Sharma scrapco...@gmail.com wrote: You are right this needs to be done. I can work on it soon, I was not sure if there is any one even using scala 2.11 spark repl. Actually there is a patch in scala 2.10 shell to support adding jars (Lost the JIRA ID), which has to be ported for scala 2.11 too. If however, you(or anyone else) are planning to work, I can help you ? 
Prashant Sharma On Thu, Apr 9, 2015 at 3:08 PM, anakos ana...@gmail.com wrote: Hi- I am having difficulty getting the 1.3.0 Spark shell to find an external jar. I have built Spark locally for Scala 2.11 and I am starting the REPL as follows: bin/spark-shell --master yarn --jars data-api-es-data-export-4.0.0.jar I see the following line in the console output: 15/04/09 09:52:15 INFO spark.SparkContext: Added JAR file:/opt/spark/spark-1.3.0_2.11-hadoop2.3/data-api-es-data-export-4.0.0.jar at http://192.168.115.31:54421/jars/data-api-es-data-export-4.0.0.jar with timestamp 1428569535904 but when I try to import anything from this jar, it's simply not available. When I try to add the jar manually using the command :cp /path/to/jar the classes in the jar are still unavailable. I understand that 2.11 is not officially supported, but has anyone been able to get an external jar loaded in the 1.3.0 release?
Re: How to join RDD keyValuePairs efficiently
This would be much, much faster if your set of IDs was simply a Set, and you passed that to a filter() call that just filtered in the docs that matched an ID in the set. On Thu, Apr 16, 2015 at 4:51 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Does anybody have a solution for this? From: Wang, Ningjun (LNG-NPV) Sent: Tuesday, April 14, 2015 10:41 AM To: user@spark.apache.org Subject: How to join RDD keyValuePairs efficiently I have an RDD that contains millions of Document objects. Each document has a unique id that is a string. I need to find the documents by ids quickly. Currently I use an RDD join, as follows. First I save the RDD as an object file: val allDocs: RDD[Document] = getDocs() // this RDD contains 7 million Document objects allDocs.saveAsObjectFile("/temp/allDocs.obj") Then I wrote a function to find documents by ids: def findDocumentsByIds(docids: RDD[String]) = { // docids contains less than 100 items val allDocs: RDD[Document] = sc.objectFile[Document]("/temp/allDocs.obj") val idAndDocs = allDocs.keyBy(d => d.id) docids.map(id => (id, id)).join(idAndDocs).map(t => t._2._2) } I found that this is very slow. I suspect it scans the entire 7 million Document objects in "/temp/allDocs.obj" sequentially to find the desired documents. Is there any efficient way to do this? One option I am thinking of is that instead of storing the RDD[Document] as an object file, I store each document in a separate file with filename equal to the docid. This way I can find a document quickly by docid. However, this means I need to save the RDD to 7 million small files, which would take a very long time to save and may cause IO problems with so many small files. Is there any other way? Ningjun - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
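Concretely, the Set-based filter being suggested might look like the sketch below. The `Document` class and ids are invented for illustration, and the core is shown on a plain Scala collection; the RDD version is the same `filter` call, with the set broadcast first (see the comment).

```scala
case class Document(id: String, body: String)

object FindByIds {
  def main(args: Array[String]): Unit = {
    val docs = Seq(Document("d1", "..."), Document("d2", "..."), Document("d3", "..."))
    // The <100 ids held as a plain Set, not an RDD: membership tests are O(1)
    val wanted: Set[String] = Set("d1", "d3")

    // RDD version (sketch): val b = sc.broadcast(wanted)
    //                       allDocs.filter(d => b.value.contains(d.id))
    val found = docs.filter(d => wanted.contains(d.id))
    println(found.map(_.id).mkString(","))  // prints d1,d3
  }
}
```

This avoids the shuffle that `join` performs; each partition is scanned once and only matching documents survive.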
Re: saveAsTextFile
Just copy the files? It shouldn't matter that much where they are, as you can find them easily. Or consider somehow sending the batches of data straight into Redshift? No idea how that is done, but I imagine it's doable. On Thu, Apr 16, 2015 at 6:38 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks Sean. I want to load each batch into Redshift. What's the best/most efficient way to do that? Vadim On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote: You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
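If "just copy the files" is the route taken, the move can be scripted; here is a sketch using only the standard library, with local directories standing in for S3 (the names and layout are assumptions -- on S3 the same idea applies via the AWS CLI or SDK):

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

object CollectBatches {
  // Move the part-* files from each per-batch output directory into one
  // common directory, prefixing each file with its batch directory's name
  // so files from different batches cannot collide.
  def collect(batchDirs: Seq[File], commonDir: File): Unit = {
    commonDir.mkdirs()
    for {
      dir  <- batchDirs
      part <- Option(dir.listFiles).getOrElse(Array.empty[File])
      if part.getName.startsWith("part-")
    } Files.move(
        part.toPath,
        new File(commonDir, s"${dir.getName}-${part.getName}").toPath,
        StandardCopyOption.REPLACE_EXISTING)
  }
}
```

Run after each interval (or periodically) against whatever batch directories have appeared since the last sweep.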
Re: General configurations on CDH5 to achieve maximum Spark Performance
I don't think there's anything specific to CDH that you need to know, other than that it ought to set things up sanely for you. Sandy did a couple of posts about tuning: http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/ http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ I don't think there's such a thing as one optimal configuration. It depends very heavily on your workload. First you need to have a look at your app, really. All the tuning in the world isn't going to make an unnecessary shuffle as fast as eliminating it. On Thu, Apr 16, 2015 at 6:02 PM, Manish Gupta 8 mgupt...@sapient.com wrote: Hi, Is there a document/link that describes the general configuration settings to achieve maximum Spark performance while running on CDH5? In our environment, we made a lot of changes (and are still doing so) to get decent performance; otherwise our 6-node dev cluster with default configurations lags behind a single laptop running Spark. Having a standard checklist (taking a base node size of 4-CPU, 16GB RAM) would be really great. Any pointers in this regard will be really helpful. We are running Spark 1.2.0 on CDH 5.3.0. Thanks, Manish Gupta Specialist | Sapient Global Markets Green Boulevard (Tower C) 3rd & 4th Floor Plot No. B-9A, Sector 62 Noida 201 301 Uttar Pradesh, India Tel: +91 (120) 479 5000 Fax: +91 (120) 479 5001 Email: mgupt...@sapient.com sapientglobalmarkets.com 
Re: saveAsTextFile
You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark.dynamicAllocation.minExecutors
Yes, look at what it was before -- it would also reject a minimum of 0. That's the case you are hitting. 0 is a fine minimum. On Thu, Apr 16, 2015 at 8:09 PM, Michael Stone mst...@mathom.us wrote: On Thu, Apr 16, 2015 at 07:47:51PM +0100, Sean Owen wrote: IIRC that was fixed already in 1.3 https://github.com/apache/spark/commit/b2047b55c5fc85de6b63276d8ab9610d2496e08b From that commit: + private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) ... + if (maxNumExecutors == 0) { + throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!") - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark.dynamicAllocation.minExecutors
Looks like that message would be triggered if spark.dynamicAllocation.initialExecutors was not set, or 0, if I read this right. Yeah, that might have to be positive. This requires you set initial executors to 1 if you want 0 min executors. Hm, maybe that shouldn't be an error condition in the args parser. I could go either way on that, myself. On Thu, Apr 16, 2015 at 8:17 PM, Michael Stone mst...@mathom.us wrote: On Thu, Apr 16, 2015 at 12:16:13PM -0700, Marcelo Vanzin wrote: I think Michael is referring to this: Exception in thread main java.lang.IllegalArgumentException: You must specify at least 1 executor! Usage: org.apache.spark.deploy.yarn.Client [options] Yes, sorry, there were too many mins and maxs and I copied the wrong line. Mike Stone - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
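The workaround being described -- a floor of 0 executors with a positive initial count -- would look something like this in spark-defaults.conf (property names as they exist in Spark 1.3; the values are examples only):

```
spark.dynamicAllocation.enabled          true
spark.dynamicAllocation.minExecutors     0
# initialExecutors must be positive, or the YARN client rejects the args:
spark.dynamicAllocation.initialExecutors 1
spark.dynamicAllocation.maxExecutors     20
```

The same properties can be passed as `--conf` flags to spark-submit.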
Re: Random pairs / RDD order
(Indeed, though the OP said it was a requirement that the pairs are drawn from the same partition.) On Thu, Apr 16, 2015 at 11:14 PM, Guillaume Pitel guillaume.pi...@exensa.com wrote: Hi Aurelien, Sean's solution is nice, but maybe not completely order-free, since pairs will come from the same partition. The easiest/fastest way to do it, in my opinion, is to use a random key instead of a zipWithIndex. Of course you won't be able to ensure uniqueness of each element of the pairs, but maybe you don't care, since you're sampling with replacement already? val a = rdd.sample(...).map{ x => (rand() % k, x) } val b = rdd.sample(...).map{ x => (rand() % k, x) } k must be ~ the number of elements you're sampling. You'll have a skewed distribution due to collisions, but I don't think it should hurt too much. Guillaume Hi everyone, However I am not happy with this solution because each element is most likely to be paired with elements that are close by in the partition. This is because sample returns an ordered Iterator. -- Guillaume PITEL, Président +33(0)626 222 431 eXenSa S.A.S. 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: StackOverflowError from KafkaReceiver when rate limiting used
Yeah, this really shouldn't be recursive. It can't be optimized since it's not a final/private method. I think you're welcome to try a PR to un-recursivize it. On Thu, Apr 16, 2015 at 7:31 PM, Jeff Nadler jnad...@srcginc.com wrote: I've got a Kafka topic on which lots of data has built up, and a streaming app with a rate limit. During maintenance for example records will build up on Kafka and we'll burn them off on restart. The rate limit keeps the job stable while burning off the backlog. Sometimes on the first or second interval that gets data after a restart, the receiver dies with this error. At the moment, it's happening every time we try to start the application. Any ideas? 15/04/16 10:41:50 ERROR KafkaReceiver: Error handling message; exiting java.lang.StackOverflowError at org.apache.spark.streaming.receiver.RateLimiter.waitToPush(RateLimiter.scala:66) at org.apache.spark.streaming.receiver.RateLimiter.waitToPush(RateLimiter.scala:66) at org.apache.spark.streaming.receiver.RateLimiter.waitToPush(RateLimiter.scala:66) ...thousands of lines like that Side note, any idea why the scala compiler isn't optimizing waitToPush into a loop? Looks like tail recursion, no? Thanks- Jeff Nadler - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
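For the record, a loop-based sketch of what an un-recursivized `waitToPush` could look like. This is simplified and renamed (Spark's actual RateLimiter keeps its bookkeeping differently); the point is only that replacing the recursive call with a while loop means a deep backlog cannot overflow the stack.

```scala
// Illustrative rate limiter: at most maxRatePerSec pushes per one-second
// window. The recursion in the original becomes a while loop.
class SimpleRateLimiter(maxRatePerSec: Long) {
  private var lastSyncTime = System.currentTimeMillis()
  private var messagesWrittenSinceSync = 0L

  def waitToPush(): Unit = {
    var done = false
    while (!done) {
      val now = System.currentTimeMillis()
      val elapsed = now - lastSyncTime
      if (elapsed >= 1000) {                       // start a new window
        lastSyncTime = now
        messagesWrittenSinceSync = 1
        done = true
      } else if (messagesWrittenSinceSync < maxRatePerSec) {
        messagesWrittenSinceSync += 1              // under the cap: push now
        done = true
      } else {
        Thread.sleep(1000 - elapsed)               // over the cap: wait out the window
      }
    }
  }
}
```

Scala only eliminates self-recursion into a loop when the call is provably final (final/private methods or local functions), which is why the original overflows under a large backlog.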
Re: Random pairs / RDD order
Use mapPartitions, and then take two random samples of the elements in the partition, and return an iterator over all pairs of them? Should be pretty simple, assuming your sample size n is smallish, since you're returning ~n^2 pairs. On Thu, Apr 16, 2015 at 7:00 PM, abellet aurelien.bel...@telecom-paristech.fr wrote: Hi everyone, I have a large RDD and I am trying to create an RDD of a random sample of pairs of elements from this RDD. The elements composing a pair should come from the same partition, for efficiency. The idea I've come up with is to take two random samples and then use zipPartitions to pair each i-th element of the first sample with the i-th element of the second sample. Here is a sample code illustrating the idea: --- val rdd = sc.parallelize(1 to 6, 16) val sample1 = rdd.sample(true, 0.01, 42) val sample2 = rdd.sample(true, 0.01, 43) def myfunc(s1: Iterator[Int], s2: Iterator[Int]): Iterator[String] = { var res = List[String]() while (s1.hasNext && s2.hasNext) { val x = s1.next + " " + s2.next res ::= x } res.iterator } val pairs = sample1.zipPartitions(sample2)(myfunc) --- However I am not happy with this solution because each element is most likely to be paired with elements that are close by in the partition. This is because sample returns an ordered Iterator. Any idea how to fix this? I did not find a way to efficiently shuffle the random sample so far. Thanks a lot! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Random-pairs-RDD-order-tp22529.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
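One way to break up that ordering is to shuffle each sample inside the pairing function before zipping. A sketch of a drop-in variant of the `myfunc` above (it buffers each partition's sample in memory, which is fine as long as the samples are small; the fixed seed is only for reproducibility):

```scala
import scala.util.Random

object PairSampling {
  // Like myfunc, but shuffles both samples first so the i-th elements of
  // the two iterators are no longer near-neighbours from the same scan order.
  def myfuncShuffled(s1: Iterator[Int], s2: Iterator[Int]): Iterator[String] = {
    val r = new Random(42)
    val a = r.shuffle(s1.toVector)   // buffer + shuffle each sample
    val b = r.shuffle(s2.toVector)
    a.iterator.zip(b.iterator).map { case (x, y) => s"$x $y" }
  }
}
```

Used the same way: `sample1.zipPartitions(sample2)(PairSampling.myfuncShuffled)`.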
Re: spark.dynamicAllocation.minExecutors
IIRC that was fixed already in 1.3 https://github.com/apache/spark/commit/b2047b55c5fc85de6b63276d8ab9610d2496e08b On Thu, Apr 16, 2015 at 7:41 PM, Michael Stone mst...@mathom.us wrote: The default for spark.dynamicAllocation.minExecutors is 0, but that value causes a runtime error and a message that the minimum is 1. Perhaps the default should be changed to 1? Mike Stone - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: adding new elements to batch RDD from DStream RDD
What do you mean by batch RDD? They're just RDDs, though they store their data in different ways and come from different sources. You can union an RDD from an HDFS file with one from a DStream. It sounds like you want streaming data to live longer than its batch interval, but that's not something you can expect the streaming framework to provide. It's perfectly possible to save the RDD's data to persistent store and use it later. You can't update RDDs; they're immutable. You can re-read data from persistent store by making a new RDD at any time. On Wed, Apr 15, 2015 at 7:37 PM, Evo Eftimov evo.efti...@isecc.com wrote: The only way to join / union / cogroup a DStream RDD with a Batch RDD is via the transform method, which returns another DStream RDD and hence it gets discarded at the end of the micro-batch. Is there any way to e.g. union a DStream RDD with a Batch RDD, producing a new Batch RDD containing the elements of both the DStream RDD and the Batch RDD? And once such a Batch RDD is created in the above way, can it be used by other DStream RDDs to e.g. join with, as this time the result can be another DStream RDD? Effectively the functionality described above will result in periodic updates (additions) of elements to a Batch RDD - the additional elements will keep coming from DStream RDDs which keep streaming in with every micro-batch. Also, newly arriving DStream RDDs will be able to join with the thus previously updated Batch RDD and produce a result DStream RDD. Something almost like that can be achieved with updateStateByKey, but is there a way to do it as described here? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/adding-new-elements-to-batch-RDD-from-DStream-RDD-tp22504.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
Re: adding new elements to batch RDD from DStream RDD
Yep, you are looking at operations on DStream, which is not what I'm talking about. You should look at DStream.foreachRDD (or the Java equivalent), which hands you an RDD. Makes more sense? The rest may make more sense when you try it. There is actually a lot less complexity than you think. On Wed, Apr 15, 2015 at 8:37 PM, Evo Eftimov evo.efti...@isecc.com wrote: The OO API in question was mentioned several times - the transform method of DStream, which is the ONLY way to join/cogroup/union a DStream RDD with a batch RDD aka JavaRDD. Here is a paste from the Spark javadoc: <K2,V2> JavaPairDStream<K2,V2> transformToPair(Function<R,JavaPairRDD<K2,V2>> transformFunc) Return a new DStream in which each RDD is generated by applying a function on each RDD of 'this' DStream. As you can see, it ALWAYS returns a DStream, NOT a JavaRDD aka batch RDD. Re the rest of the discussion (re-loading a batch RDD from file within the Spark Streaming context) - let's leave that, since we are not getting anywhere. -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Wednesday, April 15, 2015 8:30 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: adding new elements to batch RDD from DStream RDD What API differences are you talking about? A DStream gives a sequence of RDDs. I'm not referring to DStream or its API. Spark in general can execute many pipelines at once, ones that even refer to the same RDD. What I mean is: you seem to be looking for a way to change one shared RDD, but in fact, you simply create an RDD on top of the current state of the data whenever and wherever you wish. Unless you're caching the RDD's blocks, you don't have much need to share a reference to one RDD anyway, which is what I thought you were getting at. 
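A sketch of the foreachRDD pattern being described, in Scala. This is not runnable standalone -- it assumes an existing SparkContext `sc`, a DStream `stream`, and illustrative HDFS paths:

```scala
// Inside foreachRDD you hold a plain RDD, so union with a batch RDD is just
// RDD.union; persist the result, and a later interval (or another job) can
// re-read it from storage as a brand-new batch RDD.
var batch: RDD[String] = sc.textFile("hdfs:///data/seed")

stream.foreachRDD { streamed =>
  val updated = batch.union(streamed)
  updated.saveAsTextFile(s"hdfs:///data/acc-${System.currentTimeMillis}")
  batch = updated // or re-read the saved path, to avoid an ever-growing lineage
}
```

Note the caveat in the comment: repeatedly unioning without checkpointing or re-reading from storage grows the RDD lineage each interval.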
On Wed, Apr 15, 2015 at 8:25 PM, Evo Eftimov evo.efti...@isecc.com wrote: I keep seeing only common statements Re DStream RDDs and Batch RDDs - There is certainly something to keep me from using them together and it is the OO API differences I have described previously, several times ... Re the batch RDD reloading from file and that there is no need for threads - the driver of a Spark Streaming app instantiates and submits a DAG pipeline to the Spark Streaming cluster and keeps it alive while it is running - this is not exactly a linear execution where the main thread of the driver can invoke the Spark context method for loading batch RDDs from file e.g. a second time, moreover after a specific period of time -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Wednesday, April 15, 2015 8:14 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: adding new elements to batch RDD from DStream RDD Yes, I mean there's nothing to keep you from using them together other than their very different lifetime. That's probably the key here: if you need the streaming data to live a long time it has to live in persistent storage first. I do exactly this and what you describe for the same purpose. I don't believe there's any need for threads; an RDD is just bookkeeping about partitions, and that has to be re-assessed when the underlying data grows. But making a new RDD on the fly is easy. It's a reference to the data only. (Well, that changes if you cache the results, in which case you very much care about unpersisting the RDD before getting a different reference to all of the same data and more.) On Wed, Apr 15, 2015 at 8:06 PM, Evo Eftimov evo.efti...@isecc.com wrote: Hi Sean well there is certainly a difference between batch RDD and streaming RDD and in the previous reply you have already outlined some. Other differences are in the Object Oriented Model / API of Spark, which also matters besides the RDD / Spark Cluster Platform architecture. 
Secondly, in the previous em I have clearly described what I mean by update and that it is a result of RDD transformation and hence a new RDD derived from the previously joined/union/cogrouped one - ie not mutating an existing RDD Lets also leave aside the architectural goal why I want to keep updating a batch RDD with new data coming from DStream RDDs - fyi it is NOT to make streaming RDDs long living Let me now go back to the overall objective - the app context is Spark Streaming job. I want to update / add the content of incoming streaming RDDs (e.g. JavaDStreamRDDs) to an already loaded (e.g. from HDFS file) batch RDD e.g. JavaRDD - the only way to union / join / cogroup from DSTreamRDD to batch RDD is via the transform method which always returns DStream RDD NOT batch RDD - check the API On a separate note - your suggestion to keep reloading a Batch RDD from a file - it may have some applications in other scenarios so lets drill down into it - in the context of Spark Streaming app where the driver launches a DAG pipeline and then just essentially hangs, I guess the only way to keep reloading a batch RDD from file is from
Re: adding new elements to batch RDD from DStream RDD
Yes, I mean there's nothing to keep you from using them together other than their very different lifetime. That's probably the key here: if you need the streaming data to live a long time it has to live in persistent storage first. I do exactly this and what you describe for the same purpose. I don't believe there's any need for threads; an RDD is just bookkeeping about partitions, and that has to be re-assessed when the underlying data grows. But making a new RDD on the fly is easy. It's a reference to the data only. (Well, that changes if you cache the results, in which case you very much care about unpersisting the RDD before getting a different reference to all of the same data and more.) On Wed, Apr 15, 2015 at 8:06 PM, Evo Eftimov evo.efti...@isecc.com wrote: Hi Sean well there is certainly a difference between batch RDD and streaming RDD and in the previous reply you have already outlined some. Other differences are in the Object Oriented Model / API of Spark, which also matters besides the RDD / Spark Cluster Platform architecture. Secondly, in the previous em I have clearly described what I mean by update and that it is a result of RDD transformation and hence a new RDD derived from the previously joined/union/cogrouped one - ie not mutating an existing RDD Lets also leave aside the architectural goal why I want to keep updating a batch RDD with new data coming from DStream RDDs - fyi it is NOT to make streaming RDDs long living Let me now go back to the overall objective - the app context is Spark Streaming job. I want to update / add the content of incoming streaming RDDs (e.g. JavaDStreamRDDs) to an already loaded (e.g. from HDFS file) batch RDD e.g. 
JavaRDD - the only way to union / join / cogroup from DSTreamRDD to batch RDD is via the transform method which always returns DStream RDD NOT batch RDD - check the API On a separate note - your suggestion to keep reloading a Batch RDD from a file - it may have some applications in other scenarios so lets drill down into it - in the context of Spark Streaming app where the driver launches a DAG pipeline and then just essentially hangs, I guess the only way to keep reloading a batch RDD from file is from a separate thread still using the same spark context. The thread will reload the batch RDD with the same reference ie reassign the reference to the newly instantiated/loaded batch RDD - is that what you mean by reloading batch RDD from file -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Wednesday, April 15, 2015 7:43 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: adding new elements to batch RDD from DStream RDD What do you mean by batch RDD? they're just RDDs, though store their data in different ways and come from different sources. You can union an RDD from an HDFS file with one from a DStream. It sounds like you want streaming data to live longer than its batch interval, but that's not something you can expect the streaming framework to provide. It's perfectly possible to save the RDD's data to persistent store and use it later. You can't update RDDs; they're immutable. You can re-read data from persistent store by making a new RDD at any time. On Wed, Apr 15, 2015 at 7:37 PM, Evo Eftimov evo.efti...@isecc.com wrote: The only way to join / union /cogroup a DStream RDD with Batch RDD is via the transform method, which returns another DStream RDD and hence it gets discarded at the end of the micro-batch. Is there any way to e.g. union Dstream RDD with Batch RDD which produces a new Batch RDD containing the elements of both the DStream RDD and the Batch RDD. 
And once such Batch RDD is created in the above way, can it be used by other DStream RDDs to e.g. join with as this time the result can be another DStream RDD Effectively the functionality described above will result in periodical updates (additions) of elements to a Batch RDD - the additional elements will keep coming from DStream RDDs which keep streaming in with every micro-batch. Also newly arriving DStream RDDs will be able to join with the thus previously updated BAtch RDD and produce a result DStream RDD Something almost like that can be achieved with updateStateByKey, but is there a way to do it as described here -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/adding-new-element s-to-batch-RDD-from-DStream-RDD-tp22504.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: adding new elements to batch RDD from DStream RDD
What API differences are you talking about? A DStream gives a sequence of RDDs. I'm not referring to DStream or its API. Spark in general can execute many pipelines at once, ones that even refer to the same RDD. What I mean is: you seem to be looking for a way to change one shared RDD, but in fact, you simply create an RDD on top of the current state of the data whenever and wherever you wish. Unless you're caching the RDD's blocks, you don't have much need to share a reference to one RDD anyway, which is what I thought you were getting at. On Wed, Apr 15, 2015 at 8:25 PM, Evo Eftimov evo.efti...@isecc.com wrote: I keep seeing only common statements Re DStream RDDs and Batch RDDs - There is certainly something to keep me from using them together and it is the OO API differences I have described previously, several times ... Re the batch RDD reloading from file and that there is no need for threads - the driver of a Spark Streaming app instantiates and submits a DAG pipeline to the Spark Streaming cluster and keeps it alive while it is running - this is not exactly a linear execution where the main thread of the driver can invoke the Spark context method for loading batch RDDs from file e.g. a second time, moreover after a specific period of time -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Wednesday, April 15, 2015 8:14 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: adding new elements to batch RDD from DStream RDD Yes, I mean there's nothing to keep you from using them together other than their very different lifetime. That's probably the key here: if you need the streaming data to live a long time it has to live in persistent storage first. I do exactly this and what you describe for the same purpose. I don't believe there's any need for threads; an RDD is just bookkeeping about partitions, and that has to be re-assessed when the underlying data grows. But making a new RDD on the fly is easy. It's a reference to the data only. 
(Well, that changes if you cache the results, in which case you very much care about unpersisting the RDD before getting a different reference to all of the same data and more.) On Wed, Apr 15, 2015 at 8:06 PM, Evo Eftimov evo.efti...@isecc.com wrote: Hi Sean well there is certainly a difference between batch RDD and streaming RDD and in the previous reply you have already outlined some. Other differences are in the Object Oriented Model / API of Spark, which also matters besides the RDD / Spark Cluster Platform architecture. Secondly, in the previous em I have clearly described what I mean by update and that it is a result of RDD transformation and hence a new RDD derived from the previously joined/union/cogrouped one - ie not mutating an existing RDD Lets also leave aside the architectural goal why I want to keep updating a batch RDD with new data coming from DStream RDDs - fyi it is NOT to make streaming RDDs long living Let me now go back to the overall objective - the app context is Spark Streaming job. I want to update / add the content of incoming streaming RDDs (e.g. JavaDStreamRDDs) to an already loaded (e.g. from HDFS file) batch RDD e.g. JavaRDD - the only way to union / join / cogroup from DSTreamRDD to batch RDD is via the transform method which always returns DStream RDD NOT batch RDD - check the API On a separate note - your suggestion to keep reloading a Batch RDD from a file - it may have some applications in other scenarios so lets drill down into it - in the context of Spark Streaming app where the driver launches a DAG pipeline and then just essentially hangs, I guess the only way to keep reloading a batch RDD from file is from a separate thread still using the same spark context. 
The thread will reload the batch RDD with the same reference ie reassign the reference to the newly instantiated/loaded batch RDD - is that what you mean by reloading batch RDD from file -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Wednesday, April 15, 2015 7:43 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: adding new elements to batch RDD from DStream RDD What do you mean by batch RDD? they're just RDDs, though store their data in different ways and come from different sources. You can union an RDD from an HDFS file with one from a DStream. It sounds like you want streaming data to live longer than its batch interval, but that's not something you can expect the streaming framework to provide. It's perfectly possible to save the RDD's data to persistent store and use it later. You can't update RDDs; they're immutable. You can re-read data from persistent store by making a new RDD at any time. On Wed, Apr 15, 2015 at 7:37 PM, Evo Eftimov evo.efti...@isecc.com wrote: The only way to join / union /cogroup a DStream RDD with Batch RDD is via the transform method, which returns another DStream