Re: method newAPIHadoopFile
I tried:

val pairVarOriRDD = sc.newAPIHadoopFile(path,
  classOf[NetCDFFileInputFormat].asSubclass(classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[WRFIndex, WRFVariable]]),
  classOf[WRFIndex], classOf[WRFVariable], jobConf)

The compiler does not complain. Please let me know if this solution is not good enough. Patcharee

On 25. feb. 2015 10:57, Sean Owen wrote: OK, from the declaration you sent me separately:

public class NetCDFFileInputFormat extends ArrayBasedFileInputFormat
public abstract class ArrayBasedFileInputFormat extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat

It looks like you do not declare any generic types that FileInputFormat declares for the key and value type. I think you can get away with this in the Java API with warnings, but scalac is correct that you have not given an InputFormat that matches the bounds required by the API. That is you need to extend something like ArrayBasedFileInputFormat<WRFIndex, WRFVariable>.

On Wed, Feb 25, 2015 at 9:15 AM, patcharee patcharee.thong...@uni.no wrote: Hi, I am new to spark and scala. I have a custom inputformat (used before with mapreduce) and I am trying to use it in spark. In java api (the syntax is correct):

JavaPairRDD<WRFIndex, WRFVariable> pairVarOriRDD = sc.newAPIHadoopFile(
  path, NetCDFFileInputFormat.class, WRFIndex.class, WRFVariable.class, jobConf);

But in scala:

val pairVarOriRDD = sc.newAPIHadoopFile(path, classOf[NetCDFFileInputFormat], classOf[WRFIndex], classOf[WRFVariable], jobConf)

The compiler complained: inferred type arguments [no.uni.computing.io.WRFIndex,no.uni.computing.io.WRFVariable,no.uni.computing.io.input.NetCDFFileInputFormat] do not conform to method newAPIHadoopFile's type parameter bounds [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]]

What is the correct syntax for scala api? Best, Patcharee
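For anyone hitting the same bound mismatch: the key point from Sean's reply is that the InputFormat class itself must carry the key/value type parameters. A minimal sketch in Scala, with WRFIndex/WRFVariable as stand-ins for the real classes from the original post (the actual record reader is omitted):

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}

class WRFIndex extends Serializable     // placeholder for the real key class
class WRFVariable extends Serializable  // placeholder for the real value class

// Declaring the generics here is what lets scalac satisfy F <: InputFormat[K, V].
class NetCDFFileInputFormat extends FileInputFormat[WRFIndex, WRFVariable] {
  override def createRecordReader(split: InputSplit, context: TaskAttemptContext)
      : RecordReader[WRFIndex, WRFVariable] =
    throw new UnsupportedOperationException("record reader omitted in this sketch")
}

// With the bounds satisfied, the plain call compiles without the asSubclass workaround:
// val rdd = sc.newAPIHadoopFile(path, classOf[NetCDFFileInputFormat],
//   classOf[WRFIndex], classOf[WRFVariable], jobConf)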
Re: Running multiple threads with same Spark Context
I am not sure if your issue is setting the FAIR mode correctly or something else, so let's start with the FAIR mode. Do you see the scheduler mode actually being set to FAIR? I have this line in spark-defaults.conf:

spark.scheduler.allocation.file=/spark/conf/fairscheduler.xml

Then, when I start my application, I can see that it is using that scheduler in the UI -- go to the master UI and click on your application; the scheduling mode should be shown as FAIR.

On Wed, Feb 25, 2015 at 4:06 AM, Harika Matha matha.har...@gmail.com wrote: Hi Yana, I tried running the program after setting the property spark.scheduler.mode to FAIR. But the result is the same as before. Are there any other properties that have to be set?

On Tue, Feb 24, 2015 at 10:26 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: It's hard to tell. I have not run this on EC2 but this worked for me. The only thing that I can think of is that the scheduling mode is set to Scheduling Mode: FAIR.

val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)
// while loop to get curr_job
pool.execute(new ReportJob(sqlContext, curr_job, i))

class ReportJob(sqlContext: org.apache.spark.sql.hive.HiveContext, query: String, id: Int) extends Runnable with Logging {
  def threadId = (Thread.currentThread.getName() + "\t")
  def run() {
    logInfo(s"* Running ${threadId} ${id}")
    val startTime = Platform.currentTime
    val hiveQuery = query
    val result_set = sqlContext.sql(hiveQuery)
    result_set.repartition(1)
    result_set.saveAsParquetFile(s"hdfs:///tmp/${id}")
    logInfo(s"* DONE ${threadId} ${id} time: " + (Platform.currentTime - startTime))
  }
}

On Tue, Feb 24, 2015 at 4:04 AM, Harika matha.har...@gmail.com wrote: Hi all, I have been running a simple SQL program on Spark. To test the concurrency, I have created 10 threads inside the program, all threads using the same SQLContext object. When I ran the program on my EC2 cluster using spark-submit, only 3 threads were running in parallel. I have repeated the test on different EC2 clusters (containing different numbers of cores) and found that only 3 threads run in parallel on every cluster. Why is this behaviour seen? What does this number 3 specify? Is there any configuration parameter that I have to set if I want to run more threads concurrently? Thanks Harika
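For completeness, a minimal FAIR-scheduler setup along the lines Yana describes; the pool name and file path below are placeholders, not taken from her configuration.

spark-defaults.conf:

spark.scheduler.mode FAIR
spark.scheduler.allocation.file /spark/conf/fairscheduler.xml

/spark/conf/fairscheduler.xml:

<?xml version="1.0"?>
<allocations>
  <pool name="reports">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>

Threads submitting jobs can then pick a pool with sc.setLocalProperty("spark.scheduler.pool", "reports"). Note that FAIR scheduling only governs how jobs inside one application share that application's resources; if the application itself was only granted a few cores by the cluster, that, rather than the scheduler mode, caps how many tasks (and hence concurrently progressing jobs) can run.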
Spark NullPointerException
Hi all, I am trying to run a Spark Java application on EMR, but I keep getting NullPointerException from the Application master (spark version on EMR: 1.2). The stacktrace is below. I also tried to run the application on Hortonworks Sandbox (2.2) with spark 1.2, following the blogpost (http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/) from Hortonworks, but that failed too. Same exception. I try to run over YARN (master: yarn-cluster). Tried to run the hortonworks sample application on the virtual machine, but that failed with the very same exception. I also tried to set spark home in SparkConf, same exception. What am I missing? The stacktrace and the log: 15/02/25 11:38:41 INFO SecurityManager: Changing view acls to: root 15/02/25 11:38:41 INFO SecurityManager: Changing modify acls to: root 15/02/25 11:38:41 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root) 15/02/25 11:38:42 INFO Slf4jLogger: Slf4jLogger started 15/02/25 11:38:42 INFO Remoting: Starting remoting 15/02/25 11:38:42 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@sandbox.hortonworks.com:53937] 15/02/25 11:38:42 INFO Utils: Successfully started service 'sparkDriver' on port 53937. 15/02/25 11:38:42 INFO SparkEnv: Registering MapOutputTracker 15/02/25 11:38:42 INFO SparkEnv: Registering BlockManagerMaster 15/02/25 11:38:42 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150225113842-788f 15/02/25 11:38:42 INFO MemoryStore: MemoryStore started with capacity 265.4 MB 15/02/25 11:38:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/02/25 11:38:42 INFO HttpFileServer: HTTP File server directory is /tmp/spark-973069b3-aafd-4f1d-b18c-9e0a5d0efcaa 15/02/25 11:38:42 INFO HttpServer: Starting HTTP Server 15/02/25 11:38:43 INFO Utils: Successfully started service 'HTTP file server' on port 39199. 15/02/25 11:38:43 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
15/02/25 11:38:43 INFO SparkUI: Started SparkUI at http://sandbox.hortonworks.com:4040 15/02/25 11:38:43 INFO SparkContext: Added JAR file:/root/logprocessor-1.0-SNAPSHOT-jar-with-dependencies.jar at http://192.168.100.37:39199/jars/logprocessor-1.0-SNAPSHOT-jar-with-dependencies.jar with timestamp 1424864323482 15/02/25 11:38:43 INFO YarnClusterScheduler: Created YarnClusterScheduler

Exception in thread "main" java.lang.NullPointerException at org.apache.spark.deploy.yarn.ApplicationMaster$.getAttempId(ApplicationMaster.scala:524) at org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend.start(YarnClusterSchedulerBackend.scala:34) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140) at org.apache.spark.SparkContext.init(SparkContext.scala:337) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:75) at hu.enbritely.logprocessor.Logprocessor.main(Logprocessor.java:43) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:360) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:76) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

One of the programs I try to run:

public static void main(String[] argv) {
  SparkConf conf = new SparkConf();
  JavaSparkContext spark = new JavaSparkContext("yarn-cluster", "Spark logprocessing", conf);
  JavaRDD<String> file = spark.textFile("hdfs://spark-output");
  file.saveAsTextFile("hdfs://output");
  spark.stop();
}

Thank you for your assistance! Mate Gulyas
Re: Broadcast Variable updated from one transformation and used from another
What I think is happening is that the map operations are executed concurrently and the map operation in rdd2 has the initial copy of myObjectBroadcasted. Is there a way to apply the transformations sequentially? First materialize rdd1 and then rdd2. Thanks a lot!

On 24 February 2015 at 18:49, Yiannis Gkoufas johngou...@gmail.com wrote: Sorry for the mistake, I actually have it this way:

val myObject = new MyObject();
val myObjectBroadcasted = sc.broadcast(myObject);
val rdd1 = sc.textFile("/file1").map(e => {
  myObjectBroadcasted.value.insert(e._1);
  (e._1, 1)
});
rdd.cache.count(); // to make sure it is transformed
val rdd2 = sc.textFile("/file2").map(e => {
  val lookedUp = myObjectBroadcasted.value.lookup(e._1);
  (e._1, lookedUp)
});

On 24 February 2015 at 17:36, Ganelin, Ilya ilya.gane...@capitalone.com wrote: You're not using the broadcasted variable within your map operations. You're attempting to modify myObject directly, which won't work because you are modifying the serialized copy on the executor. You want to do myObjectBroadcasted.value.insert and myObjectBroadcasted.value.lookup. Sent with Good (www.good.com)

-Original Message- From: Yiannis Gkoufas [johngou...@gmail.com] Sent: Tuesday, February 24, 2015 12:12 PM Eastern Standard Time To: user@spark.apache.org Subject: Broadcast Variable updated from one transformation and used from another

Hi all, I am trying to do the following.

val myObject = new MyObject();
val myObjectBroadcasted = sc.broadcast(myObject);
val rdd1 = sc.textFile("/file1").map(e => {
  myObject.insert(e._1);
  (e._1, 1)
});
rdd.cache.count(); // to make sure it is transformed
val rdd2 = sc.textFile("/file2").map(e => {
  val lookedUp = myObject.lookup(e._1);
  (e._1, lookedUp)
});

When I check the contents of myObject within the map of rdd1 everything seems ok. On the other hand, when I check the contents of myObject within the map of rdd2 it seems to be empty. Am I doing something wrong? Thanks a lot!
Re: SparkStreaming failing with exception Could not compute split, block input
My application runs fine for ~3/4 hours and then hits this issue. On Wed, Feb 25, 2015 at 11:34 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hi Experts, My Spark Job is failing with below error. From the logs I can see that input-3-1424842351600 was added at 5:32:32 and was never purged out of memory. Also the available free memory for the executor is *2.1G*. Please help me figure out why executors cannot fetch this input. Txz for any help, Cheers. *Logs* 15/02/25 05:32:32 INFO storage.BlockManagerInfo: Added input-3-1424842351600 in memory on chsnmphbase31.usdc2.oraclecloud.com:50208 (size: 276.1 KB, free: 2.1 GB) . . 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added input-1-1424842362600 in memory on chsnmphbase30.usdc2.cloud.com:35919 (size: 232.3 KB, free: 2.1 GB) 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added input-4-1424842363000 in memory on chsnmphbase23.usdc2.cloud.com:37751 (size: 291.4 KB, free: 2.1 GB) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 32.1 in stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 37.1 in stage 451.0 (TID 22512, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 31.1 in stage 451.0 (TID 22513, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 34.1 in stage 451.0 (TID 22514, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 36.1 in stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 39.1 in stage 451.0 (TID 22516, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 30.1 in stage 451.0 (TID 22517, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 33.1 in stage 451.0 (TID 22518, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 35.1 in stage 451.0 (TID 22519, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 38.1 in stage 451.0 (TID 22520, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 32.1 in stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com): java.lang.Exception: Could not compute split, block input-3-1424842351600 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 36.1 in stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com): java.lang.Exception: Could not compute split, block input-3-1424842355600 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) -- Thanks Regards, *Mukesh Jha me.mukesh@gmail.com* -- Thanks Regards, *Mukesh Jha me.mukesh@gmail.com*
Number of Executors per worker process
Hello! I've read the documentation about the Spark architecture, and I have the following questions:

1. How many executors can there be in a single worker process (JVM)?
2. Should I think of an executor like a Java thread pool executor, where the pool size equals the number of cores given to it (set up by SPARK_WORKER_CORES)?
3. If a worker can have many executors, how is this handled by Spark? Or can I set up the number of executors per JVM myself?

I look forward to your answers. Regards, Florin
Re: throughput in the web console?
Let me ask like this, what would be the easiest way to display the throughput in the web console? Would I need to create a new tab and add the metrics? Any good or simple examples showing how this can be done? On Wed, Feb 25, 2015 at 12:07 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you have a look at https://spark.apache.org/docs/1.0.2/api/scala/index.html#org.apache.spark.scheduler.SparkListener And for Streaming: https://spark.apache.org/docs/1.0.2/api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener Thanks Best Regards On Tue, Feb 24, 2015 at 10:29 PM, Josh J joshjd...@gmail.com wrote: Hi, I plan to run a parameter search varying the number of cores, epoch, and parallelism. The web console provides a way to archive the previous runs, though is there a way to view in the console the throughput? Rather than logging the throughput separately to the log files and correlating the logs files to the web console processing times? Thanks, Josh
Re: throughput in the web console?
For SparkStreaming applications, there is already a tab called Streaming which displays the basic statistics. Thanks Best Regards On Wed, Feb 25, 2015 at 8:55 PM, Josh J joshjd...@gmail.com wrote: Let me ask like this, what would be the easiest way to display the throughput in the web console? Would I need to create a new tab and add the metrics? Any good or simple examples showing how this can be done? On Wed, Feb 25, 2015 at 12:07 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you have a look at https://spark.apache.org/docs/1.0.2/api/scala/index.html#org.apache.spark.scheduler.SparkListener And for Streaming: https://spark.apache.org/docs/1.0.2/api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener Thanks Best Regards On Tue, Feb 24, 2015 at 10:29 PM, Josh J joshjd...@gmail.com wrote: Hi, I plan to run a parameter search varying the number of cores, epoch, and parallelism. The web console provides a way to archive the previous runs, though is there a way to view in the console the throughput? Rather than logging the throughput separately to the log files and correlating the logs files to the web console processing times? Thanks, Josh
Re: throughput in the web console?
On Wed, Feb 25, 2015 at 7:54 AM, Akhil Das ak...@sigmoidanalytics.com wrote: For SparkStreaming applications, there is already a tab called Streaming which displays the basic statistics. Would I just need to extend this tab to add the throughput?
Number of parallel tasks
I have Spark running in standalone mode with 4 executors, and each executor with 5 cores each (spark.executor.cores=5). However, when I'm processing an RDD with ~90,000 partitions, I only get 4 parallel tasks. Shouldn't I be getting 4x5=20 parallel task executions?
Re: Broadcast Variable updated from one transformation and used from another
Hi Yiannis, Broadcast variables are meant for *immutable* data. They are not meant for data structures that you intend to update. (It might *happen* to work when running in local mode, though I doubt it, and it would probably be a bug if it did. It will certainly not work when running on a cluster.) This probably seems like a huge restriction, but it's really fundamental to Spark's execution model. Because they are immutable, Spark can make optimizations around when and how the broadcast variable is shared. Furthermore, it's very important for having clearly defined semantics. E.g., imagine that your broadcast variable was a hashmap. What would the eventual result be if task 1 updated key X to have value A, but task 2 updated key X to have value B? How should the updates from each task be combined together?

You have a few alternatives. It really depends a lot on your use case which one is right; there are a lot of factors to consider.

1) Put your updates in another RDD, collect() it, update your variable on the driver, and rebroadcast it. (least scalable)
2) Use an accumulator to get the updates from each stage. (maybe a bit more efficient)
3) Use some completely different mechanism for storing the data in your broadcast var. E.g., use a distributed key-value store. Or put the data in another RDD, which you join against your data. (most scalable, but may not be applicable at all)

Which one is right depends a lot on what you are trying to do. Imran

On Wed, Feb 25, 2015 at 8:02 AM, Yiannis Gkoufas johngou...@gmail.com wrote: What I think is happening is that the map operations are executed concurrently and the map operation in rdd2 has the initial copy of myObjectBroadcasted. Is there a way to apply the transformations sequentially? First materialize rdd1 and then rdd2. Thanks a lot!

On 24 February 2015 at 18:49, Yiannis Gkoufas johngou...@gmail.com wrote: Sorry for the mistake, I actually have it this way:

val myObject = new MyObject();
val myObjectBroadcasted = sc.broadcast(myObject);
val rdd1 = sc.textFile("/file1").map(e => {
  myObjectBroadcasted.value.insert(e._1);
  (e._1, 1)
});
rdd.cache.count(); // to make sure it is transformed
val rdd2 = sc.textFile("/file2").map(e => {
  val lookedUp = myObjectBroadcasted.value.lookup(e._1);
  (e._1, lookedUp)
});

On 24 February 2015 at 17:36, Ganelin, Ilya ilya.gane...@capitalone.com wrote: You're not using the broadcasted variable within your map operations. You're attempting to modify myObject directly, which won't work because you are modifying the serialized copy on the executor. You want to do myObjectBroadcasted.value.insert and myObjectBroadcasted.value.lookup. Sent with Good (www.good.com)

-Original Message- From: Yiannis Gkoufas [johngou...@gmail.com] Sent: Tuesday, February 24, 2015 12:12 PM Eastern Standard Time To: user@spark.apache.org Subject: Broadcast Variable updated from one transformation and used from another

Hi all, I am trying to do the following.

val myObject = new MyObject();
val myObjectBroadcasted = sc.broadcast(myObject);
val rdd1 = sc.textFile("/file1").map(e => {
  myObject.insert(e._1);
  (e._1, 1)
});
rdd.cache.count(); // to make sure it is transformed
val rdd2 = sc.textFile("/file2").map(e => {
  val lookedUp = myObject.lookup(e._1);
  (e._1, lookedUp)
});

When I check the contents of myObject within the map of rdd1 everything seems ok. On the other hand, when I check the contents of myObject within the map of rdd2 it seems to be empty. Am I doing something wrong? Thanks a lot!
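To make alternative (3) above concrete, here is a rough sketch of expressing the lookup as a join instead of mutating a broadcast variable; the key handling is illustrative and not taken from Yiannis's actual MyObject:

import org.apache.spark.SparkContext._   // pair-RDD functions (needed outside the shell on Spark 1.2)
import org.apache.spark.rdd.RDD

// Build the "inserted" data as a keyed RDD instead of inserting into a shared object.
val inserted: RDD[(String, Int)] = sc.textFile("/file1").map(line => (line, 1))

// Look values up by joining; leftOuterJoin keeps every /file2 record and attaches
// the value when the key was also present in /file1.
val rdd2: RDD[(String, Option[Int])] =
  sc.textFile("/file2").map(line => (line, ())).leftOuterJoin(inserted).mapValues(_._2)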
Spark Standard Application to Test
Hello, I am preparing some tests to execute in Spark in order to manipulate properties and check the variations in results. For this, I need to use a standard application in my environment, like the well-known Hadoop apps: TeraSort (https://hadoop.apache.org/docs/current/api/org/apache/hadoop/examples/terasort/package-summary.html) and especially Terrier (http://terrier.org/), or something similar. I do not need applications like wordcount and grep because I have already used them. Can anyone suggest something? Thanks!
Re: throughput in the web console?
By throughput, do you mean number of events processed, etc.? The Streaming tab already has these statistics. Thanks Best Regards

On Wed, Feb 25, 2015 at 9:59 PM, Josh J joshjd...@gmail.com wrote: On Wed, Feb 25, 2015 at 7:54 AM, Akhil Das ak...@sigmoidanalytics.com wrote: For SparkStreaming applications, there is already a tab called Streaming which displays the basic statistics. Would I just need to extend this tab to add the throughput?
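If the Streaming tab does not break the numbers down the way the parameter search needs, one low-tech alternative to extending the UI is to measure throughput in the application itself. A rough sketch, assuming `stream` is an existing DStream in the job:

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.streaming.Time

// Count records per batch on the driver and log them, so throughput per configuration
// can be pulled straight out of the driver log (or pushed to any metrics backend).
val totalRecords = new AtomicLong(0L)

stream.foreachRDD { (rdd, time: Time) =>
  val batchCount = rdd.count()
  val runningTotal = totalRecords.addAndGet(batchCount)
  println(s"$time: $batchCount records this batch, $runningTotal since start")
}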
NegativeArraySizeException when doing joins on skewed data
I have been running into NegativeArraySizeException's when doing joins on data with very skewed key distributions in Spark 1.2.0. I found a previous post that mentioned that this exception arises when the size of the blocks spilled during the shuffle exceeds 2GB. The post recommended increasing the number of partitions. I tried increasing the number of partitions, and using the RangePartitioner instead of the HashPartitioner but still encountered the problem. Does Spark support skewed joins similar to Pig? com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException Serialization trace: otherElements (org.apache.spark.util.collection.CompactBuffer) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195) at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:176) at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:63) at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:87) at org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:63) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NegativeArraySizeException at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409) at
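As far as I know, Spark 1.2 has no built-in skewed-join support comparable to Pig's; a common workaround is to salt the hot keys so that a single key's rows no longer land in one huge co-grouped buffer. A rough sketch of the idea, with the salt factor and String keys purely illustrative:

import scala.reflect.ClassTag
import scala.util.Random
import org.apache.spark.SparkContext._   // pair-RDD functions (needed outside the shell on Spark 1.2)
import org.apache.spark.rdd.RDD

// left is the skewed side; every right row is replicated `salt` times so all pairs still meet.
def saltedJoin[V: ClassTag, W: ClassTag](left: RDD[(String, V)],
                                         right: RDD[(String, W)],
                                         salt: Int): RDD[(String, (V, W))] = {
  val saltedLeft  = left.map { case (k, v) => ((k, Random.nextInt(salt)), v) }
  val saltedRight = right.flatMap { case (k, w) => (0 until salt).map(i => ((k, i), w)) }
  saltedLeft.join(saltedRight).map { case ((k, _), vw) => (k, vw) }
}

In practice the salting can be limited to the handful of keys known to be hot, joining the remaining keys normally, which keeps the replication cost down.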
Re: Large Similarity Job failing
Is the threshold valid only for tall skinny matrices ? Mine is 6 m x 1.5 m and I made sparsity pattern 100:1.5M..we would like to increase the sparsity pattern to 1000:1.5M I am running 1.1 stable and I get random shuffle failures...may be 1.2 sort shuffle will help.. I read in Reza paper that oversample works only if cols are skinny so I am not very keen to oversample... On Feb 17, 2015 2:01 PM, Xiangrui Meng men...@gmail.com wrote: The complexity of DIMSUM is independent of the number of rows but still have quadratic dependency on the number of columns. 1.5M columns may be too large to use DIMSUM. Try to increase the threshold and see whether it helps. -Xiangrui On Tue, Feb 17, 2015 at 6:28 AM, Debasish Das debasish.da...@gmail.com wrote: Hi, I am running brute force similarity from RowMatrix on a job with 5M x 1.5M sparse matrix with 800M entries. With 200M entries the job run fine but with 800M I am getting exceptions like too many files open and no space left on device... Seems like I need more nodes or use dimsum sampling ? I am running on 10 nodes where ulimit on each node is set at 65K...Memory is not an issue since I can cache the dataset before similarity computation starts. I tested the same job on YARN with Spark 1.1 and Spark 1.2 stable. Both the jobs failed with FetchFailed msgs. Thanks. Deb
NullPointerException in ApplicationMaster
Hi all, I am trying to run a Spark Java application on EMR, but I keep getting NullPointerException from the Application master (spark version on EMR: 1.2). The stacktrace is below. I also tried to run the application on Hortonworks Sandbox (2.2) with spark 1.2, following the blogpost (http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/) from Hortonworks, but that failed too. Same exception. I try to run over YARN (master: yarn-cluster). Tried to run the hortonworks sample application on the virtual machine, but that failed with the very same exception. I also tried to set spark home in SparkConf, same exception. What am I missing? The stacktrace and the log: 15/02/25 11:38:41 INFO SecurityManager: Changing view acls to: root 15/02/25 11:38:41 INFO SecurityManager: Changing modify acls to: root 15/02/25 11:38:41 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root) 15/02/25 11:38:42 INFO Slf4jLogger: Slf4jLogger started 15/02/25 11:38:42 INFO Remoting: Starting remoting 15/02/25 11:38:42 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@sandbox.hortonworks.com:53937] 15/02/25 11:38:42 INFO Utils: Successfully started service 'sparkDriver' on port 53937. 15/02/25 11:38:42 INFO SparkEnv: Registering MapOutputTracker 15/02/25 11:38:42 INFO SparkEnv: Registering BlockManagerMaster 15/02/25 11:38:42 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150225113842-788f 15/02/25 11:38:42 INFO MemoryStore: MemoryStore started with capacity 265.4 MB 15/02/25 11:38:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/02/25 11:38:42 INFO HttpFileServer: HTTP File server directory is /tmp/spark-973069b3-aafd-4f1d-b18c-9e0a5d0efcaa 15/02/25 11:38:42 INFO HttpServer: Starting HTTP Server 15/02/25 11:38:43 INFO Utils: Successfully started service 'HTTP file server' on port 39199. 15/02/25 11:38:43 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
15/02/25 11:38:43 INFO SparkUI: Started SparkUI at http://sandbox.hortonworks.com:4040 15/02/25 11:38:43 INFO SparkContext: Added JAR file:/root/logprocessor-1.0-SNAPSHOT-jar-with-dependencies.jar at http://192.168.100.37:39199/jars/logprocessor-1.0-SNAPSHOT-jar-with-dependencies.jar with timestamp 1424864323482 15/02/25 11:38:43 INFO YarnClusterScheduler: Created YarnClusterScheduler

Exception in thread "main" java.lang.NullPointerException at org.apache.spark.deploy.yarn.ApplicationMaster$.getAttempId(ApplicationMaster.scala:524) at org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend.start(YarnClusterSchedulerBackend.scala:34) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140) at org.apache.spark.SparkContext.init(SparkContext.scala:337) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:75) at hu.enbritely.logprocessor.Logprocessor.main(Logprocessor.java:43) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:360) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:76) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

One of the programs I try to run:

public static void main(String[] argv) {
  SparkConf conf = new SparkConf();
  JavaSparkContext spark = new JavaSparkContext("yarn-cluster", "Spark logprocessing", conf);
  JavaRDD<String> file = spark.textFile("hdfs://spark-output");
  file.saveAsTextFile("hdfs://output");
  spark.stop();
}

Thank you for your assistance! Mate Gulyas
Re: Help vote for Spark talks at the Hadoop Summit
Hi all Here is another Spark talk (a vendor-independent one!) that you might have missed: 'The Future of Apache Hadoop' track: How Spark and Flink are shaping the future of Hadoop? https://hadoopsummit.uservoice.com/forums/283266-the-future-of-apache-hadoop/suggestions/7074410 Regards, Slim Baltagi http://www.SparkBigData.com On Feb 25, 2015, at 12:33 PM, Xiangrui Meng men...@gmail.com wrote: Made 3 votes to each of the talks. Looking forward to see them in Hadoop Summit:) -Xiangrui On Tue, Feb 24, 2015 at 9:54 PM, Reynold Xin r...@databricks.com wrote: Hi all, The Hadoop Summit uses community choice voting to decide which talks to feature. It would be great if the community could help vote for Spark talks so that Spark has a good showing at this event. You can make three votes on each track. Below I've listed 3 talks that are important to Spark's roadmap. Please give 3 votes to each of the following talks. Committer Track: Lessons from Running Ultra Large Scale Spark Workloads on Hadoop https://hadoopsummit.uservoice.com/forums/283260-committer-track/suggestions/7074016 Data Science track: DataFrames: large-scale data science on Hadoop data with Spark https://hadoopsummit.uservoice.com/forums/283261-data-science-and-hadoop/suggestions/7074147 Future of Hadoop track: Online Approximate OLAP in SparkSQL https://hadoopsummit.uservoice.com/forums/283266-the-future-of-apache-hadoop/suggestions/7074424 Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark-SQL 1.2.0 sort by results are not consistent with Hive
Cheng, We tried this setting and it still did not help. This was on Spark 1.2.0. -- Kannan

On Mon, Feb 23, 2015 at 6:38 PM, Cheng Lian lian.cs@gmail.com wrote: (Move to user list.) Hi Kannan, You need to set mapred.map.tasks to 1 in hive-site.xml. The reason is this line of code https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala#L68, which overrides spark.default.parallelism. Also, spark.sql.shuffle.partitions isn't used here since there's no shuffle involved (we only need to sort within a partition). The default value of mapred.map.tasks is 2 (https://hadoop.apache.org/docs/r1.0.4/mapred-default.html). You may see that the Spark SQL result can be divided into two sorted parts from the middle. Cheng

On 2/19/15 10:33 AM, Kannan Rajah wrote: According to the Hive documentation, sort by is supposed to order the results for each reducer. So if we set a single reducer, then the results should be sorted, right? But this is not happening. Any idea why? Looks like the settings I am using to restrict the number of reducers are not having an effect.

Tried the following:
Set spark.default.parallelism to 1
Set spark.sql.shuffle.partitions to 1
These were set in hive-site.xml and also inside the spark shell.

Spark-SQL:
create table if not exists testSortBy (key int, name string, age int);
LOAD DATA LOCAL INPATH '/home/mapr/sample-name-age.txt' OVERWRITE INTO TABLE testSortBy;
select * from testSortBy;
1  Aditya     28
2  aash       25
3  prashanth  27
4  bharath    26
5  terry      27
6  nanda      26
7  pradeep    27
8  pratyay    26
set spark.default.parallelism=1;
set spark.sql.shuffle.partitions=1;
select name,age from testSortBy sort by age;
aash       25
bharath    26
prashanth  27
Aditya     28
nanda      26
pratyay    26
terry      27
pradeep    27

HIVE:
select name,age from testSortBy sort by age;
aash       25
bharath    26
nanda      26
pratyay    26
prashanth  27
terry      27
pradeep    27
Aditya     28

-- Kannan
Re: Help vote for Spark talks at the Hadoop Summit
Made 3 votes to each of the talks. Looking forward to see them in Hadoop Summit:) -Xiangrui On Tue, Feb 24, 2015 at 9:54 PM, Reynold Xin r...@databricks.com wrote: Hi all, The Hadoop Summit uses community choice voting to decide which talks to feature. It would be great if the community could help vote for Spark talks so that Spark has a good showing at this event. You can make three votes on each track. Below I've listed 3 talks that are important to Spark's roadmap. Please give 3 votes to each of the following talks. Committer Track: Lessons from Running Ultra Large Scale Spark Workloads on Hadoop https://hadoopsummit.uservoice.com/forums/283260-committer-track/suggestions/7074016 Data Science track: DataFrames: large-scale data science on Hadoop data with Spark https://hadoopsummit.uservoice.com/forums/283261-data-science-and-hadoop/suggestions/7074147 Future of Hadoop track: Online Approximate OLAP in SparkSQL https://hadoopsummit.uservoice.com/forums/283266-the-future-of-apache-hadoop/suggestions/7074424 Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Hamburg Apache Spark Meetup
Please add the Zagreb Meetup group, too. http://www.meetup.com/Apache-Spark-Zagreb-Meetup/ Thanks! On 18.2.2015. 19:46, Johan Beisser wrote: If you could also add the Hamburg Apache Spark Meetup, I'd appreciate it. http://www.meetup.com/Hamburg-Apache-Spark-Meetup/ On Tue, Feb 17, 2015 at 5:08 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Thanks! I've added you. Matei On Feb 17, 2015, at 4:06 PM, Ralph Bergmann | the4thFloor.eu ra...@the4thfloor.eu wrote: Hi, there is a small Spark Meetup group in Berlin, Germany :-) http://www.meetup.com/Berlin-Apache-Spark-Meetup/ Please add this group to the Meetups list at https://spark.apache.org/community.html Ralph
Re: Unable to run hive queries inside spark
It looks like that is getting interpreted as a local path. Are you missing a core-site.xml file to configure hdfs?

On Tue, Feb 24, 2015 at 10:40 PM, kundan kumar iitr.kun...@gmail.com wrote: Hi Denny, yes the user has all the rights to HDFS. I am running all the spark operations with this user, and my hive-site.xml looks like this:

<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>/user/hive/warehouse</value>
  <description>location of default database for the warehouse</description>
</property>

Do I need to do anything explicitly other than placing hive-site.xml in the spark conf directory? Thanks !!

On Wed, Feb 25, 2015 at 11:42 AM, Denny Lee denny.g@gmail.com wrote: The error message you have is: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:file:/user/hive/warehouse/src is not a directory or unable to create one) Could you verify that you (the user you are running under) have the rights to create the necessary folders within HDFS?

On Tue, Feb 24, 2015 at 9:06 PM kundan kumar iitr.kun...@gmail.com wrote: Hi, I have placed my hive-site.xml inside spark/conf and I am trying to execute some hive queries given in the documentation. Can you please suggest what I am doing wrong here.

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@3340a4b8

scala> hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
warning: there were 1 deprecation warning(s); re-run with -deprecation for details

15/02/25 10:30:59 INFO ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING) 15/02/25 10:30:59 INFO ParseDriver: Parse Completed 15/02/25 10:30:59 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/02/25 10:30:59 INFO ObjectStore: ObjectStore, initialize called 15/02/25 10:30:59 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/02/25 10:30:59 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 15/02/25 10:30:59 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/02/25 10:30:59 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/02/25 10:31:08 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order 15/02/25 10:31:08 INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: @ (64), after : . 15/02/25 10:31:09 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 15/02/25 10:31:09 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 15/02/25 10:31:15 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 15/02/25 10:31:15 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 15/02/25 10:31:17 INFO ObjectStore: Initialized ObjectStore 15/02/25 10:31:17 WARN ObjectStore: Version information not found in metastore.
hive.metastore.schema.verification is not enabled so recording the schema version 0.13.1aa 15/02/25 10:31:18 INFO HiveMetaStore: Added admin role in metastore 15/02/25 10:31:18 INFO HiveMetaStore: Added public role in metastore 15/02/25 10:31:18 INFO HiveMetaStore: No user is added in admin role, since config is empty 15/02/25 10:31:18 INFO SessionState: No Tez session required at this point. hive.execution.engine=mr. 15/02/25 10:31:18 INFO PerfLogger: PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver 15/02/25 10:31:18 INFO PerfLogger: PERFLOG method=TimeToSubmit from=org.apache.hadoop.hive.ql.Driver 15/02/25 10:31:18 INFO Driver: Concurrency mode is disabled, not creating a lock manager 15/02/25 10:31:18 INFO PerfLogger: PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver 15/02/25 10:31:18 INFO PerfLogger: PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver 15/02/25 10:31:18 INFO ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING) 15/02/25 10:31:18 INFO ParseDriver: Parse Completed 15/02/25 10:31:18 INFO PerfLogger: /PERFLOG method=parse start=1424840478985 end=1424840478986 duration=1 from=org.apache.hadoop.hive.ql.Driver 15/02/25 10:31:18 INFO PerfLogger: PERFLOG method=semanticAnalyze from=org.apache.hadoop.hive.ql.Driver 15/02/25 10:31:19 INFO SemanticAnalyzer: Starting Semantic
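For anyone hitting the same MetaException: the first reply above points at the warehouse path being resolved against the local filesystem (note the file:/user/hive/warehouse/src in the error). Making sure a core-site.xml that points at the cluster is on Spark's classpath is the usual fix; a sketch, with the namenode host and port as placeholders:

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://namenode-host:8020</value>
  </property>
</configuration>

Alternatively, hive.metastore.warehouse.dir can be given as a fully qualified hdfs:// URI in hive-site.xml.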
Re: throughput in the web console?
Hi Josh, SPM will show you this info. I see you use Kafka, too, whose numerous metrics you can also see in SPM side by side with your Spark metrics. Sounds like trends is what you are after, so I hope this helps. See http://sematext.com/spm Otis On Feb 24, 2015, at 11:59, Josh J joshjd...@gmail.com wrote: Hi, I plan to run a parameter search varying the number of cores, epoch, and parallelism. The web console provides a way to archive the previous runs, though is there a way to view in the console the throughput? Rather than logging the throughput separately to the log files and correlating the logs files to the web console processing times? Thanks, Josh - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark excludes fastutil dependencies we need
Interesting. Looking at SparkConf.scala:

val configs = Seq(
  DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst", "1.3"),
  DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
    "Use spark.{driver,executor}.userClassPathFirst instead."))

It seems spark.files.userClassPathFirst and spark.yarn.user.classpath.first are deprecated.

On Wed, Feb 25, 2015 at 12:39 AM, Sean Owen so...@cloudera.com wrote: No, we should not add fastutil back. It's up to the app to bring dependencies it needs, and that's how I understand this issue. The question is really, how to get the classloader visibility right. It depends on where you need these classes. Have you looked into spark.files.userClassPathFirst and spark.yarn.user.classpath.first?

On Wed, Feb 25, 2015 at 5:34 AM, Ted Yu yuzhih...@gmail.com wrote: bq. depend on missing fastutil classes like Long2LongOpenHashMap Looks like Long2LongOpenHashMap should be added to the shaded jar. Cheers

On Tue, Feb 24, 2015 at 7:36 PM, Jim Kleckner j...@cloudphysics.com wrote: Spark includes the clearspring analytics package but intentionally excludes the dependencies of the fastutil package (see below). Spark includes parquet-column which includes fastutil and relocates it under parquet/ but creates a shaded jar file which is incomplete because it shades out some of the fastutil classes, notably Long2LongOpenHashMap, which is present in the fastutil jar file that parquet-column is referencing. We are using more of the clearspring classes (e.g. QDigest) and those do depend on missing fastutil classes like Long2LongOpenHashMap. Even though I add them to our assembly jar file, the class loader finds the spark assembly and we get runtime class loader errors when we try to use it. It is possible to put our jar file first, as described here: https://issues.apache.org/jira/browse/SPARK-939 http://spark.apache.org/docs/1.2.0/configuration.html#runtime-environment which I tried with args to spark-submit: --conf spark.driver.userClassPathFirst=true --conf spark.executor.userClassPathFirst=true but we still get the class not found error. We have tried copying the source code for clearspring into our package and renaming the package and that makes it appear to work... Is this risky? It certainly is ugly. Can anyone recommend a way to deal with this dependency hell?

=== The spark/pom.xml file contains the following lines:

<dependency>
  <groupId>com.clearspring.analytics</groupId>
  <artifactId>stream</artifactId>
  <version>2.7.0</version>
  <exclusions>
    <exclusion>
      <groupId>it.unimi.dsi</groupId>
      <artifactId>fastutil</artifactId>
    </exclusion>
  </exclusions>
</dependency>

=== The parquet-column/pom.xml file contains:

<artifactId>maven-shade-plugin</artifactId>
<executions>
  <execution>
    <phase>package</phase>
    <goals>
      <goal>shade</goal>
    </goals>
    <configuration>
      <minimizeJar>true</minimizeJar>
      <artifactSet>
        <includes>
          <include>it.unimi.dsi:fastutil</include>
        </includes>
      </artifactSet>
      <relocations>
        <relocation>
          <pattern>it.unimi.dsi</pattern>
          <shadedPattern>parquet.it.unimi.dsi</shadedPattern>
        </relocation>
      </relocations>
    </configuration>
  </execution>
</executions>
Re: Filter data from one RDD based on data from another RDD
Hello Imran, Thanks for your response. I noticed the intersection and subtract methods on an RDD -- do they work based on a hash of all the fields in an RDD record? - Himanish

On Thu, Feb 19, 2015 at 6:11 PM, Imran Rashid iras...@cloudera.com wrote: The more scalable alternative is to do a join (or a variant like cogroup, leftOuterJoin, subtractByKey etc. found in PairRDDFunctions). The downside is this requires a shuffle of both your RDDs.

On Thu, Feb 19, 2015 at 3:36 PM, Himanish Kushary himan...@gmail.com wrote: Hi, I have two RDDs with csv data as below:

RDD-1
101970_5854301840,fbcf5485-e696-4100-9468-a17ec7c5bb43,19229261643
101970_5854301839,fbaf5485-e696-4100-9468-a17ec7c5bb39,9229261645
101970_5854301839,fbbf5485-e696-4100-9468-a17ec7c5bb39,9229261647
101970_17038953,546853f9-cf07-4700-b202-00f21e7c56d8,791191603
101970_5854301840,fbcf5485-e696-4100-9468-a17ec7c5bb42,19229261643
101970_5851048323,218f5485-e58c-4200-a473-348ddb858578,290542385
101970_5854301839,fbcf5485-e696-4100-9468-a17ec7c5bb41,922926164

RDD-2
101970_17038953,546853f9-cf07-4700-b202-00f21e7c56d9,7911160
101970_5851048323,218f5485-e58c-4200-a473-348ddb858578,2954238
101970_5854301839,fbaf5485-e696-4100-9468-a17ec7c5bb39,9226164
101970_5854301839,fbbf5485-e696-4100-9468-a17ec7c5bb39,92292164
101970_5854301839,fbcf5485-e696-4100-9468-a17ec7c5bb41,9226164
101970_5854301838,fbcf5485-e696-4100-9468-a17ec7c5bb40,929164
101970_5854301838,fbcf5485-e696-4100-9468-a17ec7c5bb39,26164

I need to filter RDD-2 to include only those records where the first column value in RDD-2 matches any of the first column values in RDD-1. Currently, I am broadcasting the first column values from RDD-1 as a list and then filtering RDD-2 based on that list.

val rdd1broadcast = sc.broadcast(rdd1.map { uu => uu.split(",")(0) }.collect().toSet)
val rdd2filtered = rdd2.filter { h => rdd1broadcast.value.contains(h.split(",")(0)) }

This will result in data with first column 101970_5854301838 (last two records) being filtered out from RDD-2. Is this the best way to accomplish this? I am worried that for large data volumes, the broadcast step may become an issue. Appreciate any other suggestion. --- Thanks Himanish

-- Thanks Regards Himanish
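To make the join-based alternative concrete (intersection/subtract compare whole records, so they do not help when only the first column has to match), here is a rough sketch of the shuffle-based semi-join Imran describes, reusing rdd1/rdd2 from the original post:

import org.apache.spark.SparkContext._   // pair-RDD functions (needed outside the shell on Spark 1.2)

// Key both sides by the first CSV column; the join acts as the membership test,
// so nothing has to be collected to the driver or broadcast.
val keys1 = rdd1.map(_.split(",")(0)).distinct().map(k => (k, ()))
val rdd2filtered = rdd2
  .map(line => (line.split(",")(0), line))
  .join(keys1)
  .values
  .map { case (line, _) => line }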
Re: NullPointerException in ApplicationMaster
Look at the trace again. It is a very weird error. The SparkSubmit is running on the client side, but YarnClusterSchedulerBackend is supposed to be running in the YARN AM. I suspect you are running the cluster in yarn-client mode, but in the JavaSparkContext you set "yarn-cluster". As a result, the Spark context initiates YarnClusterSchedulerBackend instead of YarnClientSchedulerBackend, which I think is the root cause. Thanks. Zhan Zhang

On Feb 25, 2015, at 1:53 PM, Zhan Zhang zzh...@hortonworks.com wrote: Hi Mate, When you initialize the JavaSparkContext, you don't need to specify the mode "yarn-cluster". I suspect that is the root cause. Thanks. Zhan Zhang

On Feb 25, 2015, at 10:12 AM, gulyasm mgulya...@gmail.com wrote: JavaSparkContext.
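A sketch of the change being suggested: leave the master out of the application code and let spark-submit --master yarn-cluster (or yarn-client) supply it. Shown in Scala with placeholder paths; the equivalent fix in the Java program quoted earlier is constructing new JavaSparkContext(conf) without the "yarn-cluster" argument.

import org.apache.spark.{SparkConf, SparkContext}

// No master set here: it comes from spark-submit, so the matching
// scheduler backend is chosen for the actual deployment mode.
val conf = new SparkConf().setAppName("Spark logprocessing")
val sc = new SparkContext(conf)

val file = sc.textFile("hdfs:///input")    // placeholder paths
file.saveAsTextFile("hdfs:///output")
sc.stop()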
How to pass a org.apache.spark.rdd.RDD in a recursive function
Hello, I am trying to pass an org.apache.spark.rdd.RDD ("table") as a parameter to a recursive function. This table should be changed at each step of the recursion, and it cannot simply be a global var. Need help :) Thank you
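Since RDDs are immutable, "changing the table at each step" usually means each call returns a new RDD and the recursion continues on that result. A rough sketch of the shape, with the element type and the per-step transformation as placeholders:

import org.apache.spark.rdd.RDD

// Each step derives a new RDD from the previous one and recurses on it.
@annotation.tailrec
def iterate(table: RDD[Int], remainingSteps: Int): RDD[Int] = {
  if (remainingSteps <= 0) table
  else {
    val next = table.map(_ + 1)   // placeholder for the real per-step transformation
    // For deep recursions, occasionally cache/checkpoint `next` to keep the lineage short.
    iterate(next, remainingSteps - 1)
  }
}

// val result = iterate(initialTable, 10)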
Spark Streaming - Collecting RDDs into array in the driver program
I have this function in the driver program which collects the result from rdds (in a stream) into an array and return. However, even though the RDDs (in the dstream) have data, the function is returning an empty array...What am I doing wrong? I can print the RDD values inside the foreachRDD call but the array is always empty.

def runTopFunction(): Array[(String, Int)] = {
  val topSearches = ... // some function
  val summary = new ArrayBuffer[(String, Int)]()
  topSearches.foreachRDD(rdd => {
    summary = summary.++(rdd.collect())
  })
  return summary.toArray
}
RE: spark sql: join sql fails after sqlCtx.cacheTable()
Using HiveContext solved it.
Error when running the terasort branche in a cluster
Not sure if this is the place to ask, but I am using the terasort branch of Spark for benchmarking, as found on https://github.com/ehiggs/spark/tree/terasort, and I get the error below when running on two machines (one machine works just fine). When looking at the code, listed below the error message, I see while (read < TeraInputFormat.RECORD_LEN) { - Is it possible that this restricts the branch from running on a cluster? - Did anybody manage to run this branch on a cluster? Thanks, Tom

15/02/25 17:55:42 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, arlab152): org.apache.hadoop.fs.ChecksumException: Checksum error: file:/home/th/terasort_in/part-r-0 at 4872 exp: 1592400191 got: -1117747586 at org.apache.hadoop.fs.FSInputChecker.verifySums(FSInputChecker.java:322) at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:278) at org.apache.hadoop.fs.FSInputChecker.fill(FSInputChecker.java:213) at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:231) at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:195) at java.io.DataInputStream.read(DataInputStream.java:161) at org.apache.spark.examples.terasort.TeraInputFormat$TeraRecordReader.nextKeyValue(TeraInputFormat.scala:91)

Code:

override def nextKeyValue() : Boolean = {
  if (offset >= length) {
    return false
  }
  var read : Int = 0
  while (read < TeraInputFormat.RECORD_LEN) {
    var newRead : Int = in.read(buffer, read, TeraInputFormat.RECORD_LEN - read)
    if (newRead == -1) {
      if (read == 0) false
      else throw new EOFException("read past eof")
    }
    read += newRead
  }
  if (key == null) {
    key = new Array[Byte](TeraInputFormat.KEY_LEN)
  }
  if (value == null) {
    value = new Array[Byte](TeraInputFormat.VALUE_LEN)
  }
  buffer.copyToArray(key, 0, TeraInputFormat.KEY_LEN)
  buffer.takeRight(TeraInputFormat.VALUE_LEN).copyToArray(value, 0, TeraInputFormat.VALUE_LEN)
  offset += TeraInputFormat.RECORD_LEN
  true
}
spark standalone with multiple executors in one work node
Hello, Does spark standalone support running multiple executors in one worker node? It seems yarn has the parameter --num-executors to set number of executors to deploy, but I do not find the equivalent parameter in spark standalone. Thanks, Judy
Re: Spark Streaming - Collecting RDDs into array in the driver program
Hi, On Thu, Feb 26, 2015 at 11:24 AM, Thanigai Vellore thanigai.vell...@gmail.com wrote: It appears that the function immediately returns even before the foreachrdd stage is executed. Is that possible? Sure, that's exactly what happens. foreachRDD() schedules a computation, it does not perform it. Maybe your streaming application would not ever terminate, but still the function needs to return, right? If you remove the toArray(), you will return a reference to the ArrayBuffer that will be appended to over time. You can then, in a different thread, check the contents of that ArrayBuffer as processing happens, or wait until processing ends. Tobias
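As a small sketch of the pattern Tobias describes (names are illustrative, and the synchronization is an assumption to make cross-thread reads safe), the driver keeps the buffer and reads it from another thread while the streaming context runs:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.streaming.dstream.DStream

// foreachRDD only registers this action; results accumulate after ssc.start().
def collectTopSearches(topSearches: DStream[(String, Int)]): ArrayBuffer[(String, Int)] = {
  val summary = new ArrayBuffer[(String, Int)]()
  topSearches.foreachRDD { rdd =>
    summary.synchronized { summary ++= rdd.collect() }
  }
  summary  // the caller inspects this buffer as batches complete, not at call time
}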
Re: Spark Streaming - Collecting RDDs into array in the driver program
I didn't include the complete driver code but I do run the streaming context from the main program which calls this function. Again, I can print the rdd elements within the foreachRDD block but the array that is returned is always empty. It appears that the function immediately returns even before the foreachRDD stage is executed. Is that possible? On Feb 25, 2015 5:41 PM, Tathagata Das t...@databricks.com wrote: You are just setting up the computation here using foreachRDD. You have not even run the streaming context to get any data. On Wed, Feb 25, 2015 at 2:21 PM, Thanigai Vellore thanigai.vell...@gmail.com wrote: I have this function in the driver program which collects the result from rdds (in a stream) into an array and return. However, even though the RDDs (in the dstream) have data, the function is returning an empty array...What am I doing wrong? I can print the RDD values inside the foreachRDD call but the array is always empty. def runTopFunction() : Array[(String, Int)] = { val topSearches = some function val summary = new ArrayBuffer[(String,Int)]() topSearches.foreachRDD(rdd => { summary = summary.++(rdd.collect()) }) return summary.toArray }
Re: Large Similarity Job failing
Hi Reza, With 40 nodes and shuffle space managed by YARN over HDFS usercache we could run the similarity job without doing any thresholding...We used hash based shuffle and sort hopefully will further improve it...Note that this job was almost 6M x 1.5M We will go towards 50 M x ~ 3M columns and increase the sparsity pattern...Dimsum configurations will definitely help over there... With a baseline run, it will be easier for me to now run dimsum sampling and compare the results...I will try the configs that you pointed. Thanks. Deb On Wed, Feb 25, 2015 at 3:52 PM, Reza Zadeh r...@databricks.com wrote: Hi Deb, Did you try using higher threshold values as I mentioned in an earlier email? Use RowMatrix.columnSimilarities(x) where x is some number ? Try the following values for x: 0.1, 0.9, 10, 100 And yes, the idea is that the matrix is skinny, you are pushing the boundary with 1.5m columns, because the output can potentially have 2.25 x 10^12 entries, which is a lot. (squares 1.5m) Best, Reza On Wed, Feb 25, 2015 at 10:13 AM, Debasish Das debasish.da...@gmail.com wrote: Is the threshold valid only for tall skinny matrices ? Mine is 6 m x 1.5 m and I made sparsity pattern 100:1.5M..we would like to increase the sparsity pattern to 1000:1.5M I am running 1.1 stable and I get random shuffle failures...may be 1.2 sort shuffle will help.. I read in Reza paper that oversample works only if cols are skinny so I am not very keen to oversample... On Feb 17, 2015 2:01 PM, Xiangrui Meng men...@gmail.com wrote: The complexity of DIMSUM is independent of the number of rows but still have quadratic dependency on the number of columns. 1.5M columns may be too large to use DIMSUM. Try to increase the threshold and see whether it helps. -Xiangrui On Tue, Feb 17, 2015 at 6:28 AM, Debasish Das debasish.da...@gmail.com wrote: Hi, I am running brute force similarity from RowMatrix on a job with 5M x 1.5M sparse matrix with 800M entries. With 200M entries the job run fine but with 800M I am getting exceptions like too many files open and no space left on device... Seems like I need more nodes or use dimsum sampling ? I am running on 10 nodes where ulimit on each node is set at 65K...Memory is not an issue since I can cache the dataset before similarity computation starts. I tested the same job on YARN with Spark 1.1 and Spark 1.2 stable. Both the jobs failed with FetchFailed msgs. Thanks. Deb
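For reference, a hedged sketch of the thresholded call being discussed, assuming a RowMatrix built from an RDD of (sparse) feature vectors; the threshold value is just an example:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD

def compareSimilarities(rows: RDD[Vector]): Unit = {
  val mat = new RowMatrix(rows)
  val exact  = mat.columnSimilarities()     // brute-force all-pairs column similarities
  val approx = mat.columnSimilarities(0.1)  // DIMSUM sampling; entries below the threshold are only estimated
  println(s"exact entries: ${exact.entries.count()}, sampled entries: ${approx.entries.count()}")
}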
Help me understand the partition, parallelism in Spark
Hi, Sparkers: I come from the Hadoop MapReduce world and am trying to understand some of Spark's internals. From the web and this list, I keep seeing people talk about increasing the parallelism if you get an OOM error. I have read as much of the documentation as possible to understand RDD partitions and how parallelism is used in Spark. I understand that for an RDD from HDFS, by default one partition will be one HDFS block, which is pretty straightforward. I also see that lots of RDD operations support a second parallelism parameter, and this is the part that confuses me. From my understanding, the parallelism is entirely controlled by how many cores you give to your job; adjusting that parameter, or spark.default.parallelism, shouldn't have any impact.
For example, say I have 10G of data in HDFS and the block size is 128M, so we get about 100 blocks/partitions in the RDD. Now I turn that RDD into a pair RDD with 1000 unique keys and do a reduceByKey, using 200 as the default parallelism. Here is what I assume:
- We have 100 partitions, as the data comes from 100 blocks. Most likely Spark will generate 100 tasks to read and shuffle them?
- The 1000 unique keys mean 1000 reducer groups, like in MR.
- If I set the max cores to 50, then up to 50 tasks can run concurrently. The remaining tasks just have to wait for a core while 50 tasks are running.
- Since we are doing reduceByKey, shuffling will happen. Data will be shuffled into 1000 partitions, as we have 1000 unique keys.
- I don't know how many tasks will process these 1000 partitions; maybe this is where the parallelism parameter comes in?
- No matter what the parallelism is, ONLY 50 tasks can run concurrently. So if we set more cores, more partitions' data will be processed in the executor (which runs more threads in this case), so more memory is needed. I don't see how increasing parallelism could help the OOM in this case.
In my Spark SQL test case, I gave 24G as the executor heap and my join between 2 big datasets keeps getting OOM. I kept increasing spark.default.parallelism, from 200 to 400, to 2000, even to 4000, with no help. What finally made the query finish without OOM was changing --total-executor-cores from 10 to 4. So my questions are:
1) What does parallelism really mean in Spark? In the simple example above, for reduceByKey, what difference does changing the parallelism from 10 to 20 make?
2) When we talk about partitions in Spark, for data coming from HDFS I understand the partitioning clearly. For the intermediate data, the partition is determined by the key, right? For group, reduce, and join actions, the unique keys determine the partitions. Is that correct?
3) Why could increasing parallelism help with OOM? I don't get this part. From my limited experience, adjusting the core count is what really matters for memory.
Thanks Yong
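As a concrete illustration of where that second parameter acts (a sketch with made-up sizes): the numPartitions argument to reduceByKey sets how many partitions the shuffled, reduce-side data is split into, while the core count only limits how many of those tasks run at once; more partitions mean each task holds a smaller slice in memory at a time.

// 100 map-side partitions come from the HDFS blocks of the input.
val pairs = sc.textFile("hdfs:///data/10g-input").map(line => (line.take(8), 1))

// Reduce side explicitly split into 200 partitions; with 50 cores the tasks
// run 50 at a time, but each task only handles 1/200th of the keys.
val counts = pairs.reduceByKey(_ + _, 200)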
Re: Spark cluster set up on EC2 customization
You can easily add a function (say setup_pig) inside the function setup_cluster in this script https://github.com/apache/spark/blob/master/ec2/spark_ec2.py#L649 Thanks Best Regards On Thu, Feb 26, 2015 at 7:08 AM, Sameer Tilak ssti...@live.com wrote: Hi, I was looking at the documentation for deploying Spark cluster on EC2. http://spark.apache.org/docs/latest/ec2-scripts.html We are using Pig to build the data pipeline and then use MLLib for analytics. I was wondering if someone has any experience to include additional tools/services such as Pig/Hadoop in the above deployment script?
Re: Standalone spark
Spark and Hadoop should be listed as 'provided' dependency in your Maven or SBT build. But that should make it available at compile time. On Wed, Feb 25, 2015 at 10:42 PM, boci boci.b...@gmail.com wrote: Hi, I have a little question. I want to develop a spark based application, but spark depend to hadoop-client library. I think it's not necessary (spark standalone) so I excluded from sbt file.. the result is interesting. My trait where I create the spark context not compiled. The error: ... scala.reflect.internal.Types$TypeError: bad symbolic reference. A signature in SparkContext.class refers to term mapred [error] in package org.apache.hadoop which is not available. [error] It may be completely missing from the current classpath, or the version on [error] the classpath might be incompatible with the version used when compiling SparkContext.class. ... I used this class for integration test. I'm using windows and I don't want to using hadoop for integration test. How can I solve this? Thanks Janos - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
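In sbt that typically looks like the following (a sketch; the version is an example and should match your cluster):

// build.sbt -- Spark is on the compile classpath but is expected to be
// supplied by the cluster (or by spark-submit) at runtime.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1" % "provided"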
upgrade to Spark 1.2.1
Getting an error that confuses me. Running a largish app on a standalone cluster on my laptop. The app uses a guava HashBiMap as a broadcast value. With Spark 1.1.0 I simply registered the class and its serializer with kryo like this: kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer()) And all was well. I’ve also tried addSerializer instead of register. Now I get a class not found during deserialization. I checked the jar list used to create the context and found the jar that contains HashBiMap but get this error. Any ideas: 15/02/25 14:46:33 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 4.0 (TID 8, 192.168.0.2): java.io.IOException: com.esotericsoftware.kryo.KryoException: Error during Java deserialization. at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.mahout.drivers.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:95) at org.apache.mahout.drivers.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:94) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization. at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1090) ... 
19 more == root error Caused by: java.lang.ClassNotFoundException: com.google.common.collect.HashBiMap at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:274) at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:40) ... 24 more - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Standalone spark
Thanks dude... I think I will pull up a docker container for integration test -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Feb 26, 2015 at 12:22 AM, Sean Owen so...@cloudera.com wrote: Yes, been on the books for a while ... https://issues.apache.org/jira/browse/SPARK-2356 That one just may always be a known 'gotcha' in Windows; it's kind of a Hadoop gotcha. I don't know that Spark 100% works on Windows and it isn't tested on Windows. On Wed, Feb 25, 2015 at 11:05 PM, boci boci.b...@gmail.com wrote: Thanks your fast answer... in windows it's not working, because hadoop (surprise suprise) need winutils.exe. Without this it's not working, but if you not set the hadoop directory You simply get 15/02/26 00:03:16 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Wed, Feb 25, 2015 at 11:50 PM, Sean Owen so...@cloudera.com wrote: Spark and Hadoop should be listed as 'provided' dependency in your Maven or SBT build. But that should make it available at compile time. On Wed, Feb 25, 2015 at 10:42 PM, boci boci.b...@gmail.com wrote: Hi, I have a little question. I want to develop a spark based application, but spark depend to hadoop-client library. I think it's not necessary (spark standalone) so I excluded from sbt file.. the result is interesting. My trait where I create the spark context not compiled. The error: ... scala.reflect.internal.Types$TypeError: bad symbolic reference. A signature in SparkContext.class refers to term mapred [error] in package org.apache.hadoop which is not available. [error] It may be completely missing from the current classpath, or the version on [error] the classpath might be incompatible with the version used when compiling SparkContext.class. ... I used this class for integration test. I'm using windows and I don't want to using hadoop for integration test. How can I solve this? Thanks Janos
Re: upgrade to Spark 1.2.1
Could this be caused by Spark using shaded Guava jar ? Cheers On Wed, Feb 25, 2015 at 3:26 PM, Pat Ferrel p...@occamsmachete.com wrote: Getting an error that confuses me. Running a largish app on a standalone cluster on my laptop. The app uses a guava HashBiMap as a broadcast value. With Spark 1.1.0 I simply registered the class and its serializer with kryo like this: kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer()) And all was well. I’ve also tried addSerializer instead of register. Now I get a class not found during deserialization. I checked the jar list used to create the context and found the jar that contains HashBiMap but get this error. Any ideas: 15/02/25 14:46:33 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 4.0 (TID 8, 192.168.0.2): java.io.IOException: com.esotericsoftware.kryo.KryoException: Error during Java deserialization. at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.mahout.drivers.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:95) at org.apache.mahout.drivers.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:94) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization. at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1090) ... 
19 more == root error Caused by: java.lang.ClassNotFoundException: com.google.common.collect.HashBiMap at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:274) at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:40) ... 24 more - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Spark-SQL 1.2.0 sort by results are not consistent with Hive
How many reducers did you set for Hive? With a small data set, Hive will run in local mode, which always sets the reducer count to 1. From: Kannan Rajah [mailto:kra...@maprtech.com] Sent: Thursday, February 26, 2015 3:02 AM To: Cheng Lian Cc: user@spark.apache.org Subject: Re: Spark-SQL 1.2.0 sort by results are not consistent with Hive Cheng, We tried this setting and it still did not help. This was on Spark 1.2.0. -- Kannan On Mon, Feb 23, 2015 at 6:38 PM, Cheng Lian lian.cs@gmail.com wrote: (Move to user list.) Hi Kannan, You need to set mapred.map.tasks to 1 in hive-site.xml. The reason is this line of code: https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala#L68, which overrides spark.default.parallelism. Also, spark.sql.shuffle.parallelism isn’t used here since there’s no shuffle involved (we only need to sort within a partition). The default value of mapred.map.tasks is 2 (https://hadoop.apache.org/docs/r1.0.4/mapred-default.html). You may see that the Spark SQL result can be divided into two sorted parts from the middle. Cheng On 2/19/15 10:33 AM, Kannan Rajah wrote: According to Hive documentation, sort by is supposed to order the results for each reducer. So if we set a single reducer, then the results should be sorted, right? But this is not happening. Any idea why? It looks like the settings I am using to restrict the number of reducers are not having an effect. *Tried the following:* Set spark.default.parallelism to 1 Set spark.sql.shuffle.partitions to 1 These were set in hive-site.xml and also inside spark shell. *Spark-SQL* create table if not exists testSortBy (key int, name string, age int); LOAD DATA LOCAL INPATH '/home/mapr/sample-name-age.txt' OVERWRITE INTO TABLE testSortBy; select * from testSortBY; 1 Aditya 28 2 aash 25 3 prashanth 27 4 bharath 26 5 terry 27 6 nanda 26 7 pradeep 27 8 pratyay 26 set spark.default.parallelism=1; set spark.sql.shuffle.partitions=1; select name,age from testSortBy sort by age; aash 25 bharath 26 prashanth 27 Aditya 28 nanda 26 pratyay 26 terry 27 pradeep 27 *HIVE* select name,age from testSortBy sort by age; aash 25 bharath 26 nanda 26 pratyay 26 prashanth 27 terry 27 pradeep 27 Aditya 28 -- Kannan
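A hedged way to try Cheng's suggestion from a Spark shell, assuming a HiveContext and the testSortBy table from the thread (setting mapred.map.tasks through SQL rather than hive-site.xml is an assumption):

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
hiveCtx.sql("SET mapred.map.tasks=1")  // one input split, so a single sorted partition
hiveCtx.sql("SELECT name, age FROM testSortBy SORT BY age").collect().foreach(println)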
Re: Standalone spark
Yes, been on the books for a while ... https://issues.apache.org/jira/browse/SPARK-2356 That one just may always be a known 'gotcha' in Windows; it's kind of a Hadoop gotcha. I don't know that Spark 100% works on Windows and it isn't tested on Windows. On Wed, Feb 25, 2015 at 11:05 PM, boci boci.b...@gmail.com wrote: Thanks your fast answer... in windows it's not working, because hadoop (surprise suprise) need winutils.exe. Without this it's not working, but if you not set the hadoop directory You simply get 15/02/26 00:03:16 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Wed, Feb 25, 2015 at 11:50 PM, Sean Owen so...@cloudera.com wrote: Spark and Hadoop should be listed as 'provided' dependency in your Maven or SBT build. But that should make it available at compile time. On Wed, Feb 25, 2015 at 10:42 PM, boci boci.b...@gmail.com wrote: Hi, I have a little question. I want to develop a spark based application, but spark depend to hadoop-client library. I think it's not necessary (spark standalone) so I excluded from sbt file.. the result is interesting. My trait where I create the spark context not compiled. The error: ... scala.reflect.internal.Types$TypeError: bad symbolic reference. A signature in SparkContext.class refers to term mapred [error] in package org.apache.hadoop which is not available. [error] It may be completely missing from the current classpath, or the version on [error] the classpath might be incompatible with the version used when compiling SparkContext.class. ... I used this class for integration test. I'm using windows and I don't want to using hadoop for integration test. How can I solve this? Thanks Janos - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Considering Spark for large data elements
I have an application which might benefit from Spark's distribution/analysis, but I'm worried about the size and structure of my data set. I need to perform several thousand simulations on a rather large data set and I need access to all the generated simulations. The data element is largely an int[r][c] where r is 100 to 1000 and c is 20-80K (there's more, but that array is the bulk of the problem). I have machines and memory capable of doing 6-10 simulations simultaneously in separate JVMs. Is this data structure compatible with Spark's RDD notion? If yes, I will have a slew of how-to-get-started questions, the first of which is how to seed the run. My thinking is to use org.apache.spark.api.java.FlatMapFunction starting with an EmptyRDD and the seed data. Would that be the way to go? Thanks
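One hedged way to seed such a run is simply to parallelize the simulation parameters and map each one to its generated matrix, rather than starting from an EmptyRDD (the SimSpec shape and the random fill below are stand-ins for the real simulation):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

case class SimSpec(id: Int, seed: Long, rows: Int, cols: Int)

// Each seed element describes one simulation; the map builds that
// simulation's int[r][c] result on a worker.
def runAll(sc: SparkContext, specs: Seq[SimSpec]): RDD[(Int, Array[Array[Int]])] = {
  sc.parallelize(specs, specs.size).map { s =>
    val rng = new scala.util.Random(s.seed)
    val grid = Array.fill(s.rows, s.cols)(rng.nextInt())  // placeholder for the real simulation
    (s.id, grid)
  }.cache()
}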
job keeps failing with org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
I'm using Spark 1.2, stand-alone cluster on EC2. I have a cluster of 8 r3.8xlarge machines but limit the job to only 128 cores. I have also tried other things such as setting 4 workers per r3.8xlarge and 67gb each but this made no difference. The job frequently fails at the end in this step (saveAsHadoopFile). It will sometimes work. finalNewBaselinePairRDD is hash-partitioned with 1024 partitions and a total size around 1TB. There are about 13.5M records in finalNewBaselinePairRDD. finalNewBaselinePairRDD is a JavaPairRDD<String, String>. JavaPairRDD<Text, Text> finalBaselineRDDWritable = finalNewBaselinePairRDD.mapToPair(new ConvertToWritableTypes()).persist(StorageLevel.MEMORY_AND_DISK_SER()); // Save to hdfs (gzip) finalBaselineRDDWritable.saveAsHadoopFile("hdfs:///sparksync/", Text.class, Text.class, SequenceFileOutputFormat.class, org.apache.hadoop.io.compress.GzipCodec.class); If anyone has any tips for what I should look into it would be appreciated. Thanks. Darin. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark cluster set up on EC2 customization
Hi, I was looking at the documentation for deploying Spark cluster on EC2. http://spark.apache.org/docs/latest/ec2-scripts.html We are using Pig to build the data pipeline and then use MLLib for analytics. I was wondering if someone has any experience to include additional tools/services such as Pig/Hadoop in the above deployment script?
Re: throughput in the web console?
Yes. # tuples processed in a batch = sum of all the tuples received by all the receivers. In screen shot, there was a batch with 69.9K records, and there was a batch which took 1 s 473 ms. These two batches can be the same, can be different batches. TD On Wed, Feb 25, 2015 at 10:11 AM, Josh J joshjd...@gmail.com wrote: If I'm using the kafka receiver, can I assume the number of records processed in the batch is the sum of the number of records processed by the kafka receiver? So in the screen shot attached the max rate of tuples processed in a batch is 42.7K + 27.2K = 69.9K tuples processed in a batch with a max processing time of 1 second 473 ms? On Wed, Feb 25, 2015 at 8:48 AM, Akhil Das ak...@sigmoidanalytics.com wrote: By throughput you mean Number of events processed etc? [image: Inline image 1] Streaming tab already have these statistics. Thanks Best Regards On Wed, Feb 25, 2015 at 9:59 PM, Josh J joshjd...@gmail.com wrote: On Wed, Feb 25, 2015 at 7:54 AM, Akhil Das ak...@sigmoidanalytics.com wrote: For SparkStreaming applications, there is already a tab called Streaming which displays the basic statistics. Would I just need to extend this tab to add the throughput? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Scheduler hang?
I'm getting this really reliably on Spark 1.2.1. Basically I'm in local mode with parallelism at 8. I have 222 tasks and I never seem to get far past 40. Usually in the 20s to 30s it will just hang. The last logging is below, and a screenshot of the UI. 2015-02-25 20:39:55.779 GMT-0800 INFO [task-result-getter-3] TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on localhost (1/5) 2015-02-25 20:39:55.825 GMT-0800 INFO [Executor task launch worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 bytes result sent to driver 2015-02-25 20:39:55.825 GMT-0800 INFO [Executor task launch worker-8] Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes result sent to driver 2015-02-25 20:39:55.831 GMT-0800 INFO [task-result-getter-0] TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on localhost (2/5) 2015-02-25 20:39:55.836 GMT-0800 INFO [task-result-getter-1] TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on localhost (3/5) 2015-02-25 20:39:55.891 GMT-0800 INFO [Executor task launch worker-9] Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes result sent to driver 2015-02-25 20:39:55.896 GMT-0800 INFO [task-result-getter-2] TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on localhost (4/5) [image: Inline image 1] What should I make of this? Where do I start? Thanks, Victor
Fwd: Spark excludes fastutil dependencies we need
Forwarding conversation below that didn't make it to the list. -- Forwarded message -- From: Jim Kleckner j...@cloudphysics.com Date: Wed, Feb 25, 2015 at 8:42 PM Subject: Re: Spark excludes fastutil dependencies we need To: Ted Yu yuzhih...@gmail.com Cc: Sean Owen so...@cloudera.com, user user@spark.apache.org Inline On Wed, Feb 25, 2015 at 1:53 PM, Ted Yu yuzhih...@gmail.com wrote: Interesting. Looking at SparkConf.scala : val configs = Seq( DeprecatedConfig(spark.files.userClassPathFirst, spark.executor.userClassPathFirst, 1.3), DeprecatedConfig(spark.yarn.user.classpath.first, null, 1.3, Use spark.{driver,executor}.userClassPathFirst instead.)) It seems spark.files.userClassPathFirst and spark.yarn.user.classpath.first are deprecated. Note that I did use the non-deprecated version, spark.executor. userClassPathFirst=true. On Wed, Feb 25, 2015 at 12:39 AM, Sean Owen so...@cloudera.com wrote: No, we should not add fastutil back. It's up to the app to bring dependencies it needs, and that's how I understand this issue. The question is really, how to get the classloader visibility right. It depends on where you need these classes. Have you looked into spark.files.userClassPathFirst and spark.yarn.user.classpath.first ? I noted that I tried this in my original email. The issue appears related to the fact that parquet is also creating a shaded jar and that one leaves out the Long2LongOpenHashMap class. FYI, I have subsequently tried removing the exclusion from the spark build and that does cause the fastutil classes to be included and the example works... So, should the userClassPathFirst flag work and there is a bug? Or is it reasonable to put in a pull request for the elimination of the exclusion? On Wed, Feb 25, 2015 at 5:34 AM, Ted Yu yuzhih...@gmail.com wrote: bq. depend on missing fastutil classes like Long2LongOpenHashMap Looks like Long2LongOpenHashMap should be added to the shaded jar. Cheers On Tue, Feb 24, 2015 at 7:36 PM, Jim Kleckner j...@cloudphysics.com wrote: Spark includes the clearspring analytics package but intentionally excludes the dependencies of the fastutil package (see below). Spark includes parquet-column which includes fastutil and relocates it under parquet/ but creates a shaded jar file which is incomplete because it shades out some of the fastutil classes, notably Long2LongOpenHashMap, which is present in the fastutil jar file that parquet-column is referencing. We are using more of the clearspring classes (e.g. QDigest) and those do depend on missing fastutil classes like Long2LongOpenHashMap. Even though I add them to our assembly jar file, the class loader finds the spark assembly and we get runtime class loader errors when we try to use it. It is possible to put our jar file first, as described here: https://issues.apache.org/jira/browse/SPARK-939 http://spark.apache.org/docs/1.2.0/configuration.html#runtime-environment which I tried with args to spark-submit: --conf spark.driver.userClassPathFirst=true --conf spark.executor.userClassPathFirst=true but we still get the class not found error. We have tried copying the source code for clearspring into our package and renaming the package and that makes it appear to work... Is this risky? It certainly is ugly. Can anyone recommend a way to deal with this dependency **ll ? 
=== The spark/pom.xml file contains the following lines:
<dependency>
  <groupId>com.clearspring.analytics</groupId>
  <artifactId>stream</artifactId>
  <version>2.7.0</version>
  <exclusions>
    <exclusion>
      <groupId>it.unimi.dsi</groupId>
      <artifactId>fastutil</artifactId>
    </exclusion>
  </exclusions>
</dependency>
=== The parquet-column/pom.xml file contains:
<artifactId>maven-shade-plugin</artifactId>
<executions>
  <execution>
    <phase>package</phase>
    <goals>
      <goal>shade</goal>
    </goals>
    <configuration>
      <minimizeJar>true</minimizeJar>
      <artifactSet>
        <includes>
          <include>it.unimi.dsi:fastutil</include>
        </includes>
      </artifactSet>
      <relocations>
        <relocation>
          <pattern>it.unimi.dsi</pattern>
          <shadedPattern>parquet.it.unimi.dsi</shadedPattern>
        </relocation>
      </relocations>
    </configuration>
  </execution>
</executions>
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-excludes-fastutil-dependencies-we-need-tp21794.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Facing error while extending scala class with Product interface to overcome limit of 22 fields in spark-shell
I am now getting the following error. I cross-checked my types and corrected three of them i.e. r26--String, r27--Timestamp, r28--Timestamp. This error still persists. scala> sc.textFile("/home/cdhuser/Desktop/Sdp_d.csv").map(_.split(",")).map { r => | val upto_time = sdf.parse(r(23).trim); | calendar.setTime(upto_time); | val r23 = new java.sql.Timestamp(upto_time.getTime) | val insert_time = sdf.parse(r(27).trim) | calendar.setTime(insert_time) | val r27 = new java.sql.Timestamp(insert_time.getTime) | val last_upd_time = sdf.parse(r(28).trim) | calendar.setTime(last_upd_time) | val r28 = new java.sql.Timestamp(last_upd_time.getTime) | new sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim, r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim, r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim, r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim, r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim, r(25).trim, r(26).trim, r27, r28) | }.registerAsTable("sdp_d") <console>:26: error: type mismatch; found : Int required: Option[Int] new sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim, r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim, r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim, r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim, r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim, r(25).trim, r(26).trim, r27, r28) On Wed, Feb 25, 2015 at 2:32 PM, Akhil Das ak...@sigmoidanalytics.com wrote: It says sdp_d not found, since it is a class you need to instantiate it once. like: sc.textFile("derby.log").map(_.split(",")).map( r => { val upto_time = sdf.parse(r(23).trim); calendar.setTime(upto_time); val r23 = new java.sql.Timestamp(upto_time.getTime); val insert_time = sdf.parse(r(26).trim); calendar.setTime(insert_time); val r26 = new java.sql.Timestamp(insert_time.getTime); val last_upd_time = sdf.parse(r(27).trim); calendar.setTime(last_upd_time); val r27 = new java.sql.Timestamp(last_upd_time.getTime); new sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim, r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim, r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim, r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim, r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim, r(25).trim, r26, r27, r(28).trim) }).registerAsTable("sdp") Thanks Best Regards On Wed, Feb 25, 2015 at 2:14 PM, anamika gupta anamika.guo...@gmail.com wrote: The link has proved helpful. I have been able to load data, register it as a table and perform simple queries. Thanks Akhil !! Though, I still look forward to knowing where I was going wrong with my previous technique of extending the Product Interface to overcome case class's limit of 22 fields. On Wed, Feb 25, 2015 at 9:45 AM, anamika gupta anamika.guo...@gmail.com wrote: Hi Akhil I guess it skipped my attention. I would definitely give it a try. While I would still like to know what is the issue with the way I have created schema? On Tue, Feb 24, 2015 at 4:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you happen to have a look at https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema Thanks Best Regards On Tue, Feb 24, 2015 at 3:39 PM, anu anamika.guo...@gmail.com wrote: My issue is posted here on stack-overflow. What am I doing wrong here?
http://stackoverflow.com/questions/28689186/facing-error-while-extending-scala-class-with-product-interface-to-overcome-limi -- View this message in context: Facing error while extending scala class with Product interface to overcome limit of 22 fields in spark-shell http://apache-spark-user-list.1001560.n3.nabble.com/Facing-error-while-extending-scala-class-with-Product-interface-to-overcome-limit-of-22-fields-in-spl-tp21787.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
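The "found: Int, required: Option[Int]" message suggests the sdp_d constructor declares its fields as Option types; if that is the case, the parsed values need wrapping in Some(...). A minimal self-contained illustration of that fix (Person is a stand-in for sdp_d, and the field types are assumed):

// Hypothetical stand-in: fields are Option-typed, so parsed primitives are wrapped.
case class Person(id: Option[Int], name: Option[String], age: Option[Int])

val r = "42, Aditya, 28".split(",")
val p = Person(Some(r(0).trim.toInt), Some(r(1).trim), Some(r(2).trim.toInt))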
Re: NegativeArraySizeException when doing joins on skewed data
I get the same exception simply by doing a large broadcast of about 6GB. Note that I’m broadcasting a small number (~3m) of fat objects. There’s plenty of free RAM. This and related kryo exceptions seem to crop-up whenever an object graph of more than a couple of GB gets passed around. at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101) at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945) at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:623) Caused by: java.lang.NegativeArraySizeException at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:539) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570) ... 23 more On 26 February 2015 at 03:49, soila skavu...@gmail.com wrote: I have been running into NegativeArraySizeException's when doing joins on data with very skewed key distributions in Spark 1.2.0. I found a previous post that mentioned that this exception arises when the size of the blocks spilled during the shuffle exceeds 2GB. The post recommended increasing the number of partitions. 
I tried increasing the number of partitions, and using the RangePartitioner instead of the HashPartitioner but still encountered the problem. Does Spark support skewed joins similar to Pig? com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException Serialization trace: otherElements (org.apache.spark.util.collection.CompactBuffer) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at
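Spark does not have a built-in skewed join like Pig's here; one common workaround is key salting, sketched below under the assumption that the other side of the join is small enough to be replicated once per salt value (key and value types are placeholders):

import scala.util.Random
import org.apache.spark.SparkContext._   // pair-RDD implicits for Spark 1.2
import org.apache.spark.rdd.RDD

// Spread each hot key of the skewed side over `salt` sub-keys, replicate the
// smaller side once per sub-key, join on the salted key, then drop the salt.
def saltedJoin(skewed: RDD[(String, String)],
               other: RDD[(String, String)],
               salt: Int): RDD[(String, (String, String))] = {
  val left  = skewed.map { case (k, v) => ((k, Random.nextInt(salt)), v) }
  val right = other.flatMap { case (k, w) => (0 until salt).map(i => ((k, i), w)) }
  left.join(right).map { case ((k, _), vw) => (k, vw) }
}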
Re: group by order by fails
Which version of spark are you having? It seems there was a similar Jira https://issues.apache.org/jira/browse/SPARK-2474 Thanks Best Regards On Thu, Feb 26, 2015 at 12:03 PM, tridib tridib.sama...@live.com wrote: Hi, I need to find top 10 most selling samples. So query looks like: select s.name, count(s.name) from sample s group by s.name order by count(s.name) This query fails with following error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree: Sort [COUNT(name#0) ASC], true Exchange (RangePartitioning [COUNT(name#0) ASC], 200) Aggregate false, [name#0], [name#0 AS name#1,Coalesce(SUM(PartialCount#4L),0) AS count#2L,name#0] Exchange (HashPartitioning [name#0], 200) Aggregate true, [name#0], [name#0,COUNT(name#0) AS PartialCount#4L] PhysicalRDD [name#0], MapPartitionsRDD[1] at mapPartitions at JavaSQLContext.scala:102 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47) at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:206) at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:43) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:444) at org.apache.spark.sql.api.java.JavaSchemaRDD.collect(JavaSchemaRDD.scala:114) at com.edifecs.platform.df.analytics.spark.domain.dao.OrderByTest.testGetVisitDistributionByPrimaryDx(OrderByTest.java:48) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at org.junit.runner.JUnitCore.run(JUnitCore.java:160) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:121) Caused by: 
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: Exchange (RangePartitioning [COUNT(name#0) ASC], 200) Aggregate false, [name#0], [name#0 AS name#1,Coalesce(SUM(PartialCount#4L),0) AS count#2L,name#0] Exchange (HashPartitioning [name#0], 200) Aggregate true, [name#0], [name#0,COUNT(name#0) AS PartialCount#4L] PhysicalRDD [name#0], MapPartitionsRDD[1] at mapPartitions at JavaSQLContext.scala:102 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47) at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:47) at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:207) at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:207) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) ... 37 more Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No function to evaluate expression. type: Count, tree: COUNT(input[2]) at
RE: group by order by fails
Actually I just realized , I am using 1.2.0. Thanks Tridib Date: Thu, 26 Feb 2015 12:37:06 +0530 Subject: Re: group by order by fails From: ak...@sigmoidanalytics.com To: tridib.sama...@live.com CC: user@spark.apache.org Which version of spark are you having? It seems there was a similar Jira https://issues.apache.org/jira/browse/SPARK-2474ThanksBest Regards On Thu, Feb 26, 2015 at 12:03 PM, tridib tridib.sama...@live.com wrote: Hi, I need to find top 10 most selling samples. So query looks like: select s.name, count(s.name) from sample s group by s.name order by count(s.name) This query fails with following error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree: Sort [COUNT(name#0) ASC], true Exchange (RangePartitioning [COUNT(name#0) ASC], 200) Aggregate false, [name#0], [name#0 AS name#1,Coalesce(SUM(PartialCount#4L),0) AS count#2L,name#0] Exchange (HashPartitioning [name#0], 200) Aggregate true, [name#0], [name#0,COUNT(name#0) AS PartialCount#4L] PhysicalRDD [name#0], MapPartitionsRDD[1] at mapPartitions at JavaSQLContext.scala:102 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47) at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:206) at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:43) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:444) at org.apache.spark.sql.api.java.JavaSchemaRDD.collect(JavaSchemaRDD.scala:114) at com.edifecs.platform.df.analytics.spark.domain.dao.OrderByTest.testGetVisitDistributionByPrimaryDx(OrderByTest.java:48) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at org.junit.runner.JUnitCore.run(JUnitCore.java:160) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:121) Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: Exchange (RangePartitioning [COUNT(name#0) ASC], 200) Aggregate false, [name#0], [name#0 AS name#1,Coalesce(SUM(PartialCount#4L),0) AS count#2L,name#0] Exchange (HashPartitioning [name#0], 200) Aggregate true, [name#0], [name#0,COUNT(name#0) AS PartialCount#4L] PhysicalRDD [name#0], MapPartitionsRDD[1] at mapPartitions at JavaSQLContext.scala:102 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47) at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:47) at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:207) at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:207) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
Re: Fwd: Spark excludes fastutil dependencies we need
I created an issue and pull request. Discussion can continue there: https://issues.apache.org/jira/browse/SPARK-6029 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-excludes-fastutil-dependencies-we-need-tp21812p21814.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
group by order by fails
Hi, I need to find top 10 most selling samples. So query looks like: select s.name, count(s.name) from sample s group by s.name order by count(s.name) This query fails with following error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree: Sort [COUNT(name#0) ASC], true Exchange (RangePartitioning [COUNT(name#0) ASC], 200) Aggregate false, [name#0], [name#0 AS name#1,Coalesce(SUM(PartialCount#4L),0) AS count#2L,name#0] Exchange (HashPartitioning [name#0], 200) Aggregate true, [name#0], [name#0,COUNT(name#0) AS PartialCount#4L] PhysicalRDD [name#0], MapPartitionsRDD[1] at mapPartitions at JavaSQLContext.scala:102 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47) at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:206) at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:43) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:444) at org.apache.spark.sql.api.java.JavaSchemaRDD.collect(JavaSchemaRDD.scala:114) at com.edifecs.platform.df.analytics.spark.domain.dao.OrderByTest.testGetVisitDistributionByPrimaryDx(OrderByTest.java:48) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at org.junit.runner.JUnitCore.run(JUnitCore.java:160) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:121) Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: Exchange (RangePartitioning [COUNT(name#0) ASC], 200) Aggregate false, [name#0], [name#0 AS name#1,Coalesce(SUM(PartialCount#4L),0) AS count#2L,name#0] Exchange (HashPartitioning [name#0], 
200) Aggregate true, [name#0], [name#0,COUNT(name#0) AS PartialCount#4L] PhysicalRDD [name#0], MapPartitionsRDD[1] at mapPartitions at JavaSQLContext.scala:102 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47) at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:47) at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:207) at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:207) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) ... 37 more Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No function to evaluate expression. type: Count, tree: COUNT(input[2]) at org.apache.spark.sql.catalyst.expressions.AggregateExpression.eval(aggregates.scala:41) at org.apache.spark.sql.catalyst.expressions.RowOrdering.compare(Row.scala:250) at org.apache.spark.sql.catalyst.expressions.RowOrdering.compare(Row.scala:242) at scala.math.Ordering$$anon$5.compare(Ordering.scala:122) at
Re: Scheduler hang?
What operation are you trying to do and how big is the data that you are operating on? Here's a few things which you can try: - Repartition the RDD to a higher number than 222 - Specify the master as local[*] or local[10] - Use Kryo Serializer (.set(spark.serializer, org.apache.spark.serializer.KryoSerializer)) - Enable RDD Compression (.set(spark.rdd.compress,true) ) Thanks Best Regards On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen v...@paxata.com wrote: I'm getting this really reliably on Spark 1.2.1. Basically I'm in local mode with parallelism at 8. I have 222 tasks and I never seem to get far past 40. Usually in the 20s to 30s it will just hang. The last logging is below, and a screenshot of the UI. 2015-02-25 20:39:55.779 GMT-0800 INFO [task-result-getter-3] TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on localhost (1/5) 2015-02-25 20:39:55.825 GMT-0800 INFO [Executor task launch worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 bytes result sent to driver 2015-02-25 20:39:55.825 GMT-0800 INFO [Executor task launch worker-8] Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes result sent to driver 2015-02-25 20:39:55.831 GMT-0800 INFO [task-result-getter-0] TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on localhost (2/5) 2015-02-25 20:39:55.836 GMT-0800 INFO [task-result-getter-1] TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on localhost (3/5) 2015-02-25 20:39:55.891 GMT-0800 INFO [Executor task launch worker-9] Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes result sent to driver 2015-02-25 20:39:55.896 GMT-0800 INFO [task-result-getter-2] TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on localhost (4/5) [image: Inline image 1] What should I make of this? Where do I start? Thanks, Victor
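A minimal sketch of the suggested settings in one place (the master URL, app name, input path and partition count are illustrative, not taken from the original job):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local[*]")                 // or "local[10]"
      .setAppName("scheduler-hang-repro")    // placeholder name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.rdd.compress", "true")
    val sc = new SparkContext(conf)

    // Repartition the input to something higher than the current 222 tasks:
    val data = sc.textFile("hdfs:///some/input").repartition(500)  // placeholder path and count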
Re: Spark excludes fastutil dependencies we need
Inline On Wed, Feb 25, 2015 at 1:53 PM, Ted Yu yuzhih...@gmail.com wrote: Interesting. Looking at SparkConf.scala : val configs = Seq( DeprecatedConfig(spark.files.userClassPathFirst, spark.executor.userClassPathFirst, 1.3), DeprecatedConfig(spark.yarn.user.classpath.first, null, 1.3, Use spark.{driver,executor}.userClassPathFirst instead.)) It seems spark.files.userClassPathFirst and spark.yarn.user.classpath.first are deprecated. Note that I did use the non-deprecated version, spark.executor. userClassPathFirst=true. On Wed, Feb 25, 2015 at 12:39 AM, Sean Owen so...@cloudera.com wrote: No, we should not add fastutil back. It's up to the app to bring dependencies it needs, and that's how I understand this issue. The question is really, how to get the classloader visibility right. It depends on where you need these classes. Have you looked into spark.files.userClassPathFirst and spark.yarn.user.classpath.first ? I noted that I tried this in my original email. The issue appears related to the fact that parquet is also creating a shaded jar and that one leaves out the Long2LongOpenHashMap class. FYI, I have subsequently tried removing the exclusion from the spark build and that does cause the fastutil classes to be included and the example works... So, should the userClassPathFirst flag work and there is a bug? Or is it reasonable to put in a pull request for the elimination of the exclusion? On Wed, Feb 25, 2015 at 5:34 AM, Ted Yu yuzhih...@gmail.com wrote: bq. depend on missing fastutil classes like Long2LongOpenHashMap Looks like Long2LongOpenHashMap should be added to the shaded jar. Cheers On Tue, Feb 24, 2015 at 7:36 PM, Jim Kleckner j...@cloudphysics.com wrote: Spark includes the clearspring analytics package but intentionally excludes the dependencies of the fastutil package (see below). Spark includes parquet-column which includes fastutil and relocates it under parquet/ but creates a shaded jar file which is incomplete because it shades out some of the fastutil classes, notably Long2LongOpenHashMap, which is present in the fastutil jar file that parquet-column is referencing. We are using more of the clearspring classes (e.g. QDigest) and those do depend on missing fastutil classes like Long2LongOpenHashMap. Even though I add them to our assembly jar file, the class loader finds the spark assembly and we get runtime class loader errors when we try to use it. It is possible to put our jar file first, as described here: https://issues.apache.org/jira/browse/SPARK-939 http://spark.apache.org/docs/1.2.0/configuration.html#runtime-environment which I tried with args to spark-submit: --conf spark.driver.userClassPathFirst=true --conf spark.executor.userClassPathFirst=true but we still get the class not found error. We have tried copying the source code for clearspring into our package and renaming the package and that makes it appear to work... Is this risky? It certainly is ugly. Can anyone recommend a way to deal with this dependency **ll ? 
=== The spark/pom.xml file contains the following lines:

    <dependency>
      <groupId>com.clearspring.analytics</groupId>
      <artifactId>stream</artifactId>
      <version>2.7.0</version>
      <exclusions>
        <exclusion>
          <groupId>it.unimi.dsi</groupId>
          <artifactId>fastutil</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

=== The parquet-column/pom.xml file contains:

    <artifactId>maven-shade-plugin</artifactId>
    <executions>
      <execution>
        <phase>package</phase>
        <goals>
          <goal>shade</goal>
        </goals>
        <configuration>
          <minimizeJar>true</minimizeJar>
          <artifactSet>
            <includes>
              <include>it.unimi.dsi:fastutil</include>
            </includes>
          </artifactSet>
          <relocations>
            <relocation>
              <pattern>it.unimi.dsi</pattern>
              <shadedPattern>parquet.it.unimi.dsi</shadedPattern>
            </relocation>
          </relocations>
        </configuration>
      </execution>
    </executions>

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-excludes-fastutil-dependencies-we-need-tp21794.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
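For reference, a minimal build-side sketch of the point that the application should bring the dependency it needs: declare fastutil directly in the application build instead of relying on Spark's (excluded) copy or parquet's relocated one. This assumes an sbt build; the fastutil version shown is only a placeholder and should be pinned to whatever stream-lib 2.7.0 actually expects (check the stream POM or a dependency report). As the thread notes, classloader ordering can still interfere, so this alone may not resolve the runtime error.

    // build.sbt (sketch; the fastutil version below is an assumption)
    libraryDependencies ++= Seq(
      "com.clearspring.analytics" % "stream" % "2.7.0",
      // bring the real fastutil into the application assembly explicitly
      "it.unimi.dsi" % "fastutil" % "6.5.7"
    )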
Re: Executor lost with too many temp files
Can you try increasing the ulimit -n on your machine. On Mon, Feb 23, 2015 at 10:55 PM, Marius Soutier mps@gmail.com wrote: Hi Sameer, I’m still using Spark 1.1.1, I think the default is hash shuffle. No external shuffle service. We are processing gzipped JSON files, the partitions are the amount of input files. In my current data set we have ~850 files that amount to 60 GB (so ~600 GB uncompressed). We have 5 workers with 8 cores and 48 GB RAM each. We extract five different groups of data from this to filter, clean and denormalize (i.e. join) it for easier downstream processing. By the way this code does not seem to complete at all without using coalesce() at a low number, 5 or 10 work great. Everything above that make it very likely it will crash, even on smaller datasets (~300 files). But I’m not sure if this is related to the above issue. On 23.02.2015, at 18:15, Sameer Farooqui same...@databricks.com wrote: Hi Marius, Are you using the sort or hash shuffle? Also, do you have the external shuffle service enabled (so that the Worker JVM or NodeManager can still serve the map spill files after an Executor crashes)? How many partitions are in your RDDs before and after the problematic shuffle operation? On Monday, February 23, 2015, Marius Soutier mps@gmail.com wrote: Hi guys, I keep running into a strange problem where my jobs start to fail with the dreaded Resubmitted (resubmitted due to lost executor)” because of having too many temp files from previous runs. Both /var/run and /spill have enough disk space left, but after a given amount of jobs have run, following jobs will struggle with completion. There are a lot of failures without any exception message, only the above mentioned lost executor. As soon as I clear out /var/run/spark/work/ and the spill disk, everything goes back to normal. Thanks for any hint, - Marius - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark excludes fastutil dependencies we need
Maybe drop the exclusion for parquet-provided profile ? Cheers On Wed, Feb 25, 2015 at 8:42 PM, Jim Kleckner j...@cloudphysics.com wrote: Inline On Wed, Feb 25, 2015 at 1:53 PM, Ted Yu yuzhih...@gmail.com wrote: Interesting. Looking at SparkConf.scala : val configs = Seq( DeprecatedConfig(spark.files.userClassPathFirst, spark.executor.userClassPathFirst, 1.3), DeprecatedConfig(spark.yarn.user.classpath.first, null, 1.3, Use spark.{driver,executor}.userClassPathFirst instead.)) It seems spark.files.userClassPathFirst and spark.yarn.user.classpath.first are deprecated. Note that I did use the non-deprecated version, spark.executor. userClassPathFirst=true. On Wed, Feb 25, 2015 at 12:39 AM, Sean Owen so...@cloudera.com wrote: No, we should not add fastutil back. It's up to the app to bring dependencies it needs, and that's how I understand this issue. The question is really, how to get the classloader visibility right. It depends on where you need these classes. Have you looked into spark.files.userClassPathFirst and spark.yarn.user.classpath.first ? I noted that I tried this in my original email. The issue appears related to the fact that parquet is also creating a shaded jar and that one leaves out the Long2LongOpenHashMap class. FYI, I have subsequently tried removing the exclusion from the spark build and that does cause the fastutil classes to be included and the example works... So, should the userClassPathFirst flag work and there is a bug? Or is it reasonable to put in a pull request for the elimination of the exclusion? On Wed, Feb 25, 2015 at 5:34 AM, Ted Yu yuzhih...@gmail.com wrote: bq. depend on missing fastutil classes like Long2LongOpenHashMap Looks like Long2LongOpenHashMap should be added to the shaded jar. Cheers On Tue, Feb 24, 2015 at 7:36 PM, Jim Kleckner j...@cloudphysics.com wrote: Spark includes the clearspring analytics package but intentionally excludes the dependencies of the fastutil package (see below). Spark includes parquet-column which includes fastutil and relocates it under parquet/ but creates a shaded jar file which is incomplete because it shades out some of the fastutil classes, notably Long2LongOpenHashMap, which is present in the fastutil jar file that parquet-column is referencing. We are using more of the clearspring classes (e.g. QDigest) and those do depend on missing fastutil classes like Long2LongOpenHashMap. Even though I add them to our assembly jar file, the class loader finds the spark assembly and we get runtime class loader errors when we try to use it. It is possible to put our jar file first, as described here: https://issues.apache.org/jira/browse/SPARK-939 http://spark.apache.org/docs/1.2.0/configuration.html#runtime-environment which I tried with args to spark-submit: --conf spark.driver.userClassPathFirst=true --conf spark.executor.userClassPathFirst=true but we still get the class not found error. We have tried copying the source code for clearspring into our package and renaming the package and that makes it appear to work... Is this risky? It certainly is ugly. Can anyone recommend a way to deal with this dependency **ll ? 
=== The spark/pom.xml file contains the following lines: dependency groupIdcom.clearspring.analytics/groupId artifactIdstream/artifactId version2.7.0/version exclusions exclusion groupIdit.unimi.dsi/groupId artifactIdfastutil/artifactId /exclusion /exclusions /dependency === The parquet-column/pom.xml file contains: artifactIdmaven-shade-plugin/artifactId executions execution phasepackage/phase goals goalshade/goal /goals configuration minimizeJartrue/minimizeJar artifactSet includes includeit.unimi.dsi:fastutil/include /includes /artifactSet relocations relocation patternit.unimi.dsi/pattern shadedPatternparquet.it.unimi.dsi/shadedPattern /relocation /relocations /configuration /execution /executions -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-excludes-fastutil-dependencies-we-need-tp21794.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Re: Many Receiver vs. Many threads per Receiver
Spark Streaming has a new Kafka direct stream, to be release as experimental feature with 1.3. That uses a low level consumer. Not sure if it satisfies your purpose. If you want more control, its best to create your own Receiver with the low level Kafka API. TD On Tue, Feb 24, 2015 at 12:09 AM, bit1...@163.com bit1...@163.com wrote: Thanks Akhil. Not sure whether thelowlevel consumer. https://github.com/dibbhatt/kafka-spark-consumerwill be officially supported by Spark Streaming. So far, I don't see it mentioned/documented in the spark streaming programming guide. -- bit1...@163.com *From:* Akhil Das ak...@sigmoidanalytics.com *Date:* 2015-02-24 16:21 *To:* bit1...@163.com *CC:* user user@spark.apache.org *Subject:* Re: Many Receiver vs. Many threads per Receiver I believe when you go with 1, it will distribute the consumer across your cluster (possibly on 6 machines), but still it i don't see a away to tell from which partition it will consume etc. If you are looking to have a consumer where you can specify the partition details and all, then you are better off with the lowlevel consumer. https://github.com/dibbhatt/kafka-spark-consumer Thanks Best Regards On Tue, Feb 24, 2015 at 9:36 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am experimenting Spark Streaming and Kafka Integration, To read messages from Kafka in parallel, basically there are two ways 1. Create many Receivers like (1 to 6).map(_ = KakfaUtils.createStream). 2. Specifiy many threads when calling KakfaUtils.createStream like val topicMap(myTopic=6), this will create one receiver with 6 reading threads. My question is which option is better, sounds option 2 is better is to me because it saves a lot of cores(one Receiver one core), but I learned from somewhere else that choice 1 is better, so I would ask and see how you guys elaborate on this. Thank -- bit1...@163.com
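For concreteness, a sketch of the two options being compared (ssc is an existing StreamingContext; the ZooKeeper quorum, group id and topic name are placeholders):

    import org.apache.spark.streaming.kafka.KafkaUtils

    // Option 1: six receivers, one consumer thread each, unioned into a single
    // DStream. Each receiver occupies a core for its lifetime.
    val perReceiverStreams = (1 to 6).map(_ =>
      KafkaUtils.createStream(ssc, "zk1:2181", "my-group", Map("myTopic" -> 1)))
    val unioned = ssc.union(perReceiverStreams)

    // Option 2: one receiver whose consumer uses six threads, so only one core
    // is tied up receiving.
    val singleReceiverStream =
      KafkaUtils.createStream(ssc, "zk1:2181", "my-group", Map("myTopic" -> 6))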
Re: Number of parallel tasks
Did you try setting .set("spark.cores.max", "20")? Thanks Best Regards On Wed, Feb 25, 2015 at 10:21 PM, Akshat Aranya aara...@gmail.com wrote: I have Spark running in standalone mode with 4 executors, each with 5 cores (spark.executor.cores=5). However, when I'm processing an RDD with ~90,000 partitions, I only get 4 parallel tasks. Shouldn't I be getting 4x5=20 parallel task executions?
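The property takes string values on SparkConf; a minimal sketch (the cap of 20 mirrors the 4 executors x 5 cores the poster expects):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("parallel-tasks")     // placeholder name
      .set("spark.cores.max", "20")     // total cores the application may claim

It can equally be passed at submit time with --conf spark.cores.max=20.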
What is best way to run spark job in yarn-cluster mode from java program(servlet container) and NOT using spark-submit command.
Hello Spark experts, I have tried reading the Spark documentation and searched many posts in this forum, but I couldn't find a satisfactory answer to my question. I have recently started using Spark, so I may be missing something, and that's why I am looking for your guidance here. I have a situation where I am running a web application in Jetty using Spring Boot. My web application receives a REST web service request, and based on that it needs to trigger a Spark calculation job on a YARN cluster. Since my job can take long to run and can access data from HDFS, I want to run the Spark job in yarn-cluster mode, and I don't want to keep a SparkContext alive in my web layer. Another reason is that my application is multi-tenant, so each tenant can run its own job; in yarn-cluster mode each tenant's job can start its own driver and run as its own YARN application. In the web app JVM, I assume I can't run multiple SparkContexts in one JVM. I want to trigger Spark jobs in yarn-cluster mode programmatically, from Java code in my web application. What is the best way to achieve this? I am exploring various options and looking for your guidance on which one is best: 1. I can use the *org.apache.spark.deploy.yarn.Client* class /submitApplication()/ method. But I assume this class is not a public API and can change between Spark releases. Also, I noticed that this class was made private to the spark package in Spark 1.2; in version 1.1 it was public. So I risk breaking my code on a Spark upgrade if I use this method. 2. I can use the *spark-submit* command line shell to submit my jobs. But to trigger it from my web application I need to use either the Java ProcessBuilder API or some package built on ProcessBuilder. This has two issues. First, it doesn't sound like a clean way of doing it; I should have a programmatic way of triggering my Spark applications on YARN. If the YARN API allows it, why don't we have this in Spark? Second, I will lose the ability to monitor the submitted application and get its status; the only crude way of doing it is reading the output stream of the spark-submit shell, which again doesn't sound like a good approach. Please suggest what is the best way of doing this with the latest version of Spark (1.2.1). Later I plan to deploy this entire application on Amazon EMR, so the approach should work there as well. Thanks in advance -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-is-best-way-to-run-spark-job-in-yarn-cluster-mode-from-java-program-servlet-container-and-NOT-u-tp21817.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
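To make option 2 concrete, a rough sketch of shelling out to spark-submit from the servlet container, written here in Scala for consistency with the other examples in this archive; the install path, main class, jar and tenant argument are all placeholders, and this is not a recommendation of option 2 over option 1:

    import scala.io.Source

    def submitTenantJob(tenantId: String): Int = {
      val pb = new ProcessBuilder(
        "/opt/spark/bin/spark-submit",        // placeholder install path
        "--master", "yarn-cluster",
        "--class", "com.example.TenantJob",   // hypothetical driver class
        "--name", s"tenant-$tenantId",
        "/opt/jobs/tenant-job.jar",           // hypothetical application jar
        tenantId)
      pb.redirectErrorStream(true)
      val proc = pb.start()
      // The submitter's output contains the YARN application id; it can later be
      // polled through the YARN ResourceManager REST API for coarse status.
      Source.fromInputStream(proc.getInputStream).getLines().foreach(println)
      proc.waitFor()
    }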
Setting up Spark with YARN on EC2 cluster
Hi, I want to setup a Spark cluster with YARN dependency on Amazon EC2. I was reading this https://spark.apache.org/docs/1.2.0/running-on-yarn.html document and I understand that Hadoop has to be setup for running Spark with YARN. My questions - 1. Do we have to setup Hadoop cluster on EC2 and then build Spark on it? 2. Is there a way to modify the existing Spark cluster to work with YARN? Thanks in advance. Harika -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Setting-up-Spark-with-YARN-on-EC2-cluster-tp21818.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Effects of persist(XYZ_2)
Hi, just a quick question about calling persist with the _2 option. Is the 2x replication only useful for fault tolerance, or will it also increase job speed by avoiding network transfers? Assuming I’m doing joins or other shuffle operations. Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Effects of persist(XYZ_2)
If you mean, can both copies of the blocks be used for computations? yes they can. On Wed, Feb 25, 2015 at 10:36 AM, Marius Soutier mps@gmail.com wrote: Hi, just a quick question about calling persist with the _2 option. Is the 2x replication only useful for fault tolerance, or will it also increase job speed by avoiding network transfers? Assuming I’m doing joins or other shuffle operations. Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Effects of persist(XYZ_2)
Yes. Effectively, could it avoid network transfers? Or put differently, would an option like persist(MEMORY_ALL) improve job speed by caching an RDD on every worker? On 25.02.2015, at 11:42, Sean Owen so...@cloudera.com wrote: If you mean, can both copies of the blocks be used for computations? yes they can. On Wed, Feb 25, 2015 at 10:36 AM, Marius Soutier mps@gmail.com wrote: Hi, just a quick question about calling persist with the _2 option. Is the 2x replication only useful for fault tolerance, or will it also increase job speed by avoiding network transfers? Assuming I’m doing joins or other shuffle operations. Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
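For reference, a minimal sketch of how the replicated level is requested (sc is an existing SparkContext and the RDD is a throwaway example). There is no built-in level that replicates a cached RDD to every worker; when a dataset is small enough for that to be attractive, a broadcast variable is usually the closer fit.

    import org.apache.spark.storage.StorageLevel

    val pairs = sc.parallelize(1 to 1000000).map(i => (i % 100, i))
    val cached = pairs.persist(StorageLevel.MEMORY_ONLY_2)  // two replicas of each partition
    cached.count()  // materialise the cache; either replica can serve later stages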
Re: No executors allocated on yarn with latest master branch
We're using the capacity scheduler, to the best of my knowledge. Unsure if multi resource scheduling is used, but if you know of an easy way to figure that out, then let me know. Thanks, Anders On Sat, Feb 21, 2015 at 12:05 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Are you using the capacity scheduler or fifo scheduler without multi resource scheduling by any chance? On Thu, Feb 12, 2015 at 1:51 PM, Anders Arpteg arp...@spotify.com wrote: The nm logs only seems to contain similar to the following. Nothing else in the same time range. Any help? 2015-02-12 20:47:31,245 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_02 2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_12 2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_22 2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_32 2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_42 2015-02-12 21:24:30,515 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: FINISH_APPLICATION sent to absent application application_1422406067005_0053 On Thu, Feb 12, 2015 at 10:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote: It seems unlikely to me that it would be a 2.2 issue, though not entirely impossible. Are you able to find any of the container logs? Is the NodeManager launching containers and reporting some exit code? -Sandy On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg arp...@spotify.com wrote: No, not submitting from windows, from a debian distribution. Had a quick look at the rm logs, and it seems some containers are allocated but then released again for some reason. Not easy to make sense of the logs, but here is a snippet from the logs (from a test in our small test cluster) if you'd like to have a closer look: http://pastebin.com/8WU9ivqC Sandy, sounds like it could possible be a 2.2 issue then, or what do you think? Thanks, Anders On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: This is tricky to debug. Check logs of node and resource manager of YARN to see if you can trace the error. In the past I have to closely look at arguments getting passed to YARN container (they get logged before attempting to launch containers). If I still don't get a clue, I had to check the script generated by YARN to execute the container and even run manually to trace at what line the error has occurred. BTW are you submitting the job from windows? On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote: Interesting to hear that it works for you. Are you using Yarn 2.2 as well? No strange log message during startup, and can't see any other log messages since no executer gets launched. Does not seems to work in yarn-client mode either, failing with the exception below. Exception in thread main org.apache.spark.SparkException: Yarn application has already ended! 
It might have been killed or unable to launch application master. at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141) at org.apache.spark.SparkContext.init(SparkContext.scala:370) at com.spotify.analytics.AnalyticsSparkContext.init(AnalyticsSparkContext.scala:8) at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42) at com.spotify.analytics.DataSampler.main(DataSampler.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178) at
Re: method newAPIHadoopFile
This is the declaration of my custom inputformat public class NetCDFFileInputFormat extends ArrayBasedFileInputFormat public abstract class ArrayBasedFileInputFormat extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat Best, Patcharee On 25. feb. 2015 10:15, patcharee wrote: Hi, I am new to spark and scala. I have a custom inputformat (used before with mapreduce) and I am trying to use it in spark. In java api (the syntax is correct): JavaPairRDDWRFIndex, WRFVariable pairVarOriRDD = sc.newAPIHadoopFile( path, NetCDFFileInputFormat.class, WRFIndex.class, WRFVariable.class, jobConf); But in scala: val pairVarOriRDD = sc.newAPIHadoopFile(path, classOf[NetCDFFileInputFormat], classOf[WRFIndex], classOf[WRFVariable], jobConf) The compiler complained inferred type arguments [no.uni.computing.io.WRFIndex,no.uni.computing.io.WRFVariable,no.uni.computing.io.input.NetCDFFileInputFormat] do not conform to method newAPIHadoopFile's type parameter bounds [K,V,F : org.apache.hadoop.mapreduce.InputFormat[K,V]] What is the correct syntax for scala api? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: method newAPIHadoopFile
OK, from the declaration you sent me separately: public class NetCDFFileInputFormat extends ArrayBasedFileInputFormat public abstract class ArrayBasedFileInputFormat extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat It looks like you do not declare any generic types that FileInputFormat declares for the key and value type. I think you can get away with this in the Java API with warnings, but scalac is correct that you have not given an InputFormat that matches the bounds required by the API. That is you need to extend something like ArrayBasedFileInputFormat WRFIndex ,WRFVariable On Wed, Feb 25, 2015 at 9:15 AM, patcharee patcharee.thong...@uni.no wrote: Hi, I am new to spark and scala. I have a custom inputformat (used before with mapreduce) and I am trying to use it in spark. In java api (the syntax is correct): JavaPairRDDWRFIndex, WRFVariable pairVarOriRDD = sc.newAPIHadoopFile( path, NetCDFFileInputFormat.class, WRFIndex.class, WRFVariable.class, jobConf); But in scala: val pairVarOriRDD = sc.newAPIHadoopFile(path, classOf[NetCDFFileInputFormat], classOf[WRFIndex], classOf[WRFVariable], jobConf) The compiler complained inferred type arguments [no.uni.computing.io.WRFIndex,no.uni.computing.io.WRFVariable,no.uni.computing.io.input.NetCDFFileInputFormat] do not conform to method newAPIHadoopFile's type parameter bounds [K,V,F : org.apache.hadoop.mapreduce.InputFormat[K,V]] What is the correct syntax for scala api? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
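Once the input format (directly or via ArrayBasedFileInputFormat) extends FileInputFormat with the key and value generics filled in as WRFIndex and WRFVariable, the Scala call can also pin the type parameters explicitly, which leaves scalac nothing to infer. A sketch under that assumption (path and sc are assumed to be in scope):

    import org.apache.hadoop.conf.Configuration
    import no.uni.computing.io.{WRFIndex, WRFVariable}
    import no.uni.computing.io.input.NetCDFFileInputFormat

    val jobConf = new Configuration()
    val pairVarOriRDD = sc.newAPIHadoopFile[WRFIndex, WRFVariable, NetCDFFileInputFormat](
      path,
      classOf[NetCDFFileInputFormat],
      classOf[WRFIndex],
      classOf[WRFVariable],
      jobConf)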
Re: Facing error while extending scala class with Product interface to overcome limit of 22 fields in spark-shell
The link has proved helpful. I have been able to load data, register it as a table and perform simple queries. Thanks Akhil !! Though, I still look forward to knowing where I was going wrong with my previous technique of extending the Product Interface to overcome case class's limit of 22 fields. On Wed, Feb 25, 2015 at 9:45 AM, anamika gupta anamika.guo...@gmail.com wrote: Hi Akhil I guess it skipped my attention. I would definitely give it a try. While I would still like to know what is the issue with the way I have created schema? On Tue, Feb 24, 2015 at 4:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you happen to have a look at https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema Thanks Best Regards On Tue, Feb 24, 2015 at 3:39 PM, anu anamika.guo...@gmail.com wrote: My issue is posted here on stack-overflow. What am I doing wrong here? http://stackoverflow.com/questions/28689186/facing-error-while-extending-scala-class-with-product-interface-to-overcome-limi -- View this message in context: Facing error while extending scala class with Product interface to overcome limit of 22 fields in spark-shell http://apache-spark-user-list.1001560.n3.nabble.com/Facing-error-while-extending-scala-class-with-Product-interface-to-overcome-limit-of-22-fields-in-spl-tp21787.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Facing error while extending scala class with Product interface to overcome limit of 22 fields in spark-shell
It says sdp_d not found, since it is a class you need to instantiate it once. like: sc.textFile(derby.log).map(_.split(,)).map( r = { val upto_time = sdf.parse(r(23).trim); calendar.setTime(upto_time); val r23 = new java.sql.Timestamp(upto_time.getTime); val insert_time = sdf.parse(r(26).trim); calendar.setTime(insert_time); val r26 = new java.sql.Timestamp(insert_time.getTime); val last_upd_time = sdf.parse(r(27).trim); calendar.setTime(last_upd_time); val r27 = new java.sql.Timestamp(last_upd_time.getTime); *new* *sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim, r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim, r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim, r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim, r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim, r(25).trim, r26, r27, r(28).trim)* }).registerAsTable(sdp) Thanks Best Regards On Wed, Feb 25, 2015 at 2:14 PM, anamika gupta anamika.guo...@gmail.com wrote: The link has proved helpful. I have been able to load data, register it as a table and perform simple queries. Thanks Akhil !! Though, I still look forward to knowing where I was going wrong with my previous technique of extending the Product Interface to overcome case class's limit of 22 fields. On Wed, Feb 25, 2015 at 9:45 AM, anamika gupta anamika.guo...@gmail.com wrote: Hi Akhil I guess it skipped my attention. I would definitely give it a try. While I would still like to know what is the issue with the way I have created schema? On Tue, Feb 24, 2015 at 4:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you happen to have a look at https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema Thanks Best Regards On Tue, Feb 24, 2015 at 3:39 PM, anu anamika.guo...@gmail.com wrote: My issue is posted here on stack-overflow. What am I doing wrong here? http://stackoverflow.com/questions/28689186/facing-error-while-extending-scala-class-with-product-interface-to-overcome-limi -- View this message in context: Facing error while extending scala class with Product interface to overcome limit of 22 fields in spark-shell http://apache-spark-user-list.1001560.n3.nabble.com/Facing-error-while-extending-scala-class-with-Product-interface-to-overcome-limit-of-22-fields-in-spl-tp21787.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
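On the original question of going past the 22-field case class limit: the workaround usually discussed at the time is a hand-rolled class that genuinely implements Product (and is serializable), with productArity and productElement enumerating the constructor fields in declaration order, and with instances of it (not the bare class name) produced inside the map, as in the reply above. A cut-down sketch with three fields standing in for the 29 real ones:

    // Hypothetical trimmed-down stand-in for the sdp_d class
    class SdpRecord(val id: Int, val name: String, val uptoTime: java.sql.Timestamp)
      extends Product with Serializable {

      def canEqual(that: Any): Boolean = that.isInstanceOf[SdpRecord]
      def productArity: Int = 3
      def productElement(n: Int): Any = n match {
        case 0 => id
        case 1 => name
        case 2 => uptoTime
        case _ => throw new IndexOutOfBoundsException(n.toString)
      }
    }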
Re: Running multiple threads with same Spark Context
Hi Yana, I tried running the program after setting the property spark.scheduler.mode to FAIR. But the result is same as previous. Are there any other properties that have to be set? On Tue, Feb 24, 2015 at 10:26 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: It's hard to tell. I have not run this on EC2 but this worked for me: The only thing that I can think of is that the scheduling mode is set to - *Scheduling Mode:* FAIR val pool: ExecutorService = Executors.newFixedThreadPool(poolSize) while_loop to get curr_job pool.execute(new ReportJob(sqlContext, curr_job, i)) class ReportJob(sqlContext:org.apache.spark.sql.hive.HiveContext,query: String,id:Int) extends Runnable with Logging { def threadId = (Thread.currentThread.getName() + \t) def run() { logInfo(s* Running ${threadId} ${id}) val startTime = Platform.currentTime val hiveQuery=query val result_set = sqlContext.sql(hiveQuery) result_set.repartition(1) result_set.saveAsParquetFile(shdfs:///tmp/${id}) logInfo(s* DONE ${threadId} ${id} time: +(Platform.currentTime-startTime)) } } On Tue, Feb 24, 2015 at 4:04 AM, Harika matha.har...@gmail.com wrote: Hi all, I have been running a simple SQL program on Spark. To test the concurrency, I have created 10 threads inside the program, all threads using same SQLContext object. When I ran the program on my EC2 cluster using spark-submit, only 3 threads were running in parallel. I have repeated the test on different EC2 clusters (containing different number of cores) and found out that only 3 threads are running in parallel on every cluster. Why is this behaviour seen? What does this number 3 specify? Is there any configuration parameter that I have to set if I want to run more threads concurrently? Thanks Harika -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-multiple-threads-with-same-Spark-Context-tp21784.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
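For anyone reproducing this, a minimal sketch of the application-side pieces (the pool name is a placeholder; note that spark.scheduler.mode must be set before the SparkContext is created, and each submitting thread can be assigned its own pool):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("concurrent-sql")            // placeholder name
      .set("spark.scheduler.mode", "FAIR")
    val sc = new SparkContext(conf)

    // Inside each worker thread, before it issues Spark jobs:
    sc.setLocalProperty("spark.scheduler.pool", "tenant-pool-1")

Without an allocation file the pools get default weights, but jobs submitted from different pools still share executors fairly rather than queueing FIFO behind one another.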
method newAPIHadoopFile
Hi, I am new to spark and scala. I have a custom inputformat (used before with mapreduce) and I am trying to use it in spark. In java api (the syntax is correct): JavaPairRDDWRFIndex, WRFVariable pairVarOriRDD = sc.newAPIHadoopFile( path, NetCDFFileInputFormat.class, WRFIndex.class, WRFVariable.class, jobConf); But in scala: val pairVarOriRDD = sc.newAPIHadoopFile(path, classOf[NetCDFFileInputFormat], classOf[WRFIndex], classOf[WRFVariable], jobConf) The compiler complained inferred type arguments [no.uni.computing.io.WRFIndex,no.uni.computing.io.WRFVariable,no.uni.computing.io.input.NetCDFFileInputFormat] do not conform to method newAPIHadoopFile's type parameter bounds [K,V,F : org.apache.hadoop.mapreduce.InputFormat[K,V]] What is the correct syntax for scala api? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: throughput in the web console?
Did you have a look at https://spark.apache.org/docs/1.0.2/api/scala/index.html#org.apache.spark.scheduler.SparkListener And for Streaming: https://spark.apache.org/docs/1.0.2/api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener Thanks Best Regards On Tue, Feb 24, 2015 at 10:29 PM, Josh J joshjd...@gmail.com wrote: Hi, I plan to run a parameter search varying the number of cores, epoch, and parallelism. The web console provides a way to archive the previous runs, though is there a way to view in the console the throughput? Rather than logging the throughput separately to the log files and correlating the logs files to the web console processing times? Thanks, Josh
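If the goal is to capture throughput programmatically rather than scraping the web console, a listener can be attached to the StreamingContext with ssc.addStreamingListener(new ThroughputListener). A minimal sketch that logs per-batch delays; record counts per batch are exposed on BatchInfo only in some versions, so the sketch sticks to timing, from which throughput can be derived given the input rate:

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    class ThroughputListener extends StreamingListener {
      override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
        val info = batch.batchInfo
        println(s"batch ${info.batchTime}: " +
          s"processing=${info.processingDelay.getOrElse(-1L)} ms " +
          s"scheduling=${info.schedulingDelay.getOrElse(-1L)} ms")
      }
    }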
RE: used cores are less then total no. of core
Thanks Akhil , it was a simple fix which you told .. I missed it .. ☺ From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Wednesday, February 25, 2015 12:48 PM To: Somnath Pandeya Cc: user@spark.apache.org Subject: Re: used cores are less then total no. of core You can set the following in the conf while creating the SparkContext (if you are not using spark-submit) .set(spark.cores.max, 32) Thanks Best Regards On Wed, Feb 25, 2015 at 11:52 AM, Somnath Pandeya somnath_pand...@infosys.com wrote: Hi All, I am running a simple word count example of spark (standalone cluster) , In the UI it is showing For each worker no. of cores available are 32 ,but while running the jobs only 5 cores are being used, What should I do to increase no. of used core or it is selected based on jobs. Thanks Somnath
Re: spark streaming: stderr does not roll
These settings don't control what happens to stderr, right? stderr is up to the process that invoked the driver to control. You may wish to configure log4j to log to files instead. On Wed, Nov 12, 2014 at 8:15 PM, Nguyen, Duc duc.ngu...@pearson.com wrote: I've also tried setting the aforementioned properties using System.setProperty() as well as on the command line while submitting the job using --conf key=value. All to no success. When I go to the Spark UI and click on that particular streaming job and then the Environment tab, I can see the properties are correctly set. But regardless of what I've tried, the stderr log file on the worker nodes does not roll and continues to grow...leading to a crash of the cluster once it claims 100% of disk. Has anyone else encountered this? Anyone? On Fri, Nov 7, 2014 at 3:35 PM, Nguyen, Duc duc.ngu...@pearson.com wrote: We are running spark streaming jobs (version 1.1.0). After a sufficient amount of time, the stderr file grows until the disk is full at 100% and crashes the cluster. I've read this https://github.com/apache/spark/pull/895 and also read this http://spark.apache.org/docs/latest/configuration.html#spark-streaming So I've tried testing with this in an attempt to get the stderr log file to roll. sparkConf.set(spark.executor.logs.rolling.strategy, size) .set(spark.executor.logs.rolling.size.maxBytes, 1024) .set(spark.executor.logs.rolling.maxRetainedFiles, 3) Yet it does not roll and continues to grow. Am I missing something obvious? thanks, Duc - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark excludes fastutil dependencies we need
No, we should not add fastutil back. It's up to the app to bring dependencies it needs, and that's how I understand this issue. The question is really, how to get the classloader visibility right. It depends on where you need these classes. Have you looked into spark.files.userClassPathFirst and spark.yarn.user.classpath.first ? On Wed, Feb 25, 2015 at 5:34 AM, Ted Yu yuzhih...@gmail.com wrote: bq. depend on missing fastutil classes like Long2LongOpenHashMap Looks like Long2LongOpenHashMap should be added to the shaded jar. Cheers On Tue, Feb 24, 2015 at 7:36 PM, Jim Kleckner j...@cloudphysics.com wrote: Spark includes the clearspring analytics package but intentionally excludes the dependencies of the fastutil package (see below). Spark includes parquet-column which includes fastutil and relocates it under parquet/ but creates a shaded jar file which is incomplete because it shades out some of the fastutil classes, notably Long2LongOpenHashMap, which is present in the fastutil jar file that parquet-column is referencing. We are using more of the clearspring classes (e.g. QDigest) and those do depend on missing fastutil classes like Long2LongOpenHashMap. Even though I add them to our assembly jar file, the class loader finds the spark assembly and we get runtime class loader errors when we try to use it. It is possible to put our jar file first, as described here: https://issues.apache.org/jira/browse/SPARK-939 http://spark.apache.org/docs/1.2.0/configuration.html#runtime-environment which I tried with args to spark-submit: --conf spark.driver.userClassPathFirst=true --conf spark.executor.userClassPathFirst=true but we still get the class not found error. We have tried copying the source code for clearspring into our package and renaming the package and that makes it appear to work... Is this risky? It certainly is ugly. Can anyone recommend a way to deal with this dependency **ll ? === The spark/pom.xml file contains the following lines: dependency groupIdcom.clearspring.analytics/groupId artifactIdstream/artifactId version2.7.0/version exclusions exclusion groupIdit.unimi.dsi/groupId artifactIdfastutil/artifactId /exclusion /exclusions /dependency === The parquet-column/pom.xml file contains: artifactIdmaven-shade-plugin/artifactId executions execution phasepackage/phase goals goalshade/goal /goals configuration minimizeJartrue/minimizeJar artifactSet includes includeit.unimi.dsi:fastutil/include /includes /artifactSet relocations relocation patternit.unimi.dsi/pattern shadedPatternparquet.it.unimi.dsi/shadedPattern /relocation /relocations /configuration /execution /executions -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-excludes-fastutil-dependencies-we-need-tp21794.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Facing error while extending scala class with Product interface to overcome limit of 22 fields in spark-shell
I believe your class needs to be defined as a case class (as I answered on SO).. On 25.2.2015. 5:15, anamika gupta wrote: Hi Akhil I guess it skipped my attention. I would definitely give it a try. While I would still like to know what is the issue with the way I have created schema? On Tue, Feb 24, 2015 at 4:35 PM, Akhil Das ak...@sigmoidanalytics.com mailto:ak...@sigmoidanalytics.com wrote: Did you happen to have a look at https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema Thanks Best Regards On Tue, Feb 24, 2015 at 3:39 PM, anu anamika.guo...@gmail.com mailto:anamika.guo...@gmail.com wrote: My issue is posted here on stack-overflow. What am I doing wrong here? http://stackoverflow.com/questions/28689186/facing-error-while-extending-scala-class-with-product-interface-to-overcome-limi View this message in context: Facing error while extending scala class with Product interface to overcome limit of 22 fields in spark-shell http://apache-spark-user-list.1001560.n3.nabble.com/Facing-error-while-extending-scala-class-with-Product-interface-to-overcome-limit-of-22-fields-in-spl-tp21787.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
How to efficiently control concurrent Spark jobs
Hi, Is there a good (recommended) way to control and run multiple Spark jobs within the same application? My application works as follows: 1) Run one Spark job on the 'full' dataset, which then creates a few thousand RDDs containing sub-datasets of the complete dataset. Each sub-dataset is independent of the others (the 'full' dataset is simply a dump from a database containing several different types of records). 2) Run some filtering and manipulation on each RDD and finally do some ML on the data. (Each RDD created in step 1) is completely independent, so this should run concurrently.) I've implemented this by using Scala Futures and executing the Spark jobs in 2) from a separate thread for each RDD. This works and improves runtime compared to a naive for-loop over step 2). Scaling is however not as good as I would expect: 28 minutes for 4 cores on 1 machine versus 19 minutes for 12 cores on 3 machines. Each sub-dataset is fairly small, so I've used 'repartition' and 'cache' to keep each sub-dataset on only one machine in step 1); this improved runtime by a few %. So, does anyone have a suggestion of how to do this in a better way, or perhaps a higher-level workflow tool that I can use on top of Spark? (The cool solution would have been to use nested RDDs and just map over them in a high-level way, but that is not supported afaik.) Thanks! Staffan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-efficiently-control-concurrent-Spark-jobs-tp21800.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
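For readers wanting the shape of that approach, a sketch of fanning independent sub-dataset jobs out over a bounded thread pool on a single SparkContext (subDatasets, fitModel and the pool size of 8 are placeholders for the per-group RDDs, the ML step and the desired concurrency):

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    // Bounded pool: at most 8 Spark jobs are submitted concurrently.
    implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))

    val modelFutures = subDatasets.map { rdd =>
      Future {
        val cleaned = rdd.filter(_ != null).cache()   // stand-in for the real filtering step
        fitModel(cleaned)                             // hypothetical ML routine
      }
    }
    val models = Await.result(Future.sequence(modelFutures), Duration.Inf)
    ec.shutdown()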