Re: Overlapping classes warnings
In general, I don't think that means you should exclude something; it's still needed. The problem is that commons-configuration depends *only* on *beanutils-core 1.8.0*, so it ends up managing up that artifact version only, and not the main beanutils one. In this particular instance, which I've seen before, it seems clear that beanutils-core *can* be excluded since it's a subset (see: the warning message!) and beanutils is an uber-artifact. But then we'd have to manage up beanutils to 1.8.0.

On Thu, Apr 9, 2015 at 5:24 PM, Ted Yu yuzhih...@gmail.com wrote:

commons-beanutils is brought in transitively:

[INFO] | +- org.apache.hadoop:hadoop-common:jar:2.4.0:compile
[INFO] | | +- commons-cli:commons-cli:jar:1.2:compile
[INFO] | | +- xmlenc:xmlenc:jar:0.52:compile
[INFO] | | +- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] | | +- commons-io:commons-io:jar:2.4:compile
[INFO] | | +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO] | | +- commons-lang:commons-lang:jar:2.6:compile
[INFO] | | +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] | | | +- commons-digester:commons-digester:jar:1.8:compile
[INFO] | | | | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] | | | \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile

Looks like we can exclude commons-beanutils in pom.xml for the hadoop-client dependency, e.g. in core/pom.xml:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <exclusions>
    <exclusion>
      <groupId>javax.servlet</groupId>
      <artifactId>servlet-api</artifactId>
    </exclusion>
  </exclusions>
</dependency>

On Thu, Apr 9, 2015 at 2:14 PM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote:

Though the warnings can be ignored, they add up in the log files while compiling other projects too. And there are a lot of those warnings. Any workaround? How do we modify the pom.xml file to exclude these unnecessary dependencies?

On Fri, Apr 10, 2015 at 2:29 AM, Sean Owen so...@cloudera.com wrote:

Generally, you can ignore these things. They mean some artifacts packaged other artifacts, and so two copies show up when all the JAR contents are merged. But here you do show a small dependency convergence problem; beanutils 1.7 is present but beanutils-core 1.8 is too, even though these should be harmonized. I imagine one could be excluded; I imagine we could harmonize the version manually. In practice, I also imagine it doesn't cause any problem, but feel free to propose a fix along those lines.

On Thu, Apr 9, 2015 at 4:54 PM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote:

Hi, During compilation I get a lot of these:

[WARNING] kryo-2.21.jar, reflectasm-1.07-shaded.jar define 23 overlapping classes:
[WARNING] commons-beanutils-1.7.0.jar, commons-beanutils-core-1.8.0.jar define 82 overlapping classes:
[WARNING] commons-beanutils-1.7.0.jar, commons-collections-3.2.1.jar, commons-beanutils-core-1.8.0.jar define 10 overlapping classes:

And a lot of others. How do I fix these?
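For concreteness, here is a sketch of what the fix described above could look like in the relevant pom.xml: drop the redundant commons-beanutils-core and manage the main beanutils artifact up to 1.8.0. Which dependency to hang the exclusion off (commons-configuration below) and the managed version are assumptions to verify against the full tree, not a tested change:

<!-- exclude the subset artifact where it enters the tree (assumed: via commons-configuration) -->
<dependency>
  <groupId>commons-configuration</groupId>
  <artifactId>commons-configuration</artifactId>
  <exclusions>
    <exclusion>
      <groupId>commons-beanutils</groupId>
      <artifactId>commons-beanutils-core</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- and in <dependencyManagement>, pin the uber-artifact at the superset version -->
<dependency>
  <groupId>commons-beanutils</groupId>
  <artifactId>commons-beanutils</artifactId>
  <version>1.8.0</version>
</dependency>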
Pairwise computations within partition
Hello everyone, I am a Spark novice facing a nontrivial problem to solve with Spark. I have an RDD consisting of many elements (say, 60K), where each element is a d-dimensional vector. I want to implement an iterative algorithm which does the following. At each iteration, I want to apply an operation on *pairs* of elements (say, compute their dot product). Of course the number of pairs is huge, but I only need to consider a small random subset of the possible pairs at each iteration. To minimize communication between nodes, I am willing to partition my RDD by key (where each element gets a random key) and to only consider pairs of elements that belong to the same partition (i.e., that share the same key). But I am not sure how to sample and apply the operation on pairs, and how to make sure that the computation for each pair is indeed done by the node holding the corresponding elements. Any help would be greatly appreciated. Thanks a lot!

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pairwise-computations-within-partition-tp22436.html
Re: Which Hive version should be used for Spark 1.3
By default Spark 1.3 has bindings to Hive 0.13.1, though you can bind it to Hive 0.12 if you specify it in the profile when building Spark, as per https://spark.apache.org/docs/1.3.0/building-spark.html. If you are downloading a pre-built version of Spark 1.3, then by default it is set to Hive 0.13.1. HTH! On Thu, Apr 9, 2015 at 10:03 AM ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Most likely you have an existing Hive installation with data in it. In this case I was not able to get Spark 1.3 to communicate with the existing Hive metastore. Hence when I read any table created in Hive, Spark SQL used to complain "Data table not found". If you get it working, please share the steps. On Thu, Apr 9, 2015 at 9:25 PM, Arthur Chan arthur.hk.c...@gmail.com wrote: Hi, I use Hive 0.12 with Spark 1.2 at the moment and plan to upgrade to Spark 1.3.x. Could anyone advise which Hive version should be used to match Spark 1.3.x? Can I use Hive 1.1.0 for Spark 1.3? Or can I use Hive 0.14 for Spark 1.3? Regards Arthur -- Deepak
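For reference, the Hive binding is chosen with Maven profiles at build time. A sketch of the kind of invocation the 1.3.0 building-spark page describes for the Hive 0.12 binding (profile names should be double-checked against that page for your exact version):

mvn -Pyarn -Phadoop-2.4 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package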
RDD union
Hi, I have some code that creates ~80 RDDs, and then sc.union is applied to combine all 80 into one for the next step (to run topByKey, for example)... While creating the 80 RDDs takes ~3 mins per RDD, doing a union over them takes 3 hrs (I am validating these numbers)... Is there any checkpoint-based option to further speed up the union? Thanks. Deb
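One thing worth checking (an assumption about the cause, not something confirmed in this thread): chaining union calls pairwise builds an 80-level-deep lineage, while SparkContext.union over the whole sequence builds a single flat UnionRDD, which is much cheaper for the scheduler. A minimal sketch, where makeRdd is a hypothetical stand-in for whatever produces each of the 80 RDDs:

import org.apache.spark.rdd.RDD

val rdds: Seq[RDD[(String, Double)]] = (1 to 80).map(makeRdd)  // makeRdd is hypothetical

// deep, nested lineage: each union wraps the previous one
val chained = rdds.reduce(_ union _)

// single flat UnionRDD over all 80 inputs
val flat = sc.union(rdds)

On the checkpoint question: sc.setCheckpointDir(...) followed by rdd.checkpoint() truncates a long lineage at the cost of a write to reliable storage, which can help if lineage depth, rather than data volume, is what is slow.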
Re: External JARs not loading Spark Shell Scala 2.11
Ok, what do I need to do in order to migrate the patch? Thanks Alex On Thu, Apr 9, 2015 at 11:54 AM, Prashant Sharma scrapco...@gmail.com wrote: This is the jira I referred to: https://issues.apache.org/jira/browse/SPARK-3256. Another reason for not working on it is evaluating priority between upgrading to scala 2.11.5 (it is non-trivial, I suppose, because the repl has changed a bit) or migrating that patch, which is much simpler. Prashant Sharma On Thu, Apr 9, 2015 at 4:16 PM, Alex Nakos ana...@gmail.com wrote: Hi- Was this the JIRA issue? https://issues.apache.org/jira/browse/SPARK-2988 Any help in getting this working would be much appreciated! Thanks Alex On Thu, Apr 9, 2015 at 11:32 AM, Prashant Sharma scrapco...@gmail.com wrote: You are right, this needs to be done. I can work on it soon; I was not sure if there is anyone even using the scala 2.11 spark repl. Actually there is a patch in the scala 2.10 shell to support adding jars (lost the JIRA ID), which has to be ported for scala 2.11 too. If however you (or anyone else) are planning to work on it, I can help you. Prashant Sharma On Thu, Apr 9, 2015 at 3:08 PM, anakos ana...@gmail.com wrote: Hi- I am having difficulty getting the 1.3.0 Spark shell to find an external jar. I have built Spark locally for Scala 2.11 and I am starting the REPL as follows:

bin/spark-shell --master yarn --jars data-api-es-data-export-4.0.0.jar

I see the following line in the console output:

15/04/09 09:52:15 INFO spark.SparkContext: Added JAR file:/opt/spark/spark-1.3.0_2.11-hadoop2.3/data-api-es-data-export-4.0.0.jar at http://192.168.115.31:54421/jars/data-api-es-data-export-4.0.0.jar with timestamp 1428569535904

but when I try to import anything from this jar, it's simply not available. When I try to add the jar manually using the command :cp /path/to/jar, the classes in the jar are still unavailable. I understand that 2.11 is not officially supported, but has anyone been able to get an external jar loaded in the 1.3.0 release? Is this a known issue? I have tried searching around for answers but the only thing I've found that may be related is this: https://issues.apache.org/jira/browse/SPARK-3257 Any/all help is much appreciated. Thanks Alex

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/External-JARs-not-loading-Spark-Shell-Scala-2-11-tp22434.html
Re: Could not compute split, block not found in Spark Streaming Simple Application
Sorry, I was getting those errors because my workload was not sustainable. However, I noticed that, by just running the spark-streaming-benchmark (https://github.com/tdas/spark-streaming-benchmark/blob/master/Benchmark.scala), I get no difference in the execution time, number of processed records, or delay whether I'm using 1 machine or 2 machines with the setup described before (using spark standalone). Is that normal? On Fri, Mar 27, 2015 at 5:32 PM, Tathagata Das t...@databricks.com wrote: If it is deterministically reproducible, could you generate full DEBUG level logs from the driver and the workers and give them to me? Basically I want to trace through what is happening to the block that is not being found. And can you tell which cluster manager you are using? Spark Standalone, Mesos or YARN? On Fri, Mar 27, 2015 at 10:09 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I am just running this simple example with machineA: 1 master + 1 worker; machineB: 1 worker:

«
val ssc = new StreamingContext(sparkConf, Duration(1000))
val rawStreams = (1 to numStreams).map(_ => ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
val union = ssc.union(rawStreams)
union.filter(line => Random.nextInt(1) == 0).map(line => {
  var sum = BigInt(0)
  line.toCharArray.foreach(chr => sum += chr.toInt)
  fib2(sum)
  sum
}).reduceByWindow(_ + _, Seconds(1), Seconds(1)).map(s => s"### result: $s").print()
»

And I'm getting the following exceptions. Log from machineB:

«
15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 132
15/03/27 16:21:35 INFO Executor: Running task 0.0 in stage 27.0 (TID 132)
15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 134
15/03/27 16:21:35 INFO Executor: Running task 2.0 in stage 27.0 (TID 134)
15/03/27 16:21:35 INFO TorrentBroadcast: Started reading broadcast variable 24
15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 136
15/03/27 16:21:35 INFO Executor: Running task 4.0 in stage 27.0 (TID 136)
15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 138
15/03/27 16:21:35 INFO Executor: Running task 6.0 in stage 27.0 (TID 138)
15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 140
15/03/27 16:21:35 INFO Executor: Running task 8.0 in stage 27.0 (TID 140)
15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(1886) called with curMem=47117, maxMem=280248975
15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24_piece0 stored as bytes in memory (estimated size 1886.0 B, free 267.2 MB)
15/03/27 16:21:35 INFO BlockManagerMaster: Updated info of block broadcast_24_piece0
15/03/27 16:21:35 INFO TorrentBroadcast: Reading broadcast variable 24 took 19 ms
15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(3104) called with curMem=49003, maxMem=280248975
15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 3.0 KB, free 267.2 MB)
15/03/27 16:21:35 ERROR Executor: Exception in task 8.0 in stage 27.0 (TID 140)
java.lang.Exception: Could not compute split, block input-0-1427473262420 not found
    at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:701)
15/03/27 16:21:35 ERROR Executor: Exception in task 6.0 in stage 27.0 (TID 138)
java.lang.Exception: Could not compute split, block input-0-1427473262418 not found
    at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at
Re: Kryo exception : Encountered unregistered class ID: 13994
Is there a custom class involved in your application? I assume you have called sparkConf.registerKryoClasses() for such class(es). Cheers On Thu, Apr 9, 2015 at 7:15 AM, mehdisinger mehdi.sin...@lampiris.be wrote: Hi, I'm facing an issue when I try to run my Spark application. I keep getting the following exception:

15/04/09 15:14:07 ERROR Executor: Exception in task 5.0 in stage 1.0 (TID 5)
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
Serialization trace:
ord (org.apache.spark.util.BoundedPriorityQueue)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
    at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:169)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

I'm not sure where this exception occurs exactly... Does anyone know about this issue? I'm running Spark version 1.1.0. My Master and workers are running on different machines (cluster mode), all with the exact same architecture/configuration. Can anyone help? Regards

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-exception-Encountered-unregistered-class-ID-13994-tp22437.html
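For reference, a minimal sketch of the registration Ted is asking about; MyCustomClass is a hypothetical placeholder for the application's own serialized types. Registration must also be identical on driver and executors (passing it through SparkConf, as here, guarantees that), since Kryo assigns class IDs in registration order, and a writer/reader ID mismatch is one plausible way to hit "unregistered class ID" (an assumption, not a confirmed diagnosis for this thread). Note that registerKryoClasses was added in Spark 1.2; on 1.1.0 the equivalent is a custom spark.kryo.registrator:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyCustomClass]))  // hypothetical app class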
Re: Incremently load big RDD file into Memory
Hi, Thanks a lot for such a detailed response. On Wed, Apr 8, 2015 at 8:55 PM, Guillaume Pitel guillaume.pi...@exensa.com wrote: Hi Muhammad, There are lots of ways to do it. My company actually develops a text mining solution which embeds a very fast Approximate Neighbours solution (a demo with real time queries on the wikipedia dataset can be seen at wikinsights.org). For the record, we now prepare a dataset of 4.5 million documents for querying in about 2 or 3 minutes on a 32 cores cluster, and the queries take less than 10ms when the dataset is in memory. But if you just want to precompute everything and don't mind waiting a few tens of minutes (or hours), and don't want to bother with an approximate neighbour solution, then the best way is probably something like this:

1 - Block your data (i.e. group your items in X large groups). Instead of a dataset of N elements, you should now have a dataset of X blocks containing N/X elements each.
2 - Do the cartesian product (instead of N*N elements, you now have just X*X blocks, which should take less memory).
3 - For each pair of blocks (blockA, blockB), perform the computation of distances for each element of blockA with each element of blockB, but keep only the top K best for each element of blockA. Output is List((elementOfBlockA, listOfKNearestElementsOfBlockBWithTheDistance), ...).
4 - reduceByKey (the key is the elementOfBlockA), by merging the listOfNearestElements and always keeping the K nearest.

This is an exact version of top K. This is only interesting if K << N/X. But even if K is large, it is possible that it will fit your needs. Remember that you will still compute N*N distances (this is the problem with exact nearest neighbours); the only difference with what you're doing now is that it produces fewer items and duplicates less data. Indeed, if one of your elements takes 100 bytes, the per-element cartesian will produce N*N*100*2 bytes, while the blocked version will produce X*X*100*2*N/X, i.e. X*N*100*2 bytes. Guillaume

Hi Guillaume, Thanks for your reply. Can you please tell me how I can improve this for top-K nearest points? P.S. My post is not accepted on the list; that's why I am sending you this email here. I would be really grateful if you reply to it. Thanks, On Wed, Apr 8, 2015 at 1:23 PM, Guillaume Pitel guillaume.pi...@exensa.com wrote: This kind of operation is not scalable, no matter what you do, at least if you _really_ want to do that. However, if what you're looking for is not to really compute all distances (for instance if you're looking only for the top K nearest points), then it can be highly improved. It all depends on what you want to do eventually. Guillaume

val locations = filelines.map(line => line.split("\t")).map(t => (t(5).toLong, (t(2).toDouble, t(3).toDouble))).distinct().collect()
val cartesienProduct = locations.cartesian(locations).map(t => Edge(t._1._1, t._2._1, distanceAmongPoints(t._1._2._1, t._1._2._2, t._2._2._1, t._2._2._2)))

Code executes perfectly fine up till here, but when I try to use cartesienProduct it gets stuck, i.e.

val count = cartesienProduct.count()

Any help to efficiently do this will be highly appreciated.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Incremently-load-big-RDD-file-into-Memory-tp22410.html
-- Guillaume PITEL, Président +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705 -- Regards, Muhammad Aamir
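To make Guillaume's four steps concrete, here is a sketch in Scala. The element type (an id plus a plain Array[Double]) and the caller-supplied distance function are assumptions for illustration; blocking is done with a simple modulo on the id:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

def topKNeighbours(points: RDD[(Long, Array[Double])],
                   numBlocks: Int,
                   k: Int,
                   dist: (Array[Double], Array[Double]) => Double): RDD[(Long, Seq[(Long, Double)])] = {
  // 1 - block the data: X groups of roughly N/X elements each
  val blocks = points
    .map { case (id, v) => (id % numBlocks, (id, v)) }
    .groupByKey()
    .map { case (_, elems) => elems.toArray }

  // 2 - cartesian product over blocks: X*X block pairs instead of N*N element pairs
  val blockPairs = blocks.cartesian(blocks)

  // 3 - distances between every element of blockA and blockB, keeping only the top K per element of blockA
  val partial = blockPairs.flatMap { case (blockA, blockB) =>
    blockA.map { case (idA, vA) =>
      val nearest = blockB.collect { case (idB, vB) if idB != idA => (idB, dist(vA, vB)) }
        .sortBy(_._2).take(k).toSeq
      (idA, nearest)
    }
  }

  // 4 - merge the per-block-pair top-K lists, always keeping the K nearest overall
  partial.reduceByKey((l1, l2) => (l1 ++ l2).sortBy(_._2).take(k))
}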
Re: Jobs failing with KryoException (BufferOverflow)
Typo in previous email, pardon me. Set spark.driver.maxResultSize to 1068 or higher. On Thu, Apr 9, 2015 at 8:57 AM, Ted Yu yuzhih...@gmail.com wrote: Please set spark.kryoserializer.buffer.max.mb to 1068 (or higher). Cheers On Thu, Apr 9, 2015 at 8:54 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Pressed send early. I had tried that with these settings: buffersize=128, maxbuffersize=1024

val conf = new SparkConf()
  .setAppName("detail")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))

On Thu, Apr 9, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Yes, I had tried that. Now I see this:

15/04/09 07:58:08 INFO scheduler.DAGScheduler: Job 0 failed: collect at VISummaryDataProvider.scala:38, took 275.334991 s
15/04/09 07:58:08 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Total size of serialized results of 4 tasks (1067.3 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 4 tasks (1067.3 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/04/09 07:58:08 INFO storage.BlockManagerInfo: Removed taskresult_4 on phxaishdc9dn0579.phx.ebay.com:42771 in memory (size: 273.5 MB, free: 6.2 GB)
15/04/09 07:58:08 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User

On Thu, Apr 9, 2015 at 8:18 PM, Ted Yu yuzhih...@gmail.com wrote: Please take a look at https://code.google.com/p/kryo/source/browse/trunk/src/com/esotericsoftware/kryo/io/Output.java?r=236 , starting at line 27. In Spark, you can control the maxBufferSize with spark.kryoserializer.buffer.max.mb Cheers -- Deepak -- Deepak
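Putting the two settings from this thread together, a sketch of what the driver-side configuration could look like (the values are illustrative, not tuned recommendations; per the error above, maxResultSize needs to exceed roughly 1067 MB):

val conf = new SparkConf()
  .set("spark.kryoserializer.buffer.max.mb", "1068")  // cap for Kryo's output buffer, per Ted's suggestion
  .set("spark.driver.maxResultSize", "2g")            // must exceed the total serialized result size

More generally, collecting over 1 GB of results to the driver is itself worth revisiting; keeping the aggregation on the executors (e.g. reduceByKey instead of collect-then-aggregate) sidesteps both limits.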
How to submit job in a different user?
Well, maybe a Linux configuration problem... I have a cluster that is about to be exposed to the public, and I want everyone who uses my cluster to own a user account (without sudo permissions, etc.), e.g. 'guest', and to be able to submit tasks to Spark, which is working on Mesos that runs with a different, private user ('sparkuser' for example). But now let's say I launched the Mesos slave at Node 1 with 'sparkuser' and at Node 2 with 'guest', and submit a job as 'guest'; then Node 1 will fail, saying: Failed to change user to 'guest': Failed to set gid: Operation not permitted. Any solution? Or does this just not make any sense? Thanks.
Re: Spark Job #of attempts ?
Can I see current values of all configs, similar to the configuration page in the Hadoop world, from the UI? Sent from my iPhone On 09-Apr-2015, at 11:07 pm, Marcelo Vanzin van...@cloudera.com wrote: Set spark.yarn.maxAppAttempts=1 if you don't want retries. On Thu, Apr 9, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Hello, I have a spark job with 5 stages. After it runs the 3rd stage, the console shows

15/04/09 10:25:57 INFO yarn.Client: Application report for application_1427705526386_127168 (state: RUNNING)
15/04/09 10:25:58 INFO yarn.Client: Application report for application_1427705526386_127168 (state: RUNNING)
15/04/09 10:25:59 INFO yarn.Client: Application report for application_1427705526386_127168 (state: ACCEPTED)
15/04/09 10:25:59 INFO yarn.Client: client token: N/A diagnostics: N/A ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: hdmi-express start time: 1428598679223 final status: UNDEFINED tracking URL: https://apollo-phx-rm-1.vip.ebay.com:50030/proxy/application_1427705526386_127168/ user: dvasthimal
15/04/09 10:26:00 INFO yarn.Client: Application report for application_1427705526386_127168 (state: ACCEPTED)
15/04/09 10:26:01 INFO yarn.Client: Application report for application_1427705526386_127168 (state: ACCEPTED)

and then it is running again. This looks as if the stage failed and Spark restarted the job from the beginning. If that's not the case, when I click the Spark UI web page, it does not show already completed stages and instead goes back to running stage #1. Is there some setting to turn this behavior off? -- Deepak -- Marcelo
SQL can't create Hive database
Hi, I am working in local mode. The following code

hiveContext.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse")
hiveContext.sql("create database if not exists db1")

throws

15/04/09 13:53:16 ERROR RetryingHMSHandler: MetaException(message:Unable to create database path file:/user/hive/warehouse/db1.db, failed to create database db1)

It seems that it uses the hdfs path, not the local one specified in hiveContext. Any idea? Thank you. Hao

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQL-can-t-not-create-Hive-database-tp22435.html
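One hedged thing to try (an assumption, not verified): the error path still shows the default file:/user/hive/warehouse rather than the configured directory, so the setting may not be reaching the metastore; passing an explicit file:// URI, set before any statement runs, makes the intent unambiguous:

hiveContext.setConf("hive.metastore.warehouse.dir", "file:///home/spark/hive/warehouse")
hiveContext.sql("create database if not exists db1")

If the same error persists, checking write permissions on the path (as suggested in the replies) is the next step.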
Re: Spark Job Run Resource Estimation ?
Thanks Sandy, appreciate it. On Thu, Apr 9, 2015 at 10:32 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Deepak, I'm going to shamelessly plug my blog post on tuning Spark: http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ It talks about tuning executor size as well as how the number of tasks for a stage is calculated. -Sandy On Thu, Apr 9, 2015 at 9:21 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have a spark job that has multiple stages. For now I start it with 100 executors, each with 12G mem (max is 16G). I am using Spark 1.3 over YARN 2.4.x. For now I start the Spark job with a very limited input (1 file of size 2G); overall there are 200 files. My first run is yet to complete as it's taking too much time / throwing OOM exceptions / buffer exceptions (keep that aside). How will I know how much resources are required to run this job (# of cores, executors, mem, serialization buffers, and I do not yet know what else)? In the M/R world, all I do is set the split size and the rest is taken care of automatically (yes, I need to worry about mem in case of OOM). 1) Can someone explain how they do resource estimation before running the job, or is there no way and one needs to just try it out? 2) Even if I give 100 executors, the first stage uses only 5; how did Spark decide this? Please point me to any resources that talk about similar things, or please explain here. -- Deepak -- Deepak
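To make the knobs concrete, resource requests for a YARN job like this are typically expressed at submit time; the numbers below are placeholders illustrating the shape, not recommendations:

spark-submit --master yarn \
  --num-executors 100 \
  --executor-cores 4 \
  --executor-memory 12g \
  --conf spark.yarn.executor.memoryOverhead=1536 \
  ...

On question 2: for a stage that reads files, the task count comes from the number of input splits (roughly input size divided by block size), not from the executor count, which is one plausible reason a single 2G input file yields only a handful of tasks regardless of how many executors are available (see Sandy's post for details).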
Re: Spark Job #of attempts ?
Set spark.yarn.maxAppAttempts=1 if you don't want retries. On Thu, Apr 9, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Hello, I have a spark job with 5 stages. After it runs the 3rd stage, the console shows

15/04/09 10:25:57 INFO yarn.Client: Application report for application_1427705526386_127168 (state: RUNNING)
15/04/09 10:25:58 INFO yarn.Client: Application report for application_1427705526386_127168 (state: RUNNING)
15/04/09 10:25:59 INFO yarn.Client: Application report for application_1427705526386_127168 (state: ACCEPTED)
15/04/09 10:25:59 INFO yarn.Client: client token: N/A diagnostics: N/A ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: hdmi-express start time: 1428598679223 final status: UNDEFINED tracking URL: https://apollo-phx-rm-1.vip.ebay.com:50030/proxy/application_1427705526386_127168/ user: dvasthimal
15/04/09 10:26:00 INFO yarn.Client: Application report for application_1427705526386_127168 (state: ACCEPTED)
15/04/09 10:26:01 INFO yarn.Client: Application report for application_1427705526386_127168 (state: ACCEPTED)

and then it is running again. This looks as if the stage failed and Spark restarted the job from the beginning. If that's not the case, when I click the Spark UI web page, it does not show already completed stages and instead goes back to running stage #1. Is there some setting to turn this behavior off? -- Deepak -- Marcelo
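The same property can also be passed at submit time; a sketch (assuming YARN cluster mode, as in this thread):

spark-submit --master yarn-cluster --conf spark.yarn.maxAppAttempts=1 ...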
Re: Join on Spark too slow.
Maybe I'm wrong, but what you are doing here is basically a bunch of cartesian products, one per key. So if "hello" appears 100 times in your corpus, it will produce 100*100 elements in the join output. I don't understand what you're doing here, but it's normal that your join takes forever; it makes no sense as it is, IMO. Guillaume

Hello guys, I am trying to run the following dummy example for Spark, on a dataset of 250MB, using 5 machines with 10GB RAM each, but the join seems to be taking too long (> 2 hrs). I am using Spark 0.8.0 but I have also tried the same example on more recent versions, with the same results. Do you have any idea why this is happening? Thanks a lot, Kostas

val sc = new SparkContext(
  args(0), "DummyJoin", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val file = sc.textFile(args(1))
val wordTuples = file
  .flatMap(line => line.split(args(2)))
  .map(word => (word, 1))
val big = wordTuples.filter { case ((k, v)) => k != "a" }.cache()
val small = wordTuples.filter { case ((k, v)) => k != "a" && k != "to" && k != "and" }.cache()
val res = big.leftOuterJoin(small)
res.saveAsTextFile(args(3))

-- Guillaume PITEL, Président +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
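To make the blow-up concrete: wordTuples holds one (word, 1) record per occurrence, so every frequent word is a duplicated join key. If per-word counts are what is actually wanted (an assumption about intent), aggregating before the join removes the duplicate keys and the quadratic behaviour:

val bigCounts = big.reduceByKey(_ + _)          // one record per distinct word
val smallCounts = small.reduceByKey(_ + _)
val res = bigCounts.leftOuterJoin(smallCounts)  // at most one match per key, no per-key cartesian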
Re: Pairwise computations within partition
I would try something like this:

val a = rdd.sample(false, 0.1, 1).zipWithIndex.map { case (vector, index) => (index, vector) }
val b = rdd.sample(false, 0.1, 2).zipWithIndex.map { case (vector, index) => (index, vector) }
a.join(b).map { case (_, (vectora, vectorb)) => yourOperation }

Grouping by blocks is probably not what you want, since it would restrict the scope of a vector to the vectors in the same block. Guillaume

Hello everyone, I am a Spark novice facing a nontrivial problem to solve with Spark. I have an RDD consisting of many elements (say, 60K), where each element is a d-dimensional vector. I want to implement an iterative algorithm which does the following. At each iteration, I want to apply an operation on *pairs* of elements (say, compute their dot product). Of course the number of pairs is huge, but I only need to consider a small random subset of the possible pairs at each iteration. To minimize communication between nodes, I am willing to partition my RDD by key (where each element gets a random key) and to only consider pairs of elements that belong to the same partition (i.e., that share the same key). But I am not sure how to sample and apply the operation on pairs, and how to make sure that the computation for each pair is indeed done by the node holding the corresponding elements. Any help would be greatly appreciated. Thanks a lot!

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pairwise-computations-within-partition-tp22436.html

-- Guillaume PITEL, Président +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
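Since the original question also asked how to keep each pair's computation on the node that holds the elements: once the RDD is partitioned by the random key, mapPartitions runs entirely locally to each partition, so pairs can be sampled within it without any shuffle. A sketch, assuming vectors are Array[Double] under random Int keys, with a made-up per-partition sampling budget:

import scala.util.Random
import org.apache.spark.HashPartitioner

val partitioned = rdd.partitionBy(new HashPartitioner(100))  // rdd: RDD[(Int, Array[Double])]
val sampledDots = partitioned.mapPartitions { iter =>
  val vecs = iter.map(_._2).toArray      // all vectors co-located in this partition
  val rng = new Random()
  val pairsPerPartition = 1000           // hypothetical sample size
  if (vecs.isEmpty) Iterator.empty
  else Iterator.fill(pairsPerPartition) {
    val x = vecs(rng.nextInt(vecs.length))
    val y = vecs(rng.nextInt(vecs.length))
    x.zip(y).map { case (u, v) => u * v }.sum  // dot product of the sampled pair
  }
}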
Any success on embedding local Spark in OSGi?
Hi, I have been trying to use Spark in an OSGi bundle but I have had no luck so far. I have seen similar mails in the past, so I am wondering: has anyone successfully run Spark inside an OSGi bundle? I am running Spark in a bundle created with the Maven shade plugin, and I have even tried adding the Akka JARs to the bundle classpath manually. I am not trying to expose it via services, but the Akka configuration is not working. I even managed to read reference.conf directly from the bundle context and set it with SparkConf, but it gave me an error regarding the parsing of the value '20s'. I have been struggling with this for some time and it is almost a matter of life and death (in business terms). Do you have any advice? Thanks in advance, Deniz
Re: Overlapping classes warnings
commons-beanutils is brought in transitively:

[INFO] | +- org.apache.hadoop:hadoop-common:jar:2.4.0:compile
[INFO] | | +- commons-cli:commons-cli:jar:1.2:compile
[INFO] | | +- xmlenc:xmlenc:jar:0.52:compile
[INFO] | | +- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] | | +- commons-io:commons-io:jar:2.4:compile
[INFO] | | +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO] | | +- commons-lang:commons-lang:jar:2.6:compile
[INFO] | | +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] | | | +- commons-digester:commons-digester:jar:1.8:compile
[INFO] | | | | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] | | | \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile

Looks like we can exclude commons-beanutils in pom.xml for the hadoop-client dependency, e.g. in core/pom.xml:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <exclusions>
    <exclusion>
      <groupId>javax.servlet</groupId>
      <artifactId>servlet-api</artifactId>
    </exclusion>
  </exclusions>
</dependency>

On Thu, Apr 9, 2015 at 2:14 PM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: Though the warnings can be ignored, they add up in the log files while compiling other projects too. And there are a lot of those warnings. Any workaround? How do we modify the pom.xml file to exclude these unnecessary dependencies? On Fri, Apr 10, 2015 at 2:29 AM, Sean Owen so...@cloudera.com wrote: Generally, you can ignore these things. They mean some artifacts packaged other artifacts, and so two copies show up when all the JAR contents are merged. But here you do show a small dependency convergence problem; beanutils 1.7 is present but beanutils-core 1.8 is too, even though these should be harmonized. I imagine one could be excluded; I imagine we could harmonize the version manually. In practice, I also imagine it doesn't cause any problem, but feel free to propose a fix along those lines. On Thu, Apr 9, 2015 at 4:54 PM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: Hi, During compilation I get a lot of these:

[WARNING] kryo-2.21.jar, reflectasm-1.07-shaded.jar define 23 overlapping classes:
[WARNING] commons-beanutils-1.7.0.jar, commons-beanutils-core-1.8.0.jar define 82 overlapping classes:
[WARNING] commons-beanutils-1.7.0.jar, commons-collections-3.2.1.jar, commons-beanutils-core-1.8.0.jar define 10 overlapping classes:

And a lot of others. How do I fix these?
override log4j.properties
Hello, How to override log4j.properties for a specific spark job? BR, Patcharee
Join on Spark too slow.
Hello guys, I am trying to run the following dummy example for Spark, on a dataset of 250MB, using 5 machines with 10GB RAM each, but the join seems to be taking too long (> 2 hrs). I am using Spark 0.8.0 but I have also tried the same example on more recent versions, with the same results. Do you have any idea why this is happening? Thanks a lot, Kostas

val sc = new SparkContext(
  args(0), "DummyJoin", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val file = sc.textFile(args(1))
val wordTuples = file
  .flatMap(line => line.split(args(2)))
  .map(word => (word, 1))
val big = wordTuples.filter { case ((k, v)) => k != "a" }.cache()
val small = wordTuples.filter { case ((k, v)) => k != "a" && k != "to" && k != "and" }.cache()
val res = big.leftOuterJoin(small)
res.saveAsTextFile(args(3))
Continuous WARN messages from BlockManager about block replication
Hi, I'm running a spark streaming job in local mode (--master local[4]), and I'm seeing tons of these messages, roughly once every second - WARN BlockManager: Block input-0-1428527584600 replicated to only 0 peer(s) instead of 1 peers We're using spark 1.2.1. Even with TRACE logging enabled, we're not seeing any log messages indicating failure to replicate the blocks. Should we be concerned about this warning (and if so, how should we debug this), or is this a corner case in local mode where replication is not attempted, but the warning is emitted anyway? If so, what is the workaround? thanks Nandan
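A hedged note on the warning itself: receiver input streams default to a replicated storage level (StorageLevel.MEMORY_AND_DISK_SER_2), and in local mode there is a single BlockManager, so there is never a peer to replicate to; that would make the warning harmless noise (an inference from the setup described, not a verified diagnosis). If so, requesting an unreplicated level silences it:

import org.apache.spark.storage.StorageLevel

// explicit storage level without the _2 suffix (replication factor 1); host/port are placeholders
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)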
Re: Jobs failing with KryoException (BufferOverflow)
Yes, I had tried that. Now I see this:

15/04/09 07:58:08 INFO scheduler.DAGScheduler: Job 0 failed: collect at VISummaryDataProvider.scala:38, took 275.334991 s
15/04/09 07:58:08 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Total size of serialized results of 4 tasks (1067.3 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 4 tasks (1067.3 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/04/09 07:58:08 INFO storage.BlockManagerInfo: Removed taskresult_4 on phxaishdc9dn0579.phx.ebay.com:42771 in memory (size: 273.5 MB, free: 6.2 GB)
15/04/09 07:58:08 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User

On Thu, Apr 9, 2015 at 8:18 PM, Ted Yu yuzhih...@gmail.com wrote: Please take a look at https://code.google.com/p/kryo/source/browse/trunk/src/com/esotericsoftware/kryo/io/Output.java?r=236 , starting at line 27. In Spark, you can control the maxBufferSize with spark.kryoserializer.buffer.max.mb Cheers -- Deepak
Lookup / Access of master data in spark streaming
Hi Friends, I am trying to solve a use case in spark streaming, and I need help on getting to the right approach for looking up / updating master data. Use case (simplified): I've a dataset of entities with three attributes and an identifier/row key in a persistent store. Each attribute, along with the row key, comes from a different stream, so effectively there are 3 source streams. Now whenever any attribute comes up, I want to update/sync the persistent store and do some processing, but the processing requires the latest state of the entity, with the latest values of all three attributes. I wish I had all the entities cached in some sort of centralized cache (like we have data in hdfs) within spark streaming, which could be used for data-local processing. But I assume there is no such thing. Potential approaches I am thinking of (I suspect the first two are not feasible, but I want to confirm):

1. Are Broadcast Variables mutable? If yes, can I use one as a cache for all entities, sizing around 100s of GBs, provided I have a cluster with enough RAM?
2. Is there any kind of sticky partitioning possible, so that I route my stream data to go through the same node where I have the corresponding entities (a subset of the entire store) cached in memory within the JVM / off heap on the node? This would avoid lookups from the store.
3. If I stream the entities from the persistent store into the engine, this becomes a 4th stream - the entity stream. How do I use join / merge to enable streams 1, 2 and 3 to look up and update the data from stream 4? Would DStream.join work for a few seconds' worth of data in the attribute streams against all the data in the entity stream? Or do I use transform and, within that, an rdd join? I have doubts whether I am leaning towards a core-Spark approach within spark streaming.
4. The last approach, which I think will surely work but want to avoid, is to keep the entities in an in-memory DB and do lookup/update calls from streams 1, 2 and 3.

Any help is deeply appreciated, as this would help me design my system efficiently and the solution approach may become a beacon for "lookup master data" sorts of problems. Regards, Amit
Re: Overlapping classes warnings
Generally, you can ignore these things. They mean some artifacts packaged other artifacts, and so two copies show up when all the JAR contents are merged. But here you do show a small dependency convergence problem; beanutils 1.7 is present but beanutils-core 1.8 is too, even though these should be harmonized. I imagine one could be excluded; I imagine we could harmonize the version manually. In practice, I also imagine it doesn't cause any problem, but feel free to propose a fix along those lines. On Thu, Apr 9, 2015 at 4:54 PM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: Hi, During compilation I get a lot of these:

[WARNING] kryo-2.21.jar, reflectasm-1.07-shaded.jar define 23 overlapping classes:
[WARNING] commons-beanutils-1.7.0.jar, commons-beanutils-core-1.8.0.jar define 82 overlapping classes:
[WARNING] commons-beanutils-1.7.0.jar, commons-collections-3.2.1.jar, commons-beanutils-core-1.8.0.jar define 10 overlapping classes:

And a lot of others. How do I fix these?
Re: Class incompatible error
I changed the JDK to Oracle but I still get this error. Not sure what it means by "stream class is incompatible with local class". I am using the following build on the server: spark-1.2.1-bin-hadoop2.4

15/04/09 15:26:24 ERROR JobScheduler: Error running job streaming job 1428607584000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 66, ip-10-241-251-232.us-west-2.compute.internal): java.io.InvalidClassException: org.apache.spark.Aggregator; local class incompatible: stream classdesc serialVersionUID = 5032037208639381169, local class serialVersionUID = -9085606473104903453
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)

On Wed, Apr 8, 2015 at 3:40 PM, Ted Yu yuzhih...@gmail.com wrote: bq. one is Oracle and the other is OpenJDK I don't have experience with mixed JDKs. Can you try using a single JDK? Cheers On Wed, Apr 8, 2015 at 3:26 PM, Mohit Anchlia mohitanch...@gmail.com wrote: For the build I am using java version 1.7.0_65, which seems to be the same as the one on the spark host. However, one is Oracle and the other is OpenJDK. Does that make any difference? On Wed, Apr 8, 2015 at 1:24 PM, Ted Yu yuzhih...@gmail.com wrote: What version of Java do you use to build? Cheers On Wed, Apr 8, 2015 at 12:43 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am seeing the following; is this because of my maven version?

15/04/08 15:42:22 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-241-251-232.us-west-2.compute.internal): java.io.InvalidClassException: org.apache.spark.Aggregator; local class incompatible: stream classdesc serialVersionUID = 5032037208639381169, local class serialVersionUID = -9085606473104903453
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
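A detail worth flagging from the thread itself (an observation, not a confirmed diagnosis): the server runs spark-1.2.1-bin-hadoop2.4 while the application is compiled against 1.2.0 artifacts, and a serialVersionUID mismatch on an internal class like org.apache.spark.Aggregator is exactly what mixing Spark versions between build and runtime can produce. A sketch of aligning the build, with the Spark dependencies also marked provided so the cluster's own jars are used at runtime:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.2.1</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.2.1</version>
  <scope>provided</scope>
</dependency>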
Re: [GraphX] aggregateMessages with active set
In aggregateMessagesWithActiveSet, Spark still has to read all edges. It means that a fixed cost which scales with graph size is unavoidable in a Pregel-like iteration. But what if I have to run nearly 100 iterations, and during the last 50 iterations only 0.1% of the nodes need to be updated? The fixed cost makes the program finish in an unacceptable amount of time. Alcaid 2015-04-08 1:41 GMT+08:00 Ankur Dave ankurd...@gmail.com: We thought it would be better to simplify the interface, since the active set is a performance optimization but the result is identical to calling subgraph before aggregateMessages. The active set option is still there in the package-private method aggregateMessagesWithActiveSet. You can actually access it publicly via GraphImpl, though the API isn't guaranteed to be stable: graph.asInstanceOf[GraphImpl[VD, ED]].aggregateMessagesWithActiveSet(...) Ankur On Tue, Apr 7, 2015 at 2:56 AM, James alcaid1...@gmail.com wrote: Hello, The old GraphX API mapReduceTriplets has an optional parameter activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] that limits the input of sendMessage. However, in the new API aggregateMessages I could not find this option; why is it not offered any more? Alcaid
Re: Overlapping classes warnings
Though the warnings can be ignored, they add up in the log files while compiling other projects too. And there are a lot of those warnings. Any workaround? How do we modify the pom.xml file to exclude these unnecessary dependencies? On Fri, Apr 10, 2015 at 2:29 AM, Sean Owen so...@cloudera.com wrote: Generally, you can ignore these things. They mean some artifacts packaged other artifacts, and so two copies show up when all the JAR contents are merged. But here you do show a small dependency convergence problem; beanutils 1.7 is present but beanutils-core 1.8 is too, even though these should be harmonized. I imagine one could be excluded; I imagine we could harmonize the version manually. In practice, I also imagine it doesn't cause any problem, but feel free to propose a fix along those lines. On Thu, Apr 9, 2015 at 4:54 PM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: Hi, During compilation I get a lot of these:

[WARNING] kryo-2.21.jar, reflectasm-1.07-shaded.jar define 23 overlapping classes:
[WARNING] commons-beanutils-1.7.0.jar, commons-beanutils-core-1.8.0.jar define 82 overlapping classes:
[WARNING] commons-beanutils-1.7.0.jar, commons-collections-3.2.1.jar, commons-beanutils-core-1.8.0.jar define 10 overlapping classes:

And a lot of others. How do I fix these?
Re: make two rdd co-partitioned in python
In Spark 1.3+, PySpark also supports this kind of narrow dependency. For example:

N = 10
a1 = a.partitionBy(N)
b1 = b.partitionBy(N)

then a1.union(b1) will only have N partitions, so a1.join(b1) does not need a shuffle anymore. On Thu, Apr 9, 2015 at 11:57 AM, pop xia...@adobe.com wrote: In scala, we can make two RDDs use the same partitioner so that they are co-partitioned:

val partitioner = new HashPartitioner(5)
val a1 = a.partitionBy(partitioner).cache()
val b1 = b.partitionBy(partitioner).cache()

How can we achieve the same in python? It would be great if somebody can share some examples. Thanks, Xiang

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/make-two-rdd-co-partitioned-in-python-tp22445.html
Re: override log4j.properties
One method: by putting your custom log4j.properties file in your /resources directory. As an example, please see: http://stackoverflow.com/a/2736/236007 Kind regards, Emre Sevinç http://www.bigindustries.be/ On Thu, Apr 9, 2015 at 2:17 PM, patcharee patcharee.thong...@uni.no wrote: Hello, How to override log4j.properties for a specific spark job? BR, Patcharee -- Emre Sevinc
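Another common approach, for when the file cannot be bundled into the jar's resources: point the driver and executor JVMs at a custom file explicitly at submit time. The paths below are placeholders:

spark-submit \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:/path/to/log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties" \
  --files /path/to/log4j.properties \
  ...

with a minimal log4j.properties such as:

log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n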
Re: Add row IDs column to data frame
Hi, I just checked and I can see that there is a method called withColumn:

def withColumn(colName: String, col: Column): DataFrame

It returns a new DataFrame by adding a column (see http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/DataFrame.html). I can't test it now... but I think it should work. As I see it, the whole idea of data frames is to make them like data frames in R, and in R you can do that easily. It was late last night and I was tired, but my idea was that you can iterate over the first set, add an index to every row using accumulators, then iterate over the other set, add an index from the other accumulator, then create tuples with keys from the indexes and join. It is ugly and not efficient, and you should avoid it. :] Best Bojan

On Thu, Apr 9, 2015 at 1:35 AM, barmaley [via Apache Spark User List] wrote: Hi Bojan, Could you please expand your idea on how to append to an RDD? I can think of how to append a constant value to each row of an RDD:

// oldRDD - RDD[Array[String]]
val c = "const"
val newRDD = oldRDD.map(r => c +: r)

But how to append a custom column to an RDD? Something like:

val colToAppend = sc.makeRDD(1 to oldRDD.count().toInt)
// or sc.parallelize(1 to oldRDD.count().toInt)
// or (1 to oldRDD.count().toInt).toArray

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Append-column-to-Data-Frame-or-RDD-tp22385p22432.html
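If the goal is specifically a generated row-ID column (rather than appending an existing Column), a zipWithIndex-based sketch against the Spark 1.3 DataFrame API could look like this; "rowId" is just an assumed column name:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val withId = sqlContext.createDataFrame(
  df.rdd.zipWithIndex.map { case (row, id) => Row.fromSeq(row.toSeq :+ id) },
  StructType(df.schema.fields :+ StructField("rowId", LongType, nullable = false))
)

This avoids the accumulator-and-join route sketched above, at the cost of the extra job zipWithIndex runs to compute partition sizes.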
spark job progress-style report on console ?
Hi, How do I get a spark job progress-style report on the console? I tried to set --conf spark.ui.showConsoleProgress=true but it

thanks

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-job-progress-style-report-on-console-tp22440.html
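The question above appears cut off, but one known interaction worth checking (hedged; verify against your Spark version): the console progress bar is drawn on stderr and is suppressed when INFO-level logging is enabled, since the log lines would overwrite it. So besides

spark-submit --conf spark.ui.showConsoleProgress=true ...

the console log level typically needs to be WARN or quieter in log4j.properties (e.g. log4j.rootCategory=WARN, console) for the bar to show up.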
Re: SQL can't create Hive database
Can you create the database directly within Hive? If you're getting the same error within Hive, it sounds like a permissions issue, as per Bojan. More info can be found at: http://stackoverflow.com/questions/15898211/unable-to-create-database-path-file-user-hive-warehouse-error On Thu, Apr 9, 2015 at 7:31 AM Bojan Kostic blood9ra...@gmail.com wrote: I think it uses a local dir; an hdfs dir path starts with hdfs:// Check permissions on the folders, and also check the logs. There should be more info about the exception. Best Bojan

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQL-can-t-not-create-Hive-database-tp22435p22439.html
Re: Lookup / Access of master data in spark streaming
Responses inline. Hope they help. On Thu, Apr 9, 2015 at 8:20 AM, Amit Assudani aassud...@impetus.com wrote: Hi Friends, I am trying to solve a use case in spark streaming, and I need help on getting to the right approach for looking up / updating master data. Use case (simplified): I've a dataset of entities with three attributes and an identifier/row key in a persistent store. Each attribute, along with the row key, comes from a different stream, so effectively there are 3 source streams. Now whenever any attribute comes up, I want to update/sync the persistent store and do some processing, but the processing requires the latest state of the entity, with the latest values of all three attributes. I wish I had all the entities cached in some sort of centralized cache (like we have data in hdfs) within spark streaming, which could be used for data-local processing. But I assume there is no such thing. Potential approaches I am thinking of (I suspect the first two are not feasible, but I want to confirm):

1. Are Broadcast Variables mutable? If yes, can I use one as a cache for all entities, sizing around 100s of GBs, provided I have a cluster with enough RAM?

Broadcast variables are not mutable. But you can always create a new broadcast variable when you want, and use the latest broadcast variable in your computation:

dstream.transform { rdd =>
  val latestBroadcast = getLatestBroadcastVariable() // fetch existing, or update + create new and return
  val transformedRDD = rdd. ... // use latestBroadcast in RDD transformations
  transformedRDD
}

Since the transform RDD-to-RDD function runs on the driver every batch interval, it will always use the latest broadcast variable that you want. Though note that whenever you create a new broadcast, the next batch may take a little longer, as the data needs to be actually broadcast out. That can also be made asynchronous by running a simple task (to force the broadcasting out) on any new broadcast variable in a different thread than the Spark Streaming batch scheduler, but using the same underlying Spark Context.

2. Is there any kind of sticky partitioning possible, so that I route my stream data to go through the same node where I have the corresponding entities (a subset of the entire store) cached in memory within the JVM / off heap on the node? This would avoid lookups from the store.

You could use updateStateByKey. That is quite sticky, but does not eliminate the possibility that it can run on a different node. In fact this is necessary for fault-tolerance: what if the node it was supposed to run on goes down? The task will be run on a different node, and you have to design your application such that it can handle that.

3. If I stream the entities from the persistent store into the engine, this becomes a 4th stream - the entity stream. How do I use join / merge to enable streams 1, 2 and 3 to look up and update the data from stream 4? Would DStream.join work for a few seconds' worth of data in the attribute streams against all the data in the entity stream? Or do I use transform and, within that, an rdd join? I have doubts whether I am leaning towards a core-Spark approach within spark streaming.

Depends on what kind of join! If you want to join every batch in a stream with a static data set (or a rarely updated dataset), then transform+join is the way to go. If you want to join one stream with a window of data from another stream, then DStream.join is the way to go.

4. The last approach, which I think will surely work but want to avoid, is to keep the entities in an in-memory DB and do lookup/update calls from streams 1, 2 and 3.
Any help is deeply appreciated as this would help me design my system efficiently and the solution approach may become a beacon for lookup master data sort of problems. Regards, Amit -- NOTE: This message may contain information that is confidential, proprietary, privileged or otherwise protected by law. The message is intended solely for the named addressee. If received in error, please destroy and notify the sender. Any use of this email is prohibited when received in error. Impetus does not represent, warrant and/or guarantee, that the integrity of this communication has been maintained nor that the communication is free of errors, virus, interception or interference.
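A minimal sketch of the rebroadcast pattern TD describes above, assuming an existing SparkContext `sc` and a DStream `dstream` of strings; getLatestBroadcastVariable() is expanded into a hypothetical loadEntities() plus a refresh function (both are assumptions, not part of the original thread). The driver swaps the Broadcast reference between batches, and transform picks up whichever reference is current:

    import org.apache.spark.broadcast.Broadcast

    // Hypothetical loader for the master data from the persistent store.
    def loadEntities(): Map[String, String] = ???

    @volatile var entitiesBc: Broadcast[Map[String, String]] = sc.broadcast(loadEntities())

    // Called on the driver (e.g. from a background thread) whenever the master data changes.
    def refreshBroadcast(): Unit = {
      entitiesBc.unpersist(blocking = false) // drop the stale copies on the executors
      entitiesBc = sc.broadcast(loadEntities())
    }

    dstream.transform { rdd =>
      val current = entitiesBc // resolved on the driver, once per batch
      rdd.map(record => (record, current.value.get(record))) // look up each record against the cache
    }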
Re: Advice using Spark SQL and Thrift JDBC Server
Hi Mohammed, Sorry, I guess I was not really clear in my response. Yes, sbt fails; the -DskipTests is for mvn, as I showed in the example of how I built it. I do not believe that -DskipTests has any impact in sbt, but I could be wrong; sbt package should skip tests. I did not try to track down where the dependency was coming from. Based on Patrick's comments it sounds like this is now resolved. Sorry for the confusion. -Todd

On Wed, Apr 8, 2015 at 4:38 PM, Todd Nist tsind...@gmail.com wrote: Hi Mohammed, I think you just need to add -DskipTests to your build. Here is how I built it:

    mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests clean package install

build/sbt does however fail, even if only doing package, which should skip tests. I am able to build the MyThriftServer above now. Thanks Michael for the assistance. -Todd

On Wed, Apr 8, 2015 at 3:39 PM, Mohammed Guller moham...@glassbeam.com wrote: Michael, Thank you! Looks like the sbt build is broken for 1.3. I downloaded the source code for 1.3, but I get the following error a few minutes after I run "sbt/sbt publishLocal":

    [error] (network-shuffle/*:update) sbt.ResolveException: unresolved dependency: org.apache.spark#spark-network-common_2.10;1.3.0: configuration not public in org.apache.spark#spark-network-common_2.10;1.3.0: 'test'. It was required from org.apache.spark#spark-network-shuffle_2.10;1.3.0 test
    [error] Total time: 106 s, completed Apr 8, 2015 12:33:45 PM

Mohammed

*From:* Michael Armbrust [mailto:mich...@databricks.com] *Sent:* Wednesday, April 8, 2015 11:54 AM *To:* Mohammed Guller *Cc:* Todd Nist; James Aley; user; Patrick Wendell *Subject:* Re: Advice using Spark SQL and Thrift JDBC Server

Sorry guys. I didn't realize that https://issues.apache.org/jira/browse/SPARK-4925 was not fixed yet. You can publish locally in the meantime (sbt/sbt publishLocal).

On Wed, Apr 8, 2015 at 8:29 AM, Mohammed Guller moham...@glassbeam.com wrote: +1 Interestingly, I ran into exactly the same issue yesterday. I couldn't find any documentation about which project to include as a dependency in build.sbt to use HiveThriftServer2. Would appreciate help. Mohammed

*From:* Todd Nist [mailto:tsind...@gmail.com] *Sent:* Wednesday, April 8, 2015 5:49 AM *To:* James Aley *Cc:* Michael Armbrust; user *Subject:* Re: Advice using Spark SQL and Thrift JDBC Server

To use HiveThriftServer2.startWithContext, I thought one would use the following artifact in the build:

    "org.apache.spark" %% "spark-hive-thriftserver" % "1.3.0"

But I am unable to resolve the artifact. I do not see it in Maven Central or any other repo. Do I need to build Spark and publish locally, or am I just missing something obvious here?
Basic class is like this:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.hive.HiveMetastoreTypes._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.hive.thriftserver._

    object MyThriftServer {

      val sparkConf = new SparkConf()
        // master is passed to spark-submit, but could also be specified explicitly
        // .setMaster(sparkMaster)
        .setAppName("My ThriftServer")
        .set("spark.cores.max", "2")
      val sc = new SparkContext(sparkConf)
      val sparkContext = sc
      import sparkContext._
      val sqlContext = new HiveContext(sparkContext)
      import sqlContext._
      import sqlContext.implicits._

      // register temp tables here

      HiveThriftServer2.startWithContext(sqlContext)
    }

Build has the following:

    scalaVersion := "2.10.4"

    val SPARK_VERSION = "1.3.0"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming-kafka" % SPARK_VERSION
        exclude("org.apache.spark", "spark-core_2.10")
        exclude("org.apache.spark", "spark-streaming_2.10")
        exclude("org.apache.spark", "spark-sql_2.10")
        exclude("javax.jms", "jms"),
      "org.apache.spark" %% "spark-core" % SPARK_VERSION % "provided",
      "org.apache.spark" %% "spark-streaming" % SPARK_VERSION % "provided",
      "org.apache.spark" %% "spark-sql" % SPARK_VERSION % "provided",
      "org.apache.spark" %% "spark-hive" % SPARK_VERSION % "provided",
      "org.apache.spark" %% "spark-hive-thriftserver" % SPARK_VERSION % "provided",
      "org.apache.kafka" %% "kafka" % "0.8.1.1"
        exclude("javax.jms", "jms")
        exclude("com.sun.jdmk", "jmxtools")
        exclude("com.sun.jmx", "jmxri"),
      "joda-time" % "joda-time" % "2.7",
      "log4j" % "log4j" % "1.2.14"
        exclude("com.sun.jdmk", "jmxtools")
        exclude("com.sun.jmx", "jmxri")
    )

Appreciate the assistance. -Todd

On Tue, Apr 7, 2015 at 4:09 PM, James Aley james.a...@swiftkey.com wrote: Excellent, thanks for your help, I appreciate your advice! On 7 Apr 2015 20:43, Michael Armbrust mich...@databricks.com wrote: That should totally work. The other option would be to run
Re: Spark Job Run Resource Estimation ?
Hi Deepak, I'm going to shamelessly plug my blog post on tuning Spark: http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ It talks about tuning executor size as well as how the number of tasks for a stage is calculated. -Sandy

On Thu, Apr 9, 2015 at 9:21 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have a Spark job that has multiple stages. For now I start it with 100 executors, each with 12G mem (max is 16G). I am using Spark 1.3 over YARN 2.4.x. For now I start the Spark job with a very limited input (1 file of size 2G); overall there are 200 files. My first run is yet to complete, as it's taking too much time / throwing OOM exceptions / buffer exceptions (keep that aside). How will I know how many resources are required to run this job (# of cores, executors, mem, serialization buffers, and I do not yet know what else)? In the M/R world, all I do is set the split size and the rest is taken care of automatically (yes, I need to worry about mem in case of OOM). 1) Can someone explain how they do resource estimation before running the job, or is there no way and one needs to just try it out? 2) Even if I give 100 executors, the first stage uses only 5 tasks; how did Spark decide this? Please point me to any resources that also talk about similar things, or please explain here. -- Deepak
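For reference, the knobs Sandy's post discusses map to spark-submit flags like the following (placeholder values and a hypothetical class/jar; the right numbers depend on the cluster and the job, which is the point of the post). On question 2: the number of tasks in the first stage equals the number of partitions of the input RDD, which for HDFS input is determined by the input splits of the file, not by how many executors were requested, so a single small file yields only a handful of tasks even with 100 executors:

    # Placeholder numbers; tune per cluster. --executor-cores is the number of
    # concurrent tasks per executor; --executor-memory is the heap per executor
    # (YARN adds its own memory overhead on top).
    ./bin/spark-submit --master yarn-cluster \
      --num-executors 20 \
      --executor-cores 4 \
      --executor-memory 8g \
      --driver-memory 4g \
      --class com.example.MyApp my-app.jar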
Re: Jobs failing with KryoException (BufferOverflow)
Pressed send early. I had tried that with these settings: buffersize=128, maxbuffersize=1024.

    val conf = new SparkConf()
      .setAppName("detail")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
      .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
      .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))

On Thu, Apr 9, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Yes, I had tried that. Now I see this:

    15/04/09 07:58:08 INFO scheduler.DAGScheduler: Job 0 failed: collect at VISummaryDataProvider.scala:38, took 275.334991 s
    15/04/09 07:58:08 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Total size of serialized results of 4 tasks (1067.3 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
    org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 4 tasks (1067.3 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    15/04/09 07:58:08 INFO storage.BlockManagerInfo: Removed taskresult_4 on phxaishdc9dn0579.phx.ebay.com:42771 in memory (size: 273.5 MB, free: 6.2 GB)
    15/04/09 07:58:08 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User

On Thu, Apr 9, 2015 at 8:18 PM, Ted Yu yuzhih...@gmail.com wrote: Please take a look at https://code.google.com/p/kryo/source/browse/trunk/src/com/esotericsoftware/kryo/io/Output.java?r=236 , starting at line 27. In Spark, you can control the maxBufferSize with spark.kryoserializer.buffer.max.mb Cheers -- Deepak -- Deepak
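Worth noting: the failure in the log above is no longer the Kryo buffer overflow. The job now trips over spark.driver.maxResultSize (default 1g), because roughly 1067 MB of task results are being collected to the driver. A sketch of raising the limit, assuming the driver heap actually has the headroom (avoiding the large collect is usually the better fix):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("detail")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.max.mb", "1024")
      // default is 1g; "0" means unlimited, which risks a driver OOM
      .set("spark.driver.maxResultSize", "2g")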
Jobs failing with KryoException (BufferOverflow)
My Spark (1.3.0) job is failing with:

    com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1
        at com.esotericsoftware.kryo.io.Output.require(Output.java:138)
        at com.esotericsoftware.kryo.io.Output.writeByte(Output.java:194)
        at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:599)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:161)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

This is how I am creating the SparkContext (only once):

    val conf = new SparkConf()
      .setAppName("detail")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
      .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
    val sc = new SparkContext(conf)

Command:

    ./bin/spark-submit -v --master yarn-cluster --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar --num-executors 100 --driver-memory 12g --driver-java-options -XX:MaxPermSize=6G --executor-memory 12g --executor-cores 1 --queue hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar startDate=2015-04-6 endDate=2015-04-7 input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem output=/user/dvasthimal/epdatasets/viewItem buffersize=200

    Spark assembly has been built with Hive, including Datanucleus jars on classpath

The buffer size is 200. 1. What is this buffer? 2. What should the value of this buffer be? 3. My Spark job has many stages; does the above value need to be different for each stage? Please clarify. Regards, Deepak
Re: SQL can't not create Hive database
I think it uses a local dir; an HDFS dir path starts with hdfs://. Check the permissions on the folders, and also check the logs; there should be more info about the exception. Best, Bojan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQL-can-t-not-create-Hive-database-tp22435p22439.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
save as text file throwing null pointer error.
    JavaRDD<String> lineswithoutStopWords = nonEmptylines
        .map(new Function<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(String line) throws Exception {
                // strip stop words from each input line
                return removeStopWords(line, stopwords);
            }
        });
    lineswithoutStopWords.saveAsTextFile("output/testop.txt");

    Exception in task 0.0 in stage 1.0 (TID 1)
    java.lang.NullPointerException
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
        at org.apache.hadoop.util.Shell.run(Shell.java:379)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
        at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
        at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
        at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
        at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
        at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
        at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    15/04/09 18:44:36 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
    15/04/09 18:44:36 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
    15/04/09 18:44:36 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
    15/04/09 18:44:36 INFO TaskSchedulerImpl: Cancelling stage 1
    15/04/09 18:44:36 INFO DAGScheduler: Job 1 failed: saveAsTextFile at TextPreProcessing.java:49, took 0.172959 s
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException at
Re: Caching and Actions
Your point #1 is a bit misleading: (1) The mappers are not executed in parallel when processing independently the same RDD. To clarify, I'd say: in one stage of execution, when pipelining occurs, mappers are not executed in parallel when independently processing the same RDD partition.

On Thu, Apr 9, 2015 at 11:19 AM, spark_user_2015 li...@adobe.com wrote: That was helpful! The conclusion: (1) The mappers are not executed in parallel when processing independently the same RDD. (2) The best way seems to be (if enough memory is available and an action is applied to d1 and d2 later on):

    val d1 = data.map { case (x, y, z) => (x, y) }.cache()
    val d2 = d1.map { case (x, y) => (y, x) }

This avoids pipelining the d1 mapper and the d2 mapper when computing d2. This is important for writing efficient code, and toDebugString helps a lot. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Caching-and-Actions-tp22418p22444.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
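A small runnable sketch of the pattern above, assuming an existing SparkContext `sc`; toDebugString shows where the cached d1 cuts the lineage once it has been materialized:

    val data = sc.parallelize(Seq((1, 2, 3), (4, 5, 6)))
    val d1 = data.map { case (x, y, z) => (x, y) }.cache()
    val d2 = d1.map { case (x, y) => (y, x) }

    d1.count()                // action that materializes the cache
    println(d2.toDebugString) // the d1 step now shows as CachedPartitions, so
                              // computing d2 reads the cache instead of
                              // re-running the first mapper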
Re: External JARs not loading Spark Shell Scala 2.11
Hi- Was this the JIRA issue? https://issues.apache.org/jira/browse/SPARK-2988 Any help in getting this working would be much appreciated! Thanks, Alex

On Thu, Apr 9, 2015 at 11:32 AM, Prashant Sharma scrapco...@gmail.com wrote: You are right, this needs to be done. I can work on it soon; I was not sure if there was anyone even using the Scala 2.11 Spark REPL. Actually there is a patch in the Scala 2.10 shell to support adding jars (lost the JIRA ID), which has to be ported for Scala 2.11 too. If, however, you (or anyone else) are planning to work on it, I can help you. Prashant Sharma

On Thu, Apr 9, 2015 at 3:08 PM, anakos ana...@gmail.com wrote: Hi- I am having difficulty getting the 1.3.0 Spark shell to find an external jar. I have built Spark locally for Scala 2.11 and I am starting the REPL as follows:

    bin/spark-shell --master yarn --jars data-api-es-data-export-4.0.0.jar

I see the following line in the console output:

    15/04/09 09:52:15 INFO spark.SparkContext: Added JAR file:/opt/spark/spark-1.3.0_2.11-hadoop2.3/data-api-es-data-export-4.0.0.jar at http://192.168.115.31:54421/jars/data-api-es-data-export-4.0.0.jar with timestamp 1428569535904

but when I try to import anything from this jar, it's simply not available. When I try to add the jar manually using the command :cp /path/to/jar, the classes in the jar are still unavailable. I understand that 2.11 is not officially supported, but has anyone been able to get an external jar loaded in the 1.3.0 release? Is this a known issue? I have tried searching around for answers, but the only thing I've found that may be related is this: https://issues.apache.org/jira/browse/SPARK-3257 Any/all help is much appreciated. Thanks, Alex -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/External-JARs-not-loading-Spark-Shell-Scala-2-11-tp22434.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark Job Run Resource Estimation ?
I have a Spark job that has multiple stages. For now I start it with 100 executors, each with 12G mem (max is 16G). I am using Spark 1.3 over YARN 2.4.x. For now I start the Spark job with a very limited input (1 file of size 2G); overall there are 200 files. My first run is yet to complete, as it's taking too much time / throwing OOM exceptions / buffer exceptions (keep that aside). How will I know how many resources are required to run this job (# of cores, executors, mem, serialization buffers, and I do not yet know what else)? In the M/R world, all I do is set the split size and the rest is taken care of automatically (yes, I need to worry about mem in case of OOM). 1) Can someone explain how they do resource estimation before running the job, or is there no way and one needs to just try it out? 2) Even if I give 100 executors, the first stage uses only 5 tasks; how did Spark decide this? Please point me to any resources that also talk about similar things, or please explain here. -- Deepak
Re: Lookup / Access of master data in spark streaming
Thanks a lot, TD, for the detailed answers. The answers lead to a few more questions:

1. "The transform RDD-to-RDD function runs on the driver": I didn't understand this. Does it mean that when I use the transform function on a DStream, it is not parallelized? Surely I'm missing something here.

2. updateStateByKey, I think, won't work in this use case: I have three separate attribute streams (with different frequencies) that make up the combined state (i.e., the entity) at a point in time, on which I want to do some processing. Do you think otherwise?

3. transform+join seems the only option so far, but any guesstimate on how this would perform / react on a cluster? Assume master data in 100s of GBs, with the join based on some row key. We are talking about a slice of stream data to be joined with 100s of GBs of master data, continuously. Is it something that can be done but should not be done?

Regards, Amit

From: Tathagata Das t...@databricks.com Date: Thursday, April 9, 2015 at 3:13 PM To: amit assudani aassud...@impetus.com Cc: user@spark.apache.org Subject: Re: Lookup / Access of master data in spark streaming

Responses inline. Hope they help. On Thu, Apr 9, 2015 at 8:20 AM, Amit Assudani aassud...@impetus.com wrote:

Hi Friends, I am trying to solve a use case in Spark Streaming, and I need help on getting to the right approach to look up / update the master data.

Use case (simplified): I have a dataset of entities with three attributes and an identifier/row key in a persistent store. Each attribute, along with the row key, comes from a different stream, so effectively there are 3 source streams. Now whenever any attribute comes up, I want to update/sync the persistent store and do some processing, but the processing requires the latest state of the entity, with the latest values of all three attributes. I wish I had all the entities cached in some sort of centralized cache (like we have data in HDFS) within Spark Streaming that could be used for data-local processing, but I assume there is no such thing.

Potential approaches I'm thinking of (I suspect the first two are not feasible, but I want to confirm):

1. Are broadcast variables mutable? If yes, can I use one as a cache for all entities, sizing around 100s of GBs, provided I have a cluster with enough RAM?

Broadcast variables are not mutable. But you can always create a new broadcast variable when you want and use the latest broadcast variable in your computation:

    dstream.transform { rdd =>
      // fetch the existing broadcast variable, or update + create a new one and return it
      val latestBroadcast = getLatestBroadcastVariable()
      val transformedRDD = rdd. ... // use latestBroadcast in the RDD transformations
      transformedRDD
    }

Since the transform RDD-to-RDD function runs on the driver every batch interval, it will always use the latest broadcast variable that you want. Though note that whenever you create a new broadcast, the next batch may take a little longer, as the data needs to be actually broadcast out. That can also be made asynchronous by running a simple task (to force the broadcasting out) on any new broadcast variable in a thread separate from the Spark Streaming batch schedule, but using the same underlying SparkContext.

2. Is there any kind of sticky partitioning possible, so that I can route my stream data through the same node where I have the corresponding entities (a subset of the entire store) cached in memory within the JVM / off heap on that node? This would avoid lookups from the store.

You could use updateStateByKey. That is quite sticky, but it does not eliminate the possibility that it can run on a different node. In fact this is necessary for fault tolerance: what if the node it was supposed to run on goes down? The task will be run on a different node, and you have to design your application such that it can handle that.

3. If I stream the entities from the persistent store into the engine, this becomes a 4th stream, the entity stream. How do I use join / merge so that streams 1, 2 and 3 can look up and update the data from stream 4? Would DStream.join work for a few seconds' worth of data in the attribute streams against all the data in the entity stream? Or do I use transform and, within that, an RDD join? I have doubts whether I am leaning towards a core-Spark approach within Spark Streaming.

Depends on what kind of join! If you want to join every batch in a stream with a static dataset (or a rarely updated dataset), then transform+join is the way to go. If you want to join one stream with a window of data from another stream, then DStream.join is the way to go.

4. The last approach, which I think will surely work but which I want to avoid, is to keep the entities in an IMDB and do lookup/update calls on it from streams 1, 2 and 3. Any help is deeply appreciated as this would help me design my system efficiently and the solution approach may become a beacon for lookup master data sort of problems.
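A sketch of the transform+join option discussed in point 3, under stated assumptions: the record type Entity and the loadMaster() reader of (rowKey, entity) pairs from the persistent store are hypothetical, and `attributeStream` stands for one of the keyed attribute DStreams. The master RDD is hash-partitioned and cached so that each small batch RDD is shuffled to it, rather than the large master side being shuffled every batch:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.rdd.RDD

    // Hypothetical record type and loader; both are assumptions for illustration.
    case class Entity(rowKey: String, a1: String, a2: String, a3: String)
    def loadMaster(): RDD[(String, Entity)] = ???

    val master = loadMaster().partitionBy(new HashPartitioner(200)).cache()

    // attributeStream: DStream[(String, String)] of (rowKey, attributeValue)
    val enriched = attributeStream.transform { rdd =>
      // only the batch-sized rdd moves; the cached master keeps its partitioning
      rdd.join(master)
    }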
Re: Which Hive version should be used for Spark 1.3
Most likely you have an existing Hive installation with data in it. In that case I was not able to get Spark 1.3 to communicate with the existing Hive metastore, so when I read any table created in Hive, Spark SQL used to complain that the table was not found. If you get it working, please share the steps.

On Thu, Apr 9, 2015 at 9:25 PM, Arthur Chan arthur.hk.c...@gmail.com wrote: Hi, I use Hive 0.12 with Spark 1.2 at the moment and plan to upgrade to Spark 1.3.x. Could anyone advise which Hive version should be used to match Spark 1.3.x? Can I use Hive 1.1.0 for Spark 1.3? Or can I use Hive 0.14 for Spark 1.3? Regards, Arthur -- Deepak
Re: Could not compute split, block not found in Spark Streaming Simple Application
Are you running # of receivers = # of machines? TD

On Thu, Apr 9, 2015 at 9:56 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Sorry, I was getting those errors because my workload was not sustainable. However, I noticed that, by just running the spark-streaming-benchmark ( https://github.com/tdas/spark-streaming-benchmark/blob/master/Benchmark.scala ), I get no difference in the execution time, number of processed records, or delay whether I'm using 1 machine or 2 machines with the setup described before (using Spark standalone). Is that normal?

On Fri, Mar 27, 2015 at 5:32 PM, Tathagata Das t...@databricks.com wrote: If it is deterministically reproducible, could you generate full DEBUG level logs from the driver and the workers and give them to me? Basically I want to trace through what is happening to the block that is not being found. And can you tell which cluster manager you are using? Spark standalone, Mesos or YARN?

On Fri, Mar 27, 2015 at 10:09 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I am just running this simple example with machineA: 1 master + 1 worker, and machineB: 1 worker:

    val ssc = new StreamingContext(sparkConf, Duration(1000))
    val rawStreams = (1 to numStreams).map(_ =>
      ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
    val union = ssc.union(rawStreams)
    union.filter(line => Random.nextInt(1) == 0).map(line => {
      var sum = BigInt(0)
      line.toCharArray.foreach(chr => sum += chr.toInt)
      fib2(sum)
      sum
    }).reduceByWindow(_ + _, Seconds(1), Seconds(1)).map(s => s"### result: $s").print()

And I'm getting the following exceptions. Log from machineB:

    15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 132
    15/03/27 16:21:35 INFO Executor: Running task 0.0 in stage 27.0 (TID 132)
    15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 134
    15/03/27 16:21:35 INFO Executor: Running task 2.0 in stage 27.0 (TID 134)
    15/03/27 16:21:35 INFO TorrentBroadcast: Started reading broadcast variable 24
    15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 136
    15/03/27 16:21:35 INFO Executor: Running task 4.0 in stage 27.0 (TID 136)
    15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 138
    15/03/27 16:21:35 INFO Executor: Running task 6.0 in stage 27.0 (TID 138)
    15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 140
    15/03/27 16:21:35 INFO Executor: Running task 8.0 in stage 27.0 (TID 140)
    15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(1886) called with curMem=47117, maxMem=280248975
    15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24_piece0 stored as bytes in memory (estimated size 1886.0 B, free 267.2 MB)
    15/03/27 16:21:35 INFO BlockManagerMaster: Updated info of block broadcast_24_piece0
    15/03/27 16:21:35 INFO TorrentBroadcast: Reading broadcast variable 24 took 19 ms
    15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(3104) called with curMem=49003, maxMem=280248975
    15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 3.0 KB, free 267.2 MB)
    15/03/27 16:21:35 ERROR Executor: Exception in task 8.0 in stage 27.0 (TID 140)
    java.lang.Exception: Could not compute split, block input-0-1427473262420 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:701)
    15/03/27 16:21:35 ERROR Executor: Exception in task 6.0 in stage 27.0 (TID 138)
    java.lang.Exception: Could not compute split, block input-0-1427473262418 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at
Re: Overlapping classes warnings
I agree, but as I say, most are out of the control of Spark. They aren't because of unnecessary dependencies. On Thu, Apr 9, 2015 at 5:14 PM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: Though the warnings can be ignored, they add up in the log files while compiling other projects too. And there are a lot of those warnings. Any workaround? How do we modify the pom.xml file to exclude these unnecessary dependencies? On Fri, Apr 10, 2015 at 2:29 AM, Sean Owen so...@cloudera.com wrote: Generally, you can ignore these things. They mean some artifacts packaged other artifacts, and so two copies show up when all the JAR contents are merged. But here you do show a small dependency convergence problem; beanutils 1.7 is present but beanutills-core 1.8 is too even though these should be harmonized. I imagine one could be excluded; I imagine we could harmonize the version manually. In practice, I also imagine it doesn't cause any problem but feel free to propose a fix along those lines. On Thu, Apr 9, 2015 at 4:54 PM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: Hi, During compilation I get a lot of these: [WARNING] kryo-2.21.jar, reflectasm-1.07-shaded.jar define 23 overlappping classes: [WARNING] commons-beanutils-1.7.0.jar, commons-beanutils-core-1.8.0.jar define 82 overlappping classes: [WARNING] commons-beanutils-1.7.0.jar, commons-collections-3.2.1.jar, commons-beanutils-core-1.8.0.jar define 10 overlappping classes: And a lot of others. How do I fix these? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Overlapping classes warnings
I found this jira https://jira.codehaus.org/browse/MSHADE-128 when googling for fixes. Wonder if it can fix anything here. But anyways, thanks for the help :) On Fri, Apr 10, 2015 at 2:46 AM, Sean Owen so...@cloudera.com wrote: I agree, but as I say, most are out of the control of Spark. They aren't because of unnecessary dependencies. On Thu, Apr 9, 2015 at 5:14 PM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: Though the warnings can be ignored, they add up in the log files while compiling other projects too. And there are a lot of those warnings. Any workaround? How do we modify the pom.xml file to exclude these unnecessary dependencies? On Fri, Apr 10, 2015 at 2:29 AM, Sean Owen so...@cloudera.com wrote: Generally, you can ignore these things. They mean some artifacts packaged other artifacts, and so two copies show up when all the JAR contents are merged. But here you do show a small dependency convergence problem; beanutils 1.7 is present but beanutills-core 1.8 is too even though these should be harmonized. I imagine one could be excluded; I imagine we could harmonize the version manually. In practice, I also imagine it doesn't cause any problem but feel free to propose a fix along those lines. On Thu, Apr 9, 2015 at 4:54 PM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: Hi, During compilation I get a lot of these: [WARNING] kryo-2.21.jar, reflectasm-1.07-shaded.jar define 23 overlappping classes: [WARNING] commons-beanutils-1.7.0.jar, commons-beanutils-core-1.8.0.jar define 82 overlappping classes: [WARNING] commons-beanutils-1.7.0.jar, commons-collections-3.2.1.jar, commons-beanutils-core-1.8.0.jar define 10 overlappping classes: And a lot of others. How do I fix these?
Re: Jobs failing with KryoException (BufferOverflow)
Please take a look at https://code.google.com/p/kryo/source/browse/trunk/src/com/esotericsoftware/kryo/io/Output.java?r=236 , starting at line 27. In Spark, you can control the max buffer size with spark.kryoserializer.buffer.max.mb. Cheers
Spark Streaming scenarios
Hi, I have the following scenarios and need some help ASAP.

1. Ad hoc queries on Spark Streaming. How can I run Spark queries on an ongoing streaming context? Scenario: a streaming job finds the min and max value in the last 5 minutes (which I am able to do); now I want to run an interactive query to find the min/max in the last 30 minutes on this stream. What I was thinking is to store the streaming RDDs as SchemaRDDs and query those (see the sketch after this message). Is there any better approach? Where should I store the SchemaRDDs for near-real-time performance?

2. Saving and loading intermediate RDDs in cache/disk. What is the best approach to do this? In case any worker fails, will a new worker resume the task and load these saved RDDs?

3. Write-ahead log and checkpoint. What is the significance of the WAL and of checkpoints? In the case of a checkpoint, if any worker fails, will another worker load the checkpoint detail and resume its job? In what scenarios should I use the WAL and checkpoints?

4. Spawning multiple processes within Spark Streaming: doing multiple operations on the same stream.

5. Accessing cached data between Spark components. Is data cached in Spark Streaming accessible to Spark SQL? Can it be shared between these components, or between two SparkContexts? If yes, how? If not, is there an alternative approach?

6. Dynamic lookup data in Spark Streaming. I have a scenario where, on a stream, I want to do some filtering using dynamic lookup data. How can I achieve this? In case I get this lookup data as another stream and cache it, will it be possible to update/merge the data in the cache 24/7? What is the best approach to do this? I referred to the Twitter streaming example in Spark, where it reads a spam file, but this file is not dynamic in nature.
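One possible sketch for scenario 1, using the Spark 1.3 DataFrame API (SchemaRDD in earlier releases). Assumptions: an existing StreamingContext `ssc`, its SparkContext `sc`, a DStream[Event] named `events`, and the Event record type, all hypothetical. Each window slide re-registers the last 30 minutes of data under a fixed table name, which an interactive shell or a Thrift server sharing the same SQLContext can then query ad hoc:

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.{Minutes, Seconds}

    case class Event(key: String, value: Double)

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    events.window(Minutes(30), Seconds(10)).foreachRDD { rdd =>
      // replace the temp table with the latest 30-minute window
      rdd.toDF().registerTempTable("events_last_30m")
    }

    // ad hoc, from any code sharing this SQLContext:
    // sqlContext.sql("SELECT min(value), max(value) FROM events_last_30m")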
Re: Join on Spark too slow.
If your data has special characteristics, like one side small and the other large, then you can think of doing a map-side join in Spark using broadcast values; this will speed things up. Otherwise, as Pitel mentioned, if there is nothing special and it's just a cartesian product, it might take forever, or you might increase the # of executors.

On Thu, Apr 9, 2015 at 8:37 PM, Guillaume Pitel guillaume.pi...@exensa.com wrote: Maybe I'm wrong, but what you are doing here is basically a bunch of cartesian products, one for each key. So if "hello" appears 100 times in your corpus, it will produce 100*100 elements in the join output. I don't understand what you're doing here, but it's no surprise your join takes forever; it makes no sense as it is, IMO. Guillaume

Hello guys, I am trying to run the following dummy example for Spark on a dataset of 250MB, using 5 machines with 10GB RAM each, but the join seems to be taking too long (> 2 hrs). I am using Spark 0.8.0, but I have also tried the same example on more recent versions, with the same results. Do you have any idea why this is happening? Thanks a lot, Kostas

    val sc = new SparkContext(
      args(0), "DummyJoin", System.getenv("SPARK_HOME"),
      Seq(System.getenv("SPARK_EXAMPLES_JAR")))

    val file = sc.textFile(args(1))

    val wordTuples = file
      .flatMap(line => line.split(args(2)))
      .map(word => (word, 1))

    val big = wordTuples.filter { case ((k, v)) => k != "a" }.cache()
    val small = wordTuples.filter { case ((k, v)) =>
      k != "a" && k != "to" && k != "and"
    }.cache()

    val res = big.leftOuterJoin(small)
    res.saveAsTextFile(args(3))

-- Guillaume PITEL, Président +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705 -- Deepak
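A sketch of the map-side (broadcast) join suggested above, reusing the `big` and `small` RDDs from the quoted code and assuming the small side fits in driver and executor memory. For unique keys on the small side this reproduces the leftOuterJoin output type without shuffling `big` at all:

    // collect the small side to the driver and ship it to every executor once
    val smallLocal: Map[String, Int] = small.collect().toMap
    val smallBc = sc.broadcast(smallLocal)

    // map-side equivalent of big.leftOuterJoin(small): RDD[(String, (Int, Option[Int]))]
    val res = big.map { case (k, v) => (k, (v, smallBc.value.get(k))) }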
Spark Job #of attempts ?
Hello, I have a Spark job with 5 stages. After it runs the 3rd stage, the console shows:

    15/04/09 10:25:57 INFO yarn.Client: Application report for application_1427705526386_127168 (state: RUNNING)
    15/04/09 10:25:58 INFO yarn.Client: Application report for application_1427705526386_127168 (state: RUNNING)
    15/04/09 10:25:59 INFO yarn.Client: Application report for application_1427705526386_127168 (state: ACCEPTED)
    15/04/09 10:25:59 INFO yarn.Client:
        client token: N/A
        diagnostics: N/A
        ApplicationMaster host: N/A
        ApplicationMaster RPC port: -1
        queue: hdmi-express
        start time: 1428598679223
        final status: UNDEFINED
        tracking URL: https://apollo-phx-rm-1.vip.ebay.com:50030/proxy/application_1427705526386_127168/
        user: dvasthimal
    15/04/09 10:26:00 INFO yarn.Client: Application report for application_1427705526386_127168 (state: ACCEPTED)
    15/04/09 10:26:01 INFO yarn.Client: Application report for application_1427705526386_127168 (state: ACCEPTED)

and then it is running again. This looks as if the application failed and Spark restarted the job from the beginning. Consistent with that, when I click through to the Spark UI web page, it does not show the already completed stages and instead goes back to running stage #1. Is there some setting to turn this behavior off? -- Deepak
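The RUNNING-back-to-ACCEPTED transition in the log above is what YARN reports when it restarts a failed ApplicationMaster, which in yarn-cluster mode re-runs the whole driver from scratch. A hedged sketch of capping the retries; whether the Spark-side property is supported depends on the Spark version, so treat it as something to verify against your release:

    # Fail fast instead of silently re-running the whole driver.
    # spark.yarn.maxAppAttempts may not exist in older Spark releases; the
    # cluster-wide YARN default is yarn.resourcemanager.am.max-attempts.
    ./bin/spark-submit --master yarn-cluster \
      --conf spark.yarn.maxAppAttempts=1 \
      --class com.example.MyApp my-app.jar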
Re: Caching and Actions
You can use toDebugString to see all the steps in the job. Best, Bojan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Caching-and-Actions-tp22418p22433.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Continuous WARN messages from BlockManager about block replication
Well, you are running in local mode, so it cannot find another peer to replicate the blocks received from the receivers. That's it. It's not a real concern, and that error will go away when you run it in a cluster.

On Thu, Apr 9, 2015 at 11:24 AM, Nandan Tammineedi nan...@defend7.com wrote: Hi, I'm running a Spark Streaming job in local mode (--master local[4]), and I'm seeing tons of these messages, roughly once every second:

    WARN BlockManager: Block input-0-1428527584600 replicated to only 0 peer(s) instead of 1 peers

We're using Spark 1.2.1. Even with TRACE logging enabled, we're not seeing any log messages indicating failure to replicate the blocks. Should we be concerned about this warning (and if so, how should we debug it), or is this a corner case in local mode where replication is not attempted but the warning is emitted anyway? If so, what is the workaround? Thanks, Nandan
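One way to silence the warning during local testing (a sketch, assuming the input stream is created explicitly and that `ssc` is the StreamingContext): receiver input defaults to a replicated storage level (one ending in _2), so requesting an unreplicated level removes the replication attempt altogether. Reconsider this for production, where replication protects received data if an executor dies:

    import org.apache.spark.storage.StorageLevel

    // unreplicated storage level; fine for local[4] testing
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)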
Re: [GraphX] aggregateMessages with active set
Actually, GraphX doesn't need to scan all the edges, because it maintains a clustered index on the source vertex id (that is, it sorts the edges by source vertex id and stores the offsets in a hash table). If the activeDirection is appropriately set, it can then jump only to the clusters with active source vertices. See the EdgePartition#index field [1], which stores the offsets, and the logic in GraphImpl#aggregateMessagesWithActiveSet [2], which decides whether to do a full scan or use the index.

[1] https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala#L60
[2] https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L237-266

Ankur

On Thu, Apr 9, 2015 at 3:21 AM, James alcaid1...@gmail.com wrote: In aggregateMessagesWithActiveSet, Spark still has to read all the edges. That means a fixed cost which scales with the graph size is unavoidable in a Pregel-like iteration. But what if I have to run nearly 100 iterations, and in the last 50 iterations only 0.1% of the nodes need to be updated? The fixed cost makes the program finish in an unacceptable amount of time. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
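For reference, the public way to benefit from this is the Pregel API, where activeDirection tells GraphX which edges can be skipped. A sketch (single-source shortest paths, assuming a Graph[Double, Double] named `graph` whose vertex attributes hold initial distances): with EdgeDirection.Out, sendMsg is invoked in later rounds only on edges whose source vertex received a message in the previous round, so the clustered index is used instead of a full edge scan:

    import org.apache.spark.graphx._

    val sssp = Pregel(graph, initialMsg = Double.PositiveInfinity,
                      activeDirection = EdgeDirection.Out)(
      // vertex program: keep the smaller of the current distance and the message
      vprog = (id, dist, msg) => math.min(dist, msg),
      // only called on edges out of vertices that received a message last round
      sendMsg = t =>
        if (t.srcAttr + t.attr < t.dstAttr) Iterator((t.dstId, t.srcAttr + t.attr))
        else Iterator.empty,
      mergeMsg = (a, b) => math.min(a, b)
    )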
make two rdd co-partitioned in python
In Scala, we can create two RDDs using the same partitioner so that they are co-partitioned:

    val partitioner = new HashPartitioner(5)
    val a1 = a.partitionBy(partitioner).cache()
    val b1 = b.partitionBy(partitioner).cache()

How can we achieve the same in Python? It would be great if somebody could share some examples. Thanks, Xiang -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/make-two-rdd-co-partitioned-in-python-tp22445.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: External JARs not loading Spark Shell Scala 2.11
You are right, this needs to be done. I can work on it soon; I was not sure if there was anyone even using the Scala 2.11 Spark REPL. Actually there is a patch in the Scala 2.10 shell to support adding jars (lost the JIRA ID), which has to be ported for Scala 2.11 too. If, however, you (or anyone else) are planning to work on it, I can help you. Prashant Sharma

On Thu, Apr 9, 2015 at 3:08 PM, anakos ana...@gmail.com wrote: Hi- I am having difficulty getting the 1.3.0 Spark shell to find an external jar. I have built Spark locally for Scala 2.11 and I am starting the REPL as follows:

    bin/spark-shell --master yarn --jars data-api-es-data-export-4.0.0.jar

I see the following line in the console output:

    15/04/09 09:52:15 INFO spark.SparkContext: Added JAR file:/opt/spark/spark-1.3.0_2.11-hadoop2.3/data-api-es-data-export-4.0.0.jar at http://192.168.115.31:54421/jars/data-api-es-data-export-4.0.0.jar with timestamp 1428569535904

but when I try to import anything from this jar, it's simply not available. When I try to add the jar manually using the command :cp /path/to/jar, the classes in the jar are still unavailable. I understand that 2.11 is not officially supported, but has anyone been able to get an external jar loaded in the 1.3.0 release? Is this a known issue? I have tried searching around for answers, but the only thing I've found that may be related is this: https://issues.apache.org/jira/browse/SPARK-3257 Any/all help is much appreciated. Thanks, Alex -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/External-JARs-not-loading-Spark-Shell-Scala-2-11-tp22434.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Unit testing with HiveContext
Thanks Ted, using HiveTest as my context worked. It still left a metastore directory and Derby log in my current working directory, though; I manually added a shutdown hook to delete them, and all was well.

On Wed, Apr 8, 2015 at 4:33 PM, Ted Yu yuzhih...@gmail.com wrote: Please take a look at sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala:

    protected def configure(): Unit = {
      warehousePath.delete()
      metastorePath.delete()
      setConf("javax.jdo.option.ConnectionURL",
        s"jdbc:derby:;databaseName=$metastorePath;create=true")
      setConf("hive.metastore.warehouse.dir", warehousePath.toString)
    }

Cheers

On Wed, Apr 8, 2015 at 1:07 PM, Daniel Siegmann daniel.siegm...@teamaol.com wrote: I am trying to unit test some code which takes an existing HiveContext and uses it to execute a CREATE TABLE query (among other things). Unfortunately I've run into some hurdles trying to unit test this, and I'm wondering if anyone has a good approach.

The metastore DB is automatically created in the local directory, but it doesn't seem to be cleaned up afterward. Is there any way to get Spark to clean this up when the context is stopped? Or can I point this to some other location, such as a temp directory?

Trying to create a table fails because it is using the default warehouse directory (/user/hive/warehouse). Is there some way to change this without hard-coding a directory in a hive-site.xml? Again, I'd prefer to point it to a temp directory so it will be automatically removed. I tried a couple of things that didn't work:

    hiveContext.sql("SET hive.metastore.warehouse.dir=/tmp/dir/xyz")
    hiveContext.setConf("hive.metastore.warehouse.dir", "/tmp/dir/xyz")

Any advice from those who have been here before would be appreciated.
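A sketch of the cleanup hook Daniel mentions, assuming the default Derby artifact names (a metastore_db directory and a derby.log file) in the test's working directory, and that commons-io is available on the test classpath:

    import java.io.File
    import org.apache.commons.io.FileUtils

    sys.addShutdownHook {
      // remove the Derby metastore directory and log left behind by the HiveContext
      FileUtils.deleteQuietly(new File("metastore_db"))
      FileUtils.deleteQuietly(new File("derby.log"))
    }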