Re: akka disassociated on GC
Hi Makoto, I don't remember writing that, but thanks for bringing this issue up! There are two important settings to check: 1) driver memory (you can see it from the executor tab), 2) number of partitions (try to use a small number of partitions). I have put up two PRs to fix the problem: 1) use broadcast in the task closure: https://github.com/apache/spark/pull/1427 2) use treeAggregate to get the result: https://github.com/apache/spark/pull/1110 They are still under review. Once merged, the problem should be fixed. I will test the KDDB dataset and report back. Thanks! Best, Xiangrui

On Tue, Jul 15, 2014 at 10:48 PM, Makoto Yui yuin...@gmail.com wrote: Hello, (2014/06/19 23:43), Xiangrui Meng wrote: The execution was slow for the larger KDD Cup 2012, Track 2 dataset (235M+ records of 16.7M+ (2^24) sparse features, about 33.6GB) due to the sequential aggregation of dense vectors on a single driver node. Aggregation took about 7.6 minutes per iteration. When running the above test, I got another error at the beginning of the 2nd iteration when enabling iterations. It works fine for the first iteration, but the 2nd iteration always fails. It seems that akka connections are suddenly disassociated when GC happens on the driver node. Two possible causes can be considered: 1) the driver is under heavy load because of GC, so executors cannot connect to the driver (changing the akka timeout settings did not resolve the issue); 2) akka oddly released valid connections on GC. I'm using Spark 1.0.1, and the following akka timeout settings did not resolve the problem.

[spark-defaults.conf]
spark.akka.frameSize 50
spark.akka.timeout 120
spark.akka.askTimeout 120
spark.akka.lookupTimeout 120
spark.akka.heartbeat.pauses 600

It seems this issue is related to one previously discussed in http://markmail.org/message/p2i34frtf4iusdfn Are there any preferred configurations or workarounds for this issue?
Thanks, Makoto

[The error log of the driver]
14/07/14 18:11:32 INFO scheduler.TaskSetManager: Serialized task 4.0:117 as 25300254 bytes in 35 ms
666.108: [GC [PSYoungGen: 6540914K->975362K(7046784K)] 12419091K->7792529K(23824000K), 5.2157830 secs] [Times: user=0.00 sys=68.43, real=5.22 secs]
14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO client.AppClient$ClientActor: Executor updated: app-20140714180032-0010/8 is now EXITED (Command exited with code 1)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
14/07/14 18:11:38 INFO cluster.SparkDeploySchedulerBackend: Executor app-20140714180032-0010/8 removed: Command exited with code 1
14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
672.596: [GC [PSYoungGen: 6642785K->359202K(6059072K)] 13459952K->8065935K(22836288K), 2.8260220 secs] [Times: user=2.83 sys=33.72, real=2.83 secs]
14/07/14 18:11:41 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc02.mydomain.org,54538)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc18.mydomain.org,58100)

The full log is uploaded at https://dl.dropboxusercontent.com/u/13123103/driver.log

[The error log of a worker]
14/07/14 18:11:38 INFO worker.Worker: Executor app-20140714180032-0010/8 finished with state EXITED message Command exited with code 1 exitStatus 1
14/07/14 18:11:38 INFO actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.0.1.9%3A60601-39#1322474303] was not delivered. [13] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and
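For reference, a minimal sketch of the "broadcast in task closure" idea behind PR #1427, expressed as user code (the PR itself changes MLlib internals; weights, data and gradient() here are illustrative placeholders):

  val weightsBC = sc.broadcast(weights)   // ship the large weight vector to each executor once
  // tasks reference the broadcast value instead of capturing weights in every task closure
  val grads = data.map(point => gradient(weightsBC.value, point))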
Re: Error when testing with large sparse svm
Then it may be a new issue. Do you mind creating a JIRA to track this issue? It would be great if you can help locate the line in BinaryClassificationMetrics that caused the problem. Thanks! -Xiangrui On Tue, Jul 15, 2014 at 10:56 PM, crater cq...@ucmerced.edu wrote: I don't really have my own code; I was just running the example program in: examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala What I did was simply try this example on a 13M sparse dataset, and I got the error I posted. Today I managed to run it after I commented out the prediction part. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9884.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
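For context, roughly the prediction/metrics step that the example performs and that was commented out above (a sketch only, assuming a trained model and a test RDD of LabeledPoint; the package location of BinaryClassificationMetrics may differ slightly across 1.0.x versions):

  import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
  // (score, label) pairs for every test point
  val scoreAndLabel = test.map(p => (model.predict(p.features), p.label))
  val metrics = new BinaryClassificationMetrics(scoreAndLabel)
  println("Area under ROC = " + metrics.areaUnderROC())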
Error: No space left on device
Hi all, I am encountering the following error:

INFO scheduler.TaskSetManager: Loss was due to java.io.IOException: No space left on device [duplicate 4]

For each slave, df -h looks roughly like this, which makes the above error surprising.

Filesystem      Size  Used Avail Use% Mounted on
/dev/xvda1      7.9G  4.4G  3.5G  57% /
tmpfs           7.4G  4.0K  7.4G   1% /dev/shm
/dev/xvdb        37G  3.3G   32G  10% /mnt
/dev/xvdf        37G  2.0G   34G   6% /mnt2
/dev/xvdv       500G   33M  500G   1% /vol

I'm on an EC2 cluster (c3.xlarge + 5 x m3) that I launched using the spark-ec2 scripts and a clone of spark from today. The job I am running closely resembles the collaborative filtering example https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html. This issue happens with the 1M version as well as the 10 million rating version of the MovieLens dataset. I have seen previous http://mail-archives.apache.org/mod_mbox/spark-user/201403.mbox/%3c532f5aec.8060...@nengoiksvelzud.com%3E questions https://groups.google.com/forum/#!msg/spark-users/Axx4optAj-E/q5lWMv-ZqnwJ, but they haven't helped yet. For example, I tried setting the Spark tmp directory to the EBS volume at /vol/, both by editing the spark conf file (and copy-dir'ing it to the slaves) as well as through the SparkConf. Yet I still get the above error. Here is my current Spark config below. Note that I'm launching via ~/spark/bin/spark-submit.

conf = SparkConf()
conf.setAppName("RecommendALS").set("spark.local.dir", "/vol/").set("spark.executor.memory", "7g").set("spark.akka.frameSize", "100").setExecutorEnv("SPARK_JAVA_OPTS", "-Dspark.akka.frameSize=100")
sc = SparkContext(conf=conf)

Thanks for any advice, Chris
Re: Error: No space left on device
Check the number of inodes (df -i). The assembly build may create many small files. -Xiangrui On Tue, Jul 15, 2014 at 11:35 PM, Chris DuBois chris.dub...@gmail.com wrote: Hi all, I am encountering the following error: INFO scheduler.TaskSetManager: Loss was due to java.io.IOException: No space left on device [duplicate 4] For each slave, df -h looks roughtly like this, which makes the above error surprising. FilesystemSize Used Avail Use% Mounted on /dev/xvda17.9G 4.4G 3.5G 57% / tmpfs 7.4G 4.0K 7.4G 1% /dev/shm /dev/xvdb 37G 3.3G 32G 10% /mnt /dev/xvdf 37G 2.0G 34G 6% /mnt2 /dev/xvdv 500G 33M 500G 1% /vol I'm on an EC2 cluster (c3.xlarge + 5 x m3) that I launched using the spark-ec2 scripts and a clone of spark from today. The job I am running closely resembles the collaborative filtering example. This issue happens with the 1M version as well as the 10 million rating version of the MovieLens dataset. I have seen previous questions, but they haven't helped yet. For example, I tried setting the Spark tmp directory to the EBS volume at /vol/, both by editing the spark conf file (and copy-dir'ing it to the slaves) as well as through the SparkConf. Yet I still get the above error. Here is my current Spark config below. Note that I'm launching via ~/spark/bin/spark-submit. conf = SparkConf() conf.setAppName(RecommendALS).set(spark.local.dir, /vol/).set(spark.executor.memory, 7g).set(spark.akka.frameSize, 100).setExecutorEnv(SPARK_JAVA_OPTS, -Dspark.akka.frameSize=100) sc = SparkContext(conf=conf) Thanks for any advice, Chris
Re: Error: No space left on device
df -i # on a slave
Filesystem     Inodes  IUsed   IFree IUse% Mounted on
/dev/xvda1     524288 277701  246587   53% /
tmpfs         1917974      1 1917973    1% /dev/shm

On Tue, Jul 15, 2014 at 11:39 PM, Xiangrui Meng men...@gmail.com wrote: Check the number of inodes (df -i). The assembly build may create many small files. -Xiangrui On Tue, Jul 15, 2014 at 11:35 PM, Chris DuBois chris.dub...@gmail.com wrote: Hi all, I am encountering the following error: INFO scheduler.TaskSetManager: Loss was due to java.io.IOException: No space left on device [duplicate 4] For each slave, df -h looks roughtly like this, which makes the above error surprising. FilesystemSize Used Avail Use% Mounted on /dev/xvda17.9G 4.4G 3.5G 57% / tmpfs 7.4G 4.0K 7.4G 1% /dev/shm /dev/xvdb 37G 3.3G 32G 10% /mnt /dev/xvdf 37G 2.0G 34G 6% /mnt2 /dev/xvdv 500G 33M 500G 1% /vol I'm on an EC2 cluster (c3.xlarge + 5 x m3) that I launched using the spark-ec2 scripts and a clone of spark from today. The job I am running closely resembles the collaborative filtering example. This issue happens with the 1M version as well as the 10 million rating version of the MovieLens dataset. I have seen previous questions, but they haven't helped yet. For example, I tried setting the Spark tmp directory to the EBS volume at /vol/, both by editing the spark conf file (and copy-dir'ing it to the slaves) as well as through the SparkConf. Yet I still get the above error. Here is my current Spark config below. Note that I'm launching via ~/spark/bin/spark-submit. conf = SparkConf() conf.setAppName(RecommendALS).set(spark.local.dir, /vol/).set(spark.executor.memory, 7g).set(spark.akka.frameSize, 100).setExecutorEnv(SPARK_JAVA_OPTS, -Dspark.akka.frameSize=100) sc = SparkContext(conf=conf) Thanks for any advice, Chris
Re: Error: No space left on device
Hi Chris, I've encountered this error when running Spark's ALS methods too. In my case, it was because I set spark.local.dir improperly, and every time there was a shuffle, it would spill many GB of data onto the local drive. What fixed it was setting it to use the /mnt directory, where a network drive is mounted. For example, setting an environment variable:

export SPACE=$(mount | grep mnt | awk '{print $3"/spark/"}' | xargs | sed 's/ /,/g')

Then adding -Dspark.local.dir=$SPACE or simply -Dspark.local.dir=/mnt/spark/,/mnt2/spark/ when you run your driver application. Chris

On Jul 15, 2014, at 11:39 PM, Xiangrui Meng men...@gmail.com wrote: Check the number of inodes (df -i). The assembly build may create many small files. -Xiangrui On Tue, Jul 15, 2014 at 11:35 PM, Chris DuBois chris.dub...@gmail.com wrote: Hi all, I am encountering the following error: INFO scheduler.TaskSetManager: Loss was due to java.io.IOException: No space left on device [duplicate 4] For each slave, df -h looks roughtly like this, which makes the above error surprising. FilesystemSize Used Avail Use% Mounted on /dev/xvda17.9G 4.4G 3.5G 57% / tmpfs 7.4G 4.0K 7.4G 1% /dev/shm /dev/xvdb 37G 3.3G 32G 10% /mnt /dev/xvdf 37G 2.0G 34G 6% /mnt2 /dev/xvdv 500G 33M 500G 1% /vol I'm on an EC2 cluster (c3.xlarge + 5 x m3) that I launched using the spark-ec2 scripts and a clone of spark from today. The job I am running closely resembles the collaborative filtering example. This issue happens with the 1M version as well as the 10 million rating version of the MovieLens dataset. I have seen previous questions, but they haven't helped yet. For example, I tried setting the Spark tmp directory to the EBS volume at /vol/, both by editing the spark conf file (and copy-dir'ing it to the slaves) as well as through the SparkConf. Yet I still get the above error. Here is my current Spark config below. Note that I'm launching via ~/spark/bin/spark-submit. conf = SparkConf() conf.setAppName(RecommendALS).set(spark.local.dir, /vol/).set(spark.executor.memory, 7g).set(spark.akka.frameSize, 100).setExecutorEnv(SPARK_JAVA_OPTS, -Dspark.akka.frameSize=100) sc = SparkContext(conf=conf) Thanks for any advice, Chris
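A Scala/SparkConf equivalent of the spark.local.dir setting discussed in this thread (a sketch; the /mnt directories follow the spark-ec2 layout mentioned above and are assumptions for other setups):

  import org.apache.spark.{SparkConf, SparkContext}
  val conf = new SparkConf()
    .setAppName("RecommendALS")
    .set("spark.local.dir", "/mnt/spark,/mnt2/spark")  // comma-separated list of local scratch dirs
    .set("spark.executor.memory", "7g")
  val sc = new SparkContext(conf)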
Re: Ambiguous references to id : what does it mean ?
My query is just a simple query that uses the Spark SQL DSL: tagCollection.join(selectedVideos).where('videoId === 'id) On Tue, Jul 15, 2014 at 6:03 PM, Yin Huai huaiyin@gmail.com wrote: Hi Jao, Seems the SQL analyzer cannot resolve the references in the join condition. What is your query? Did you use the Hive parser (your query was submitted through hql(...)) or the basic SQL parser (your query was submitted through sql(...))? Thanks, Yin On Tue, Jul 15, 2014 at 8:52 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, When running a join operation with Spark SQL I got the following error: Exception in thread main org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Ambiguous references to id: (id#303,List()),(id#0,List()), tree: Filter ('videoId = 'id) Join Inner, None ParquetRelation data/tags.parquet Filter (name#1 = P1/cam1) ParquetRelation data/videos.parquet What does it mean? Cheers, Jao
Re: Error: No space left on device
Thanks for the quick responses! I used your final -Dspark.local.dir suggestion, but I see this during the initialization of the application: 14/07/16 06:56:08 INFO storage.DiskBlockManager: Created local directory at /vol/spark-local-20140716065608-7b2a I would have expected something in /mnt/spark/. Thanks, Chris On Tue, Jul 15, 2014 at 11:44 PM, Chris Gore cdg...@cdgore.com wrote: Hi Chris, I've encountered this error when running Spark’s ALS methods too. In my case, it was because I set spark.local.dir improperly, and every time there was a shuffle, it would spill many GB of data onto the local drive. What fixed it was setting it to use the /mnt directory, where a network drive is mounted. For example, setting an environmental variable: export SPACE=$(mount | grep mnt | awk '{print $3/spark/}' | xargs | sed 's/ /,/g’) Then adding -Dspark.local.dir=$SPACE or simply -Dspark.local.dir=/mnt/spark/,/mnt2/spark/ when you run your driver application Chris On Jul 15, 2014, at 11:39 PM, Xiangrui Meng men...@gmail.com wrote: Check the number of inodes (df -i). The assembly build may create many small files. -Xiangrui On Tue, Jul 15, 2014 at 11:35 PM, Chris DuBois chris.dub...@gmail.com wrote: Hi all, I am encountering the following error: INFO scheduler.TaskSetManager: Loss was due to java.io.IOException: No space left on device [duplicate 4] For each slave, df -h looks roughtly like this, which makes the above error surprising. FilesystemSize Used Avail Use% Mounted on /dev/xvda17.9G 4.4G 3.5G 57% / tmpfs 7.4G 4.0K 7.4G 1% /dev/shm /dev/xvdb 37G 3.3G 32G 10% /mnt /dev/xvdf 37G 2.0G 34G 6% /mnt2 /dev/xvdv 500G 33M 500G 1% /vol I'm on an EC2 cluster (c3.xlarge + 5 x m3) that I launched using the spark-ec2 scripts and a clone of spark from today. The job I am running closely resembles the collaborative filtering example. This issue happens with the 1M version as well as the 10 million rating version of the MovieLens dataset. I have seen previous questions, but they haven't helped yet. For example, I tried setting the Spark tmp directory to the EBS volume at /vol/, both by editing the spark conf file (and copy-dir'ing it to the slaves) as well as through the SparkConf. Yet I still get the above error. Here is my current Spark config below. Note that I'm launching via ~/spark/bin/spark-submit. conf = SparkConf() conf.setAppName(RecommendALS).set(spark.local.dir, /vol/).set(spark.executor.memory, 7g).set(spark.akka.frameSize, 100).setExecutorEnv(SPARK_JAVA_OPTS, -Dspark.akka.frameSize=100) sc = SparkContext(conf=conf) Thanks for any advice, Chris
Re: akka disassociated on GC
Hi Xiangrui, (2014/07/16 15:05), Xiangrui Meng wrote: I don't remember writing that, but thanks for bringing this issue up! There are two important settings to check: 1) driver memory (you can see it from the executor tab), 2) number of partitions (try to use a small number of partitions). I have put up two PRs to fix the problem: For the driver memory, I used 16GB/24GB and it was enough for the execution (full GC did not happen). I checked it using jmap and the top command. BTW, I found that the required driver memory was oddly proportional to the number of tasks/executors. When I used 8GB for the driver memory, I got an OOM during task serialization. It could be considered a possible memory leak in the task serialization, to be addressed in the future. Each task is about 24MB, the number of tasks/executors is 280, and the size of each task result was about 120MB or so. 1) use broadcast in task closure: https://github.com/apache/spark/pull/1427 Does this PR reduce the required memory for the driver? Is there a big difference between an explicit broadcast of the feature weights and the implicit task serialization that includes the feature weights? 2) use treeAggregate to get the result: https://github.com/apache/spark/pull/1110 treeAggregate would certainly reduce the time for aggregation and the required driver memory. I will test it. However, the problem that I am facing now is an akka connection issue on GC, or under heavy load. And thus, I think the problem will still be lurking even if the consumed memory is reduced by treeAggregate. Best, Makoto
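A sketch of what the treeAggregate pattern from PR #1110 does for the gradient sum, written as user code (assuming that method is available; it later became part of the RDD API. gradient(), weights, n and data are illustrative placeholders):

  import breeze.linalg.DenseVector
  // Partial sums are merged pairwise across executors in a tree, so the driver receives
  // only a handful of pre-combined vectors instead of one large vector per partition.
  val zero = DenseVector.zeros[Double](n)
  val gradSum = data.treeAggregate(zero)(
    (acc, point) => acc + gradient(weights, point),  // combine within a partition
    (a, b) => a + b)                                 // merge partial sums in the tree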
Re: How does Spark speculation prevent duplicated work?
That makes sense. Thanks everyone for the explanations! Mingyu From: Matei Zaharia matei.zaha...@gmail.com Reply-To: user@spark.apache.org user@spark.apache.org Date: Tuesday, July 15, 2014 at 3:00 PM To: user@spark.apache.org user@spark.apache.org Subject: Re: How does Spark speculation prevent duplicated work? Yeah, this is handled by the commit call of the FileOutputFormat. In general Hadoop OutputFormats have a concept called committing the output, which you should do only once per partition. In the file ones it does an atomic rename to make sure that the final output is a complete file. Matei On Jul 15, 2014, at 2:49 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The way the HDFS file writing works at a high level is that each attempt to write a partition to a file starts writing to a unique temporary file (say, something like targetDirectory/_temp/part-X_attempt-). If the writing into the file successfully completes, then the temporary file is moved to the final location (say, targetDirectory/part-X). If, due to speculative execution, the file already exists in the final intended location, then the move is avoided, or it's overwritten; I forget the implementation. Either way, all attempts to write the same partition will always write the same data to the temp file (assuming the spark transformation generating the data is deterministic and idempotent). And once one attempt is successful, the final file will have the same data. Hence, writing to HDFS / S3 is idempotent. Now this logic is already implemented within Hadoop's MapReduce logic, and Spark just uses it directly. TD On Tue, Jul 15, 2014 at 2:33 PM, Mingyu Kim m...@palantir.com wrote: Thanks for the explanation, guys. I looked into the saveAsHadoopFile implementation a little bit. If you look at https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala at line 843, the HDFS write happens at per-partition processing, not at the result handling, so I have a feeling that it might be writing multiple times. This may be fine if both tasks for the same partition complete, because it will simply overwrite the output partition with the same content, but this could be an issue if one of the tasks completes and the other is in the middle of writing the partition by the time the entire stage completes. Can someone explain this? Bertrand, I'm slightly confused about your comment. So, is it the case that HDFS will handle the writes as a temp file write followed by an atomic move, so the concern I had above is handled at the HDFS level? Mingyu From: Bertrand Dechoux decho...@gmail.com Reply-To: user@spark.apache.org user@spark.apache.org Date: Tuesday, July 15, 2014 at 1:22 PM To: user@spark.apache.org user@spark.apache.org Subject: Re: How does Spark speculation prevent duplicated work? I haven't looked at the implementation, but what you would do with any filesystem is write to a file inside the workspace directory of the task. And then only the attempt of the task that should be kept will perform a move to the final path. The other attempts are simply discarded.
For most filesystems (and that's the case for HDFS), a 'move' is a very simple and fast action because only the full path/name of the file changes, not its content or where this content is physically stored. Speculative execution happens in Hadoop MapReduce, and Spark has the same concept. As long as you apply functions with no side effects (i.e. the only impact is the returned results), then you just need to not take into account results from additional attempts of the same task/operator. Bertrand Dechoux On Tue, Jul 15, 2014 at 9:34 PM, Andrew Ash and...@andrewash.com wrote: Hi Nan, Great digging in -- that makes sense to me for when a job is producing some output handled by Spark like a .count or .distinct or similar. For the other part of the question, I'm also interested in side effects like an HDFS disk write. If one task is writing to an HDFS path and another task starts up, wouldn't it also attempt to write to the same path? How is that de-conflicted? On Tue, Jul 15, 2014 at 3:02 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi Mingyuan, According to my understanding, Spark processes the result generated from each partition by passing it to a resultHandler (SparkContext.scala L1056). This resultHandler usually just puts the result in a driver-side array, the length of which is always partitions.size. This design effectively ensures that the actions are idempotent.
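A schematic of the commit-by-rename pattern described in this thread (not Spark's or Hadoop's actual committer code; the helper name and paths are illustrative):

  import org.apache.hadoop.fs.{FileSystem, Path}

  // Each attempt writes to its own temporary path; only the first attempt to commit
  // renames its file into the final location, so speculative duplicates are discarded.
  def commitPartition(fs: FileSystem, tmp: Path, finalPath: Path): Unit = {
    if (!fs.exists(finalPath)) {
      fs.rename(tmp, finalPath)   // first successful attempt wins
    } else {
      fs.delete(tmp, false)       // a late or duplicate attempt drops its output
    }
  }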
How does Apache Spark handle system failure when deployed in YARN?
Hello @ the mailing list, We think of using Spark in one of our projects on a Hadoop cluster. During evaluation several questions remain, which are stated below.

Preconditions
Let's assume Apache Spark is deployed on a Hadoop cluster using YARN. Furthermore, a Spark execution is running. How does Spark handle the situations listed below?

Cases & Questions
1. One node of the Hadoop cluster fails due to a disc error. However, replication is high enough and no data was lost.
   * What will happen to tasks that were running on that node?
2. One node of the Hadoop cluster fails due to a disc error. Replication was not high enough and data was lost. Simply put, Spark couldn't find a file anymore which was pre-configured as a resource for the workflow.
   * How will it handle this situation?
3. During execution the primary namenode fails over.
   * Does Spark automatically use the failover namenode?
   * What happens when the secondary namenode fails as well?
4. For some reason during a workflow the cluster is totally shut down.
   * Will Spark restart with the cluster automatically?
   * Will it resume from the last save point during the workflow?

Thanks in advance. :) Best regards Matthias Kricke
Re: Error: No space left on device
Hi Chris, Could you also try `df -i` on the master node? How many blocks/partitions did you set? In the current implementation, ALS doesn't clean the shuffle data because the operations are chained together. But it shouldn't run out of disk space on the MovieLens dataset, which is small. spark-ec2 script sets /mnt/spark and /mnt/spark2 as the local.dir by default, I would recommend leaving this setting as the default value. Best, Xiangrui On Wed, Jul 16, 2014 at 12:02 AM, Chris DuBois chris.dub...@gmail.com wrote: Thanks for the quick responses! I used your final -Dspark.local.dir suggestion, but I see this during the initialization of the application: 14/07/16 06:56:08 INFO storage.DiskBlockManager: Created local directory at /vol/spark-local-20140716065608-7b2a I would have expected something in /mnt/spark/. Thanks, Chris On Tue, Jul 15, 2014 at 11:44 PM, Chris Gore cdg...@cdgore.com wrote: Hi Chris, I've encountered this error when running Spark’s ALS methods too. In my case, it was because I set spark.local.dir improperly, and every time there was a shuffle, it would spill many GB of data onto the local drive. What fixed it was setting it to use the /mnt directory, where a network drive is mounted. For example, setting an environmental variable: export SPACE=$(mount | grep mnt | awk '{print $3/spark/}' | xargs | sed 's/ /,/g’) Then adding -Dspark.local.dir=$SPACE or simply -Dspark.local.dir=/mnt/spark/,/mnt2/spark/ when you run your driver application Chris On Jul 15, 2014, at 11:39 PM, Xiangrui Meng men...@gmail.com wrote: Check the number of inodes (df -i). The assembly build may create many small files. -Xiangrui On Tue, Jul 15, 2014 at 11:35 PM, Chris DuBois chris.dub...@gmail.com wrote: Hi all, I am encountering the following error: INFO scheduler.TaskSetManager: Loss was due to java.io.IOException: No space left on device [duplicate 4] For each slave, df -h looks roughtly like this, which makes the above error surprising. FilesystemSize Used Avail Use% Mounted on /dev/xvda17.9G 4.4G 3.5G 57% / tmpfs 7.4G 4.0K 7.4G 1% /dev/shm /dev/xvdb 37G 3.3G 32G 10% /mnt /dev/xvdf 37G 2.0G 34G 6% /mnt2 /dev/xvdv 500G 33M 500G 1% /vol I'm on an EC2 cluster (c3.xlarge + 5 x m3) that I launched using the spark-ec2 scripts and a clone of spark from today. The job I am running closely resembles the collaborative filtering example. This issue happens with the 1M version as well as the 10 million rating version of the MovieLens dataset. I have seen previous questions, but they haven't helped yet. For example, I tried setting the Spark tmp directory to the EBS volume at /vol/, both by editing the spark conf file (and copy-dir'ing it to the slaves) as well as through the SparkConf. Yet I still get the above error. Here is my current Spark config below. Note that I'm launching via ~/spark/bin/spark-submit. conf = SparkConf() conf.setAppName(RecommendALS).set(spark.local.dir, /vol/).set(spark.executor.memory, 7g).set(spark.akka.frameSize, 100).setExecutorEnv(SPARK_JAVA_OPTS, -Dspark.akka.frameSize=100) sc = SparkContext(conf=conf) Thanks for any advice, Chris
Re: Kryo deserialisation error
Thanks for your reply. The SparkContext is configured as below:

sparkConf.setAppName("WikipediaPageRank")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
val inputFile = args(0)
val threshold = args(1).toDouble
val numPartitions = args(2).toInt
val usePartitioner = args(3).toBoolean
sparkConf.setAppName("WikipediaPageRank")
sparkConf.set("spark.executor.memory", "60g")
sparkConf.set("spark.cores.max", "48")
sparkConf.set("spark.kryoserializer.buffer.mb", "24")
val sc = new SparkContext(sparkConf)
sc.addJar("~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar")

And I use spark-submit to run the application:

./bin/spark-submit --master spark://sing12:7077 --total-executor-cores 40 --executor-memory 40g --class org.apache.spark.examples.bagel.WikipediaPageRank ~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar hdfs://192.168.1.12:9000/freebase-26G 1 200 True

Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address:800 Dongchuan Road, Minhang District, Shanghai, 200240 Email:wh.s...@gmail.com

On Wed, Jul 16, 2014 at 1:41 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using classes from external libraries that have not been added to the sparkContext, using sparkcontext.addJar()? TD

On Tue, Jul 15, 2014 at 8:36 PM, Hao Wang wh.s...@gmail.com wrote: I am running the WikipediaPageRank in the Spark examples and share the same problem with you:

14/07/16 11:31:06 DEBUG DAGScheduler: submitStage(Stage 6)
14/07/16 11:31:06 ERROR TaskSetManager: Task 6.0:450 failed 4 times; aborting job
14/07/16 11:31:06 INFO DAGScheduler: Failed to run foreach at Bagel.scala:251
Exception in thread main 14/07/16 11:31:06 INFO TaskSchedulerImpl: Cancelling stage 6
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6.0:450 failed 4 times, most recent failure: Exception failure in TID 1330 on host sing11: com.esotericsoftware.kryo.KryoException: Unable to find class: arl Fridtjof Rode
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
com.twitter.chill.TraversableSerializer.read(Traversable.scala:44)
com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:115)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)

Could anyone help?
Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address:800 Dongchuan Road, Minhang District, Shanghai, 200240 Email:wh.s...@gmail.com

On Tue, Jun 3, 2014 at 8:02 PM, Denes te...@outlook.com wrote: I tried to use Kryo as a serialiser in Spark Streaming, did everything according to the guide posted on the Spark website, i.e. added the following lines:

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "MyKryoRegistrator");

I also added the necessary classes to MyKryoRegistrator. However I get the following strange error; can someone help me out where to look for a solution?

14/06/03 09:00:49 ERROR scheduler.JobScheduler: Error running job streaming job 140177880 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while deserializing and fetching task: com.esotericsoftware.kryo.KryoException: Unable to find class: J
Serialization trace: id (org.apache.spark.storage.GetBlock)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
Re: How does Apache Spark handle system failure when deployed in YARN?
Hi Matthias, Answers inline. -Sandy

On Wed, Jul 16, 2014 at 12:21 AM, Matthias Kricke matthias.kri...@mgm-tp.com wrote: Hello @ the mailing list, We think of using Spark in one of our projects on a Hadoop cluster. During evaluation several questions remain, which are stated below.

Preconditions
Let's assume Apache Spark is deployed on a Hadoop cluster using YARN. Furthermore, a Spark execution is running. How does Spark handle the situations listed below?

Cases & Questions

1. One node of the Hadoop cluster fails due to a disc error. However, replication is high enough and no data was lost.
   * What will happen to tasks that were running on that node?

Spark will rerun those tasks on a different node.

2. One node of the Hadoop cluster fails due to a disc error. Replication was not high enough and data was lost. Simply put, Spark couldn't find a file anymore which was pre-configured as a resource for the workflow.
   * How will it handle this situation?

After a number of failed task attempts trying to read the block, Spark would pass up whatever error HDFS is returning and fail the job.

3. During execution the primary namenode fails over.
   * Does Spark automatically use the failover namenode?
   * What happens when the secondary namenode fails as well?

Spark accesses HDFS through the normal HDFS client APIs. Under an HA configuration, these will automatically fail over to the new namenode. If no namenodes are left, the Spark job will fail.

4. For some reason during a workflow the cluster is totally shut down.
   * Will Spark restart with the cluster automatically?
   * Will it resume from the last save point during the workflow?

Can you elaborate a little more on what you mean by the cluster is totally shut down? Do you mean HDFS becomes inaccessible, or all the nodes in the cluster simultaneously lose power? Spark has support for checkpointing to HDFS, so you would be able to go back to the last time checkpoint was called while HDFS was available.

Thanks in advance. :) Best regards Matthias Kricke
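A minimal sketch of the HDFS checkpointing Sandy mentions (the paths are assumptions):

  sc.setCheckpointDir("hdfs://namenode:8020/user/spark/checkpoints")
  val cleaned = sc.textFile("hdfs:///data/events").filter(_.nonEmpty)
  cleaned.checkpoint()   // materialized to the checkpoint dir on the next action
  cleaned.count()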
RE: How does Apache Spark handle system failure when deployed in YARN?
Thanks, your answers totally cover all my questions ☺

From: Sandy Ryza [mailto:sandy.r...@cloudera.com]
Sent: Wednesday, July 16, 2014 09:41
To: user@spark.apache.org
Subject: Re: How does Apache Spark handle system failure when deployed in YARN?

Hi Matthias, Answers inline. -Sandy

On Wed, Jul 16, 2014 at 12:21 AM, Matthias Kricke matthias.kri...@mgm-tp.com wrote: Hello @ the mailing list, We think of using Spark in one of our projects on a Hadoop cluster. During evaluation several questions remain, which are stated below.

Preconditions
Let's assume Apache Spark is deployed on a Hadoop cluster using YARN. Furthermore, a Spark execution is running. How does Spark handle the situations listed below?

Cases & Questions

1. One node of the Hadoop cluster fails due to a disc error. However, replication is high enough and no data was lost.
   * What will happen to tasks that were running on that node?

Spark will rerun those tasks on a different node.

2. One node of the Hadoop cluster fails due to a disc error. Replication was not high enough and data was lost. Simply put, Spark couldn't find a file anymore which was pre-configured as a resource for the workflow.
   * How will it handle this situation?

After a number of failed task attempts trying to read the block, Spark would pass up whatever error HDFS is returning and fail the job.

3. During execution the primary namenode fails over.
   * Does Spark automatically use the failover namenode?
   * What happens when the secondary namenode fails as well?

Spark accesses HDFS through the normal HDFS client APIs. Under an HA configuration, these will automatically fail over to the new namenode. If no namenodes are left, the Spark job will fail.

4. For some reason during a workflow the cluster is totally shut down.
   * Will Spark restart with the cluster automatically?
   * Will it resume from the last save point during the workflow?

Can you elaborate a little more on what you mean by the cluster is totally shut down? Do you mean HDFS becomes inaccessible, or all the nodes in the cluster simultaneously lose power? Spark has support for checkpointing to HDFS, so you would be able to go back to the last time checkpoint was called while HDFS was available.

Thanks in advance. :) Best regards Matthias Kricke
Spark Streaming, external windowing?
Does anyone here have a way to do Spark Streaming with external timing for windows? Right now, it relies on the wall clock of the driver to determine the amount of time that each batch read lasts. We have Kafka and HDFS ingress into our Spark Streaming pipeline, where the events are annotated with the timestamps at which they actually happened (in real time). We would like to base our windows on those timestamps, as opposed to the driver time. Does anyone have any ideas how to do this?
Re: Error: No space left on device
Hi Xiangrui, Here is the result on the master node:

$ df -i
Filesystem        Inodes  IUsed     IFree IUse% Mounted on
/dev/xvda1        524288 273997    250291   53% /
tmpfs            1917974      1   1917973    1% /dev/shm
/dev/xvdv      524288000     30 524287970    1% /vol

I have reproduced the error while using the MovieLens 10M data set on a newly created cluster. Thanks for the help. Chris

On Wed, Jul 16, 2014 at 12:22 AM, Xiangrui Meng men...@gmail.com wrote: Hi Chris, Could you also try `df -i` on the master node? How many blocks/partitions did you set? In the current implementation, ALS doesn't clean the shuffle data because the operations are chained together. But it shouldn't run out of disk space on the MovieLens dataset, which is small. spark-ec2 script sets /mnt/spark and /mnt/spark2 as the local.dir by default, I would recommend leaving this setting as the default value. Best, Xiangrui On Wed, Jul 16, 2014 at 12:02 AM, Chris DuBois chris.dub...@gmail.com wrote: Thanks for the quick responses! I used your final -Dspark.local.dir suggestion, but I see this during the initialization of the application: 14/07/16 06:56:08 INFO storage.DiskBlockManager: Created local directory at /vol/spark-local-20140716065608-7b2a I would have expected something in /mnt/spark/. Thanks, Chris On Tue, Jul 15, 2014 at 11:44 PM, Chris Gore cdg...@cdgore.com wrote: Hi Chris, I've encountered this error when running Spark’s ALS methods too. In my case, it was because I set spark.local.dir improperly, and every time there was a shuffle, it would spill many GB of data onto the local drive. What fixed it was setting it to use the /mnt directory, where a network drive is mounted. For example, setting an environmental variable: export SPACE=$(mount | grep mnt | awk '{print $3/spark/}' | xargs | sed 's/ /,/g’) Then adding -Dspark.local.dir=$SPACE or simply -Dspark.local.dir=/mnt/spark/,/mnt2/spark/ when you run your driver application Chris On Jul 15, 2014, at 11:39 PM, Xiangrui Meng men...@gmail.com wrote: Check the number of inodes (df -i). The assembly build may create many small files. -Xiangrui On Tue, Jul 15, 2014 at 11:35 PM, Chris DuBois chris.dub...@gmail.com wrote: Hi all, I am encountering the following error: INFO scheduler.TaskSetManager: Loss was due to java.io.IOException: No space left on device [duplicate 4] For each slave, df -h looks roughtly like this, which makes the above error surprising. FilesystemSize Used Avail Use% Mounted on /dev/xvda17.9G 4.4G 3.5G 57% / tmpfs 7.4G 4.0K 7.4G 1% /dev/shm /dev/xvdb 37G 3.3G 32G 10% /mnt /dev/xvdf 37G 2.0G 34G 6% /mnt2 /dev/xvdv 500G 33M 500G 1% /vol I'm on an EC2 cluster (c3.xlarge + 5 x m3) that I launched using the spark-ec2 scripts and a clone of spark from today. The job I am running closely resembles the collaborative filtering example. This issue happens with the 1M version as well as the 10 million rating version of the MovieLens dataset. I have seen previous questions, but they haven't helped yet. For example, I tried setting the Spark tmp directory to the EBS volume at /vol/, both by editing the spark conf file (and copy-dir'ing it to the slaves) as well as through the SparkConf. Yet I still get the above error. Here is my current Spark config below. Note that I'm launching via ~/spark/bin/spark-submit. conf = SparkConf() conf.setAppName(RecommendALS).set(spark.local.dir, /vol/).set(spark.executor.memory, 7g).set(spark.akka.frameSize, 100).setExecutorEnv(SPARK_JAVA_OPTS, -Dspark.akka.frameSize=100) sc = SparkContext(conf=conf) Thanks for any advice, Chris
Re: Spark Streaming, external windowing?
Hi Sargun, There have been a few discussions on the list recently about this topic. The short answer is that this is not supported at the moment. This is a particularly good thread, as it discusses the current state and limitations: http://apache-spark-developers-list.1001551.n3.nabble.com/brainsotrming-Generalization-of-DStream-a-ContinuousRDD-td7349.html -kr, Gerard. On Wed, Jul 16, 2014 at 9:56 AM, Sargun Dhillon sar...@sargun.me wrote: Does anyone here have a way to do Spark Streaming with external timing for windows? Right now, it relies on the wall clock of the driver to determine the amount of time that each batch read lasts. We have Kafka and HDFS ingress into our Spark Streaming pipeline, where the events are annotated with the timestamps at which they actually happened (in real time). We would like to base our windows on those timestamps, as opposed to the driver time. Does anyone have any ideas how to do this?
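A sketch of one partial workaround sometimes used in the meantime: key each record by a bucket derived from its own timestamp and aggregate on that key. Batches are still cut by the driver clock, and late data spanning batches needs extra handling; the events DStream and its timestamp field are assumptions:

  import org.apache.spark.streaming.StreamingContext._   // pair DStream operations
  val perMinute = events
    .map(e => ((e.timestamp / 60000L) * 60000L, 1L))     // 1-minute buckets in event time
    .reduceByKey(_ + _)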
RE: executor-cores vs. num-executors
Thanks. Indeed, comparing the stage data of the two jobs now, ‘core7-exec3’ spends about 12.5 minutes more than ‘core2-exec12’ on GC.

From: Nishkam Ravi [mailto:nr...@cloudera.com]
Sent: Wednesday, July 16, 2014 5:28 PM
To: user@spark.apache.org
Subject: Re: executor-cores vs. num-executors

I think two small JVMs would often beat a large one due to lower GC overhead.
Re: Need help on spark Hbase
Hi Team, Now I've changed my code to read the configuration from the hbase-site.xml file (this file is on the classpath). When I run this program using mvn exec:java -Dexec.mainClass=com.cisco.ana.accessavailability.AccessAvailability, it works fine. But when I run this program from spark-submit I get the exception below: the spark-submit command is not able to find HBaseConfiguration. How do I resolve this issue?

Please find the exception below:

rajesh@rajesh-VirtualBox:~/Downloads/spark-1.0.0$ ./bin/spark-submit --master local --class com.cisco.ana.accessavailability.AccessAvailability --jars /home/rajesh/Downloads/MISC/ANA_Access/target/ANA_Access-0.0.1-SNAPSHOT.jar, /home/rajesh/hbase-0.96.1.1-hadoop2/lib/hbase-client-0.96.1.1-hadoop2.jar, /home/rajesh/hbase-0.96.1.1-hadoop2/lib/hbase-common-0.96.1.1-hadoop2.jar, /home/rajesh/hbase-0.96.1.1-hadoop2/lib/hbase-hadoop2-compat-0.96.1.1-hadoop2.jar, /home/rajesh/hbase-0.96.1.1-hadoop2/lib/hbase-it-0.96.1.1-hadoop2.jar, /home/rajesh/hbase-0.96.1.1-hadoop2/lib/hbase-protocol-0.96.1.1-hadoop2.jar, /home/rajesh/hbase-0.96.1.1-hadoop2/lib/hbase-server-0.96.1.1-hadoop2.jar, /home/rajesh/hbase-0.96.1.1-hadoop2/lib/htrace-core-2.01.jar, /home/rajesh/Downloads/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop2.2.0.jar

Warning: Local jar /home/rajesh/hbase-0.96.1.1-hadoop2/lib/hbase-client-0.96.1.1-hadoop2.jar, does not exist, skipping.
 Before
Exception in thread main java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
at com.cisco.ana.accessavailability.AccessAvailability.main(AccessAvailability.java:80)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.HBaseConfiguration
at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
... 8 more

Please find the code below:

public class AccessAvailability {
    public static void main(String[] args) throws Exception {
        System.out.println(" Before");
        Configuration configuration = HBaseConfiguration.create();
        System.out.println(" After");
        SparkConf s = new SparkConf().setMaster("local");
        JavaStreamingContext ssc = new JavaStreamingContext(master, "AccessAvailability", new Duration(4), sparkHome, "");
        JavaDStream<String> lines_2 = ssc.textFileStream(hdfsfolderpath);
    }
}

Regards, Rajesh

On Wed, Jul 16, 2014 at 5:39 AM, Krishna Sankar ksanka...@gmail.com wrote: Good catch. I thought the largest port number is 65535. Cheers k/ On Tue, Jul 15, 2014 at 4:33 PM, Spark DevUser spark.devu...@gmail.com wrote: Are you able to launch hbase shell and run some commands (list, describe, scan, etc)? Seems configuration.set(hbase.master, localhost:60) is wrong. On Tue, Jul 15, 2014 at 3:00 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Also, it helps if you post us logs, stacktraces, exceptions, etc.
TD On Tue, Jul 15, 2014 at 10:07 AM, Jerry Lam chiling...@gmail.com wrote: Hi Rajesh, I have a feeling that this is not directly related to spark but I might be wrong. The reason why is that when you do: Configuration configuration = HBaseConfiguration.create(); by default, it reads the configuration file hbase-site.xml in your classpath and ... (I don't remember all the configuration files hbase has). I noticed that you overwrote some configuration settings in the code, but I'm not sure if you have other configurations that might have conflicted with those. Could you try the following: remove anything that is spark specific, leaving only the hbase related code, uber-jar it, and run it just like any other simple java program. If you still have connection issues, then at least you know the problem is in the configurations. HTH, Jerry On Tue, Jul 15, 2014 at 12:10 PM, Krishna Sankar ksanka...@gmail.com wrote: One vector to check is the HBase libraries in the --jars, as in: spark-submit --class your class --master master url --jars hbase-client-0.98.3-hadoop2.jar,commons-csv-1.0-SNAPSHOT.jar,hbase-common-0.98.3-hadoop2.jar,hbase-hadoop2-compat-0.98.3-hadoop2.jar,hbase-it-0.98.3-hadoop2.jar,hbase-protocol-0.98.3-hadoop2.jar,hbase-server-0.98.3-hadoop2.jar,htrace-core-2.04.jar,spark-assembly-1.0.0-hadoop2.2.0.jar
Server IPC version 7 cannot communicate with client version 4 with Spark Streaming 1.0.0 in Java and CDH4 quickstart in local mode
Hi, I'm running a Java program using Spark Streaming 1.0.0 on the Cloudera 4.4.0 quickstart virtual machine, with hadoop-client 2.0.0-mr1-cdh4.4.0, which is the one corresponding to my Hadoop distribution and which works with other MapReduce programs, and with the Maven property <hadoop.version>2.0.0-mr1-cdh4.4.0</hadoop.version> configured according to http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html. When I set jssc.checkpoint("hdfs://localhost:8020/user/cloudera/bicing/streaming_checkpoints"); I get "Server IPC version 7 cannot communicate with client version 4" when running the program in local mode using local[4] as master. I have seen this problem before in other forums like http://qnalist.com/questions/4957822/hdfs-server-client-ipc-version-mismatch-while-trying-to-access-hdfs-files-using-spark-0-9-1 or http://comments.gmane.org/gmane.comp.lang.scala.spark.user/106 but the solution is basically setting the property I have already set. I have also tried <hadoop-version>2.0.0-cdh4.4.0</hadoop-version> and <hadoop.major.version>2.0</hadoop.major.version> with no luck. Could someone help me with this? Thanks a lot in advance. Greetings, Juan
Re: parallel stages?
Yes, but what I show can be done in one Spark job. On Wed, Jul 16, 2014 at 5:01 AM, Wei Tan w...@us.ibm.com wrote: Thanks Sean. In Oozie you can use fork-join, however using Oozie to drive Spark jobs, jobs will not be able to share RDD (Am I right? I think multiple jobs submitted by Oozie will have different context). Wonder if Spark wants to add more workflow feature in future.
Reading file header in Spark
Hi everyone! I'm really new to Spark and I'm trying to figure out the proper way to do the following:
1.- Read a file header (a single line)
2.- Build a configuration object from it
3.- Use that object in a function that will be called by map()
I thought about using filter() after textFile(), but I don't want to get an RDD as the result, since I'm expecting a single object. Any help is very appreciated. Thanks in advance, Silvina
Re: Reading file header in Spark
You can rdd.take(1) to get just the header line. I think someone mentioned before that this is a good use case for having a tail method on RDDs too, to skip the header for subsequent processing. But you can ignore it with a filter, or logic in your map method. On Wed, Jul 16, 2014 at 11:01 AM, Silvina Caíno Lores silvi.ca...@gmail.com wrote: Hi everyone! I'm really new to Spark and I'm trying to figure out which would be the proper way to do the following: 1.- Read a file header (a single line) 2.- Build with it a configuration object 3.- Use that object in a function that will be called by map() I thought about using filter() after textFile(), but I don't want to get an RDD as result for I'm expecting a unique object. Any help is very appreciated. Thanks in advance, Silvina
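A sketch of the approach described above (parseHeader and process are illustrative placeholders, as is the path):

  val lines = sc.textFile("hdfs:///data/input.txt")
  val header = lines.first()                      // read just the header line
  val config = parseHeader(header)                // build the configuration object
  val records = lines.filter(_ != header).map(line => process(config, line))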
Re: Reading file header in Spark
Thank you! This is what I needed, I've read it should work as the first() method as well. It's a pity that the taken element cannot be removed from the RDD though. Thanks again! On 16 July 2014 12:09, Sean Owen so...@cloudera.com wrote: You can rdd.take(1) to get just the header line. I think someone mentioned before that this is a good use case for having a tail method on RDDs too, to skip the header for subsequent processing. But you can ignore it with a filter, or logic in your map method. On Wed, Jul 16, 2014 at 11:01 AM, Silvina Caíno Lores silvi.ca...@gmail.com wrote: Hi everyone! I'm really new to Spark and I'm trying to figure out which would be the proper way to do the following: 1.- Read a file header (a single line) 2.- Build with it a configuration object 3.- Use that object in a function that will be called by map() I thought about using filter() after textFile(), but I don't want to get an RDD as result for I'm expecting a unique object. Any help is very appreciated. Thanks in advance, Silvina
Re: Server IPC version 7 cannot communicate with client version 4 with Spark Streaming 1.0.0 in Java and CDH4 quickstart in local mode
Server IPC version 7 cannot communicate with client version 4 means your client is Hadoop 1.x and your cluster is Hadoop 2.x. The default Spark distribution is built for Hadoop 1.x. You would have to make your own build (or, use the artifacts distributed for CDH4.6 maybe? they are certainly built vs Hadoop 2) On Wed, Jul 16, 2014 at 10:32 AM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi, I'm running a Java program using Spark Streaming 1.0.0 on Cloudera 4.4.0 quickstart virtual machine, with hadoop-client 2.0.0-mr1-cdh4.4.0, which is the one corresponding to my Hadoop distribution, and that works with other mapreduce programs, and with the maven property hadoop.version2.0.0-mr1-cdh4.4.0/hadoop.version configured according to http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html. When I set jssc.checkpoint(hdfs://localhost:8020/user/cloudera/bicing/streaming_checkpoints); I get a Server IPC version 7 cannot communicate with client version 4 running the program in local mode using local[4] as master. I have seen this problem before in other forums like http://qnalist.com/questions/4957822/hdfs-server-client-ipc-version-mismatch-while-trying-to-access-hdfs-files-using-spark-0-9-1 or http://comments.gmane.org/gmane.comp.lang.scala.spark.user/106 but the solution is basically setting the property I have already set. I have tried also with hadoop-version2.0.0-cdh4.4.0/hadoop-version and hadoop.major.version2.0/hadoop.major.version with no luck. Could someone help me with this? Thanks a lot in advance Greetings, Juan
Read all the columns from a file in spark sql
Hi, I am a newbie to Spark SQL and I would like to know how to read all the columns from a file in Spark SQL. I have referred to the programming guide here: http://people.apache.org/~tdas/spark-1.0-docs/sql-programming-guide.html The example says:

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))

But instead of explicitly specifying p(0), p(1), I would like to read all the columns from the file. It would be difficult if my source dataset had a larger number of columns. Is there any shortcut for that? Also, instead of a single file, I would like to read multiple files which share a similar structure from a directory. Could you please share your thoughts on this? It would be great if you could share any documentation that has details on these. Thanks
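On the multiple-files part of the question: textFile accepts globs and comma-separated paths, so files that share a structure can be read together (a sketch following the guide's Person example; the path is an assumption):

  val people = sc.textFile("hdfs:///data/people/*.txt")   // glob matches all files in the directory
    .map(_.split(","))
    .map(p => Person(p(0), p(1).trim.toInt))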
Re: Can Spark stack scale to petabyte scale without performance degradation?
Thanks Matei. On Tue, Jul 15, 2014 at 11:47 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Yup, as mentioned in the FAQ, we are aware of multiple deployments running jobs on over 1000 nodes. Some of our proofs of concept involved people running a 2000-node job on EC2. I wouldn't confuse buzz with FUD :). Matei On Jul 15, 2014, at 9:17 PM, Sonal Goyal sonalgoy...@gmail.com wrote: Hi Rohit, I think the 3rd question on the FAQ may help you. https://spark.apache.org/faq.html Some other links that talk about building bigger clusters and processing more data: http://spark-summit.org/wp-content/uploads/2014/07/Building-1000-node-Spark-Cluster-on-EMR.pdf http://apache-spark-user-list.1001560.n3.nabble.com/Largest-Spark-Cluster-td3782.html Best Regards, Sonal Nube Technologies http://www.nubetech.co/ http://in.linkedin.com/in/sonalgoyal On Wed, Jul 16, 2014 at 9:17 AM, Rohit Pujari rpuj...@hortonworks.com wrote: Hello Folks: There is a lot of buzz in the Hadoop community around Spark's inability to scale beyond 1 TB datasets (or 10-20 nodes). It is being regarded as great tech for CPU-intensive workloads on smaller data (less than a TB) but said to fail to scale and perform effectively on larger datasets. How true is this? Are there any customers who are running petabyte-scale workloads on Spark in production? Are there any benchmarks performed by Databricks or other companies to clear up this perception? I'm a big fan of Spark. Knowing Spark is in its early stages, I'd like to better understand the boundaries of the tech and recommend the right solution for the right problem. Thanks, Rohit Pujari Solutions Engineer, Hortonworks rpuj...@hortonworks.com 716-430-6899 -- Rohit Pujari Solutions Engineer, Hortonworks rpuj...@hortonworks.com 716-430-6899
Re: count vs countByValue in for/yield
Hello all, Can anyone offer any insight on the below? Both are legal Spark, but the first one works and the latter does not. They both work on a local machine, but in a standalone cluster the one with countByValue fails. Thanks! Ognen

On 7/15/14, 2:23 PM, Ognen Duzlevski wrote: Hello, I am curious about something:

val result = for {
  (dt, evrdd) <- evrdds
  val ct = evrdd.count
} yield (dt -> ct)

works.

val result = for {
  (dt, evrdd) <- evrdds
  val ct = evrdd.countByValue
} yield (dt -> ct)

does not work. I get:

14/07/15 16:46:33 WARN TaskSetManager: Lost TID 0 (task 0.0:0)
14/07/15 16:46:33 WARN TaskSetManager: Loss was due to java.lang.NullPointerException
java.lang.NullPointerException
at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

What is the difference? Is it the fact that countByValue passes back a Map and count passes back a Long? Thanks! Ognen
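For reference, count returns a Long computed on the executors, while countByValue collects a Map of value counts back to the driver. A sketch of an alternative that keeps the per-value counts distributed as an RDD (it does not explain the NPE itself):

  import org.apache.spark.SparkContext._   // pair RDD operations
  val counts = evrdd.map(v => (v, 1L)).reduceByKey(_ + _)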
Problem running Spark shell (1.0.0) on EMR
Hi, I'm trying to run the Spark (1.0.0) shell on EMR and encountering a classpath issue. I suspect I'm missing something gloriously obvious, but so far it is eluding me. I launch the EMR cluster (using the aws cli) with:
aws emr create-cluster --name "Test Cluster" \
  --ami-version 3.0.3 \
  --no-auto-terminate \
  --ec2-attributes KeyName=... \
  --bootstrap-actions Path=s3://elasticmapreduce/samples/spark/1.0.0/install-spark-shark.rb \
  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m1.medium \
  InstanceGroupType=CORE,InstanceCount=1,InstanceType=m1.medium --region eu-west-1
then,
$ aws emr ssh --cluster-id ... --key-pair-file ... --region eu-west-1
On the master node, I then launch the shell with:
[hadoop@ip-... spark]$ ./bin/spark-shell
and try performing:
scala> val logs = sc.textFile("s3n://.../")
this produces:
14/07/16 12:40:35 WARN storage.BlockManager: Putting block broadcast_0 failed
java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
Any help mighty welcome, ian
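One quick check that narrows this down (a sketch, not EMR-specific): hashInt only exists in newer Guava releases, so a NoSuchMethodError there typically means an older Guava bundled with the Hadoop/EMR install is shadowing the version Spark was built against. Plain reflection from the shell shows which jar the class is actually loaded from:

  // Prints the jar that provides com.google.common.hash.HashFunction on this JVM
  println(classOf[com.google.common.hash.HashFunction]
    .getProtectionDomain.getCodeSource.getLocation)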
Simple record matching using Spark SQL
Hi All, I'm trying to do a simple record matching between 2 files and wrote the following code -
import org.apache.spark.sql.SQLContext;
import org.apache.spark.rdd.RDD
object SqlTest {
  case class Test(fld1: String, fld2: String, fld3: String, fld4: String, fld5: String, fld6: Double, fld7: String);
  sc.addJar("test1-0.1.jar");
  val file1 = sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv");
  val file2 = sc.textFile("hdfs://localhost:54310/user/hduser/file2.csv");
  val sq = new SQLContext(sc);
  val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l => Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));
  val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s => Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));
  val file1_schema = sq.createSchemaRDD(file1_recs);
  val file2_schema = sq.createSchemaRDD(file2_recs);
  file1_schema.registerAsTable("file1_tab");
  file2_schema.registerAsTable("file2_tab");
  val matched = sq.sql("select * from file1_tab l join file2_tab s on l.fld7=s.fld7 where l.fld3=s.fld3 and l.fld4=s.fld4 and l.fld6=s.fld6 and l.fld2=s.fld2");
  val count = matched.count();
  System.out.println("Found " + matched.count() + " matching records");
}
When I run this program on a standalone spark cluster, it keeps running for a long time with no output or error. After waiting for a few mins I forcibly kill it. But the same program works well when executed from a spark shell. What is going wrong? What am I missing? ~Sarath
Re: Simple record matching using Spark SQL
Check your executor logs for the output or if your data is not big collect it in the driver and print it. On Jul 16, 2014, at 9:21 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi All, I'm trying to do a simple record matching between 2 files and wrote following code - import org.apache.spark.sql.SQLContext; import org.apache.spark.rdd.RDD object SqlTest { case class Test(fld1:String, fld2:String, fld3:String, fld4:String, fld4:String, fld5:Double, fld6:String); sc.addJar(test1-0.1.jar); val file1 = sc.textFile(hdfs://localhost:54310/user/hduser/file1.csv); val file2 = sc.textFile(hdfs://localhost:54310/user/hduser/file2.csv); val sq = new SQLContext(sc); val file1_recs: RDD[Test] = file1.map(_.split(,)).map(l = Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6))); val file2_recs: RDD[Test] = file2.map(_.split(,)).map(s = Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6))); val file1_schema = sq.createSchemaRDD(file1_recs); val file2_schema = sq.createSchemaRDD(file2_recs); file1_schema.registerAsTable(file1_tab); file2_schema.registerAsTable(file2_tab); val matched = sq.sql(select * from file1_tab l join file2_tab s on l.fld6=s.fld6 where l.fld3=s.fld3 and l.fld4=s.fld4 and l.fld5=s.fld5 and l.fld2=s.fld2); val count = matched.count(); System.out.println(Found + matched.count() + matching records); } When I run this program on a standalone spark cluster, it keeps running for long with no output or error. After waiting for few mins I'm forcibly killing it. But the same program is working well when executed from a spark shell. What is going wrong? What am I missing? ~Sarath
Re: Re: how to construct a ClassTag object as a method parameter in Java
Hi, I think the same issue is happening with the constructor of the PartitionPruningRDD class. It hasn't been fixed in version 1.0.1. Should this be reported to JIRA? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-construct-a-ClassTag-object-as-a-method-parameter-in-Java-tp6768p9920.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Simple record matching using Spark SQL
Hi Soumya, Data is very small, 500+ lines in each file. Removed last 2 lines and placed this at the end matched.collect().foreach(println);. Still no luck. It's been more than 5min, the execution is still running. Checked logs, nothing in stdout. In stderr I don't see anything going wrong, all are info messages. What else do I need check? ~Sarath On Wed, Jul 16, 2014 at 7:23 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Check your executor logs for the output or if your data is not big collect it in the driver and print it. On Jul 16, 2014, at 9:21 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi All, I'm trying to do a simple record matching between 2 files and wrote following code - *import org.apache.spark.sql.SQLContext;* *import org.apache.spark.rdd.RDD* *object SqlTest {* * case class Test(fld1:String, fld2:String, fld3:String, fld4:String, fld4:String, fld5:Double, fld6:String);* * sc.addJar(test1-0.1.jar);* * val file1 = sc.textFile(hdfs://localhost:54310/user/hduser/file1.csv);* * val file2 = sc.textFile(hdfs://localhost:54310/user/hduser/file2.csv);* * val sq = new SQLContext(sc);* * val file1_recs: RDD[Test] = file1.map(_.split(,)).map(l = Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));* * val file2_recs: RDD[Test] = file2.map(_.split(,)).map(s = Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));* * val file1_schema = sq.createSchemaRDD(file1_recs);* * val file2_schema = sq.createSchemaRDD(file2_recs);* * file1_schema.registerAsTable(file1_tab);* * file2_schema.registerAsTable(file2_tab);* * val matched = sq.sql(select * from file1_tab l join file2_tab s on l.fld6=s.fld6 where l.fld3=s.fld3 and l.fld4=s.fld4 and l.fld5=s.fld5 and l.fld2=s.fld2);* * val count = matched.count();* * System.out.println(Found + matched.count() + matching records);* *}* When I run this program on a standalone spark cluster, it keeps running for long with no output or error. After waiting for few mins I'm forcibly killing it. But the same program is working well when executed from a spark shell. What is going wrong? What am I missing? ~Sarath
Re: Simple record matching using Spark SQL
When you submit your job, it should appear on the Spark UI. Same with the REPL. Make sure you job is submitted to the cluster properly. On Wed, Jul 16, 2014 at 10:08 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Soumya, Data is very small, 500+ lines in each file. Removed last 2 lines and placed this at the end matched.collect().foreach(println);. Still no luck. It's been more than 5min, the execution is still running. Checked logs, nothing in stdout. In stderr I don't see anything going wrong, all are info messages. What else do I need check? ~Sarath On Wed, Jul 16, 2014 at 7:23 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Check your executor logs for the output or if your data is not big collect it in the driver and print it. On Jul 16, 2014, at 9:21 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi All, I'm trying to do a simple record matching between 2 files and wrote following code - *import org.apache.spark.sql.SQLContext;* *import org.apache.spark.rdd.RDD* *object SqlTest {* * case class Test(fld1:String, fld2:String, fld3:String, fld4:String, fld4:String, fld5:Double, fld6:String);* * sc.addJar(test1-0.1.jar);* * val file1 = sc.textFile(hdfs://localhost:54310/user/hduser/file1.csv);* * val file2 = sc.textFile(hdfs://localhost:54310/user/hduser/file2.csv);* * val sq = new SQLContext(sc);* * val file1_recs: RDD[Test] = file1.map(_.split(,)).map(l = Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));* * val file2_recs: RDD[Test] = file2.map(_.split(,)).map(s = Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));* * val file1_schema = sq.createSchemaRDD(file1_recs);* * val file2_schema = sq.createSchemaRDD(file2_recs);* * file1_schema.registerAsTable(file1_tab);* * file2_schema.registerAsTable(file2_tab);* * val matched = sq.sql(select * from file1_tab l join file2_tab s on l.fld6=s.fld6 where l.fld3=s.fld3 and l.fld4=s.fld4 and l.fld5=s.fld5 and l.fld2=s.fld2);* * val count = matched.count();* * System.out.println(Found + matched.count() + matching records);* *}* When I run this program on a standalone spark cluster, it keeps running for long with no output or error. After waiting for few mins I'm forcibly killing it. But the same program is working well when executed from a spark shell. What is going wrong? What am I missing? ~Sarath
Re: Simple record matching using Spark SQL
Yes it is appearing on the Spark UI, and remains there with state as RUNNING till I press Ctrl+C in the terminal to kill the execution. Barring the statements to create the spark context, if I copy paste the lines of my code in spark shell, runs perfectly giving the desired output. ~Sarath On Wed, Jul 16, 2014 at 7:48 PM, Soumya Simanta soumya.sima...@gmail.com wrote: When you submit your job, it should appear on the Spark UI. Same with the REPL. Make sure you job is submitted to the cluster properly. On Wed, Jul 16, 2014 at 10:08 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Soumya, Data is very small, 500+ lines in each file. Removed last 2 lines and placed this at the end matched.collect().foreach(println);. Still no luck. It's been more than 5min, the execution is still running. Checked logs, nothing in stdout. In stderr I don't see anything going wrong, all are info messages. What else do I need check? ~Sarath On Wed, Jul 16, 2014 at 7:23 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Check your executor logs for the output or if your data is not big collect it in the driver and print it. On Jul 16, 2014, at 9:21 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi All, I'm trying to do a simple record matching between 2 files and wrote following code - *import org.apache.spark.sql.SQLContext;* *import org.apache.spark.rdd.RDD* *object SqlTest {* * case class Test(fld1:String, fld2:String, fld3:String, fld4:String, fld4:String, fld5:Double, fld6:String);* * sc.addJar(test1-0.1.jar);* * val file1 = sc.textFile(hdfs://localhost:54310/user/hduser/file1.csv);* * val file2 = sc.textFile(hdfs://localhost:54310/user/hduser/file2.csv);* * val sq = new SQLContext(sc);* * val file1_recs: RDD[Test] = file1.map(_.split(,)).map(l = Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));* * val file2_recs: RDD[Test] = file2.map(_.split(,)).map(s = Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));* * val file1_schema = sq.createSchemaRDD(file1_recs);* * val file2_schema = sq.createSchemaRDD(file2_recs);* * file1_schema.registerAsTable(file1_tab);* * file2_schema.registerAsTable(file2_tab);* * val matched = sq.sql(select * from file1_tab l join file2_tab s on l.fld6=s.fld6 where l.fld3=s.fld3 and l.fld4=s.fld4 and l.fld5=s.fld5 and l.fld2=s.fld2);* * val count = matched.count();* * System.out.println(Found + matched.count() + matching records);* *}* When I run this program on a standalone spark cluster, it keeps running for long with no output or error. After waiting for few mins I'm forcibly killing it. But the same program is working well when executed from a spark shell. What is going wrong? What am I missing? ~Sarath
Re: Simple record matching using Spark SQL
Can you try submitting a very simple job to the cluster. On Jul 16, 2014, at 10:25 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Yes it is appearing on the Spark UI, and remains there with state as RUNNING till I press Ctrl+C in the terminal to kill the execution. Barring the statements to create the spark context, if I copy paste the lines of my code in spark shell, runs perfectly giving the desired output. ~Sarath On Wed, Jul 16, 2014 at 7:48 PM, Soumya Simanta soumya.sima...@gmail.com wrote: When you submit your job, it should appear on the Spark UI. Same with the REPL. Make sure you job is submitted to the cluster properly. On Wed, Jul 16, 2014 at 10:08 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Soumya, Data is very small, 500+ lines in each file. Removed last 2 lines and placed this at the end matched.collect().foreach(println);. Still no luck. It's been more than 5min, the execution is still running. Checked logs, nothing in stdout. In stderr I don't see anything going wrong, all are info messages. What else do I need check? ~Sarath On Wed, Jul 16, 2014 at 7:23 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Check your executor logs for the output or if your data is not big collect it in the driver and print it. On Jul 16, 2014, at 9:21 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi All, I'm trying to do a simple record matching between 2 files and wrote following code - import org.apache.spark.sql.SQLContext; import org.apache.spark.rdd.RDD object SqlTest { case class Test(fld1:String, fld2:String, fld3:String, fld4:String, fld4:String, fld5:Double, fld6:String); sc.addJar(test1-0.1.jar); val file1 = sc.textFile(hdfs://localhost:54310/user/hduser/file1.csv); val file2 = sc.textFile(hdfs://localhost:54310/user/hduser/file2.csv); val sq = new SQLContext(sc); val file1_recs: RDD[Test] = file1.map(_.split(,)).map(l = Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6))); val file2_recs: RDD[Test] = file2.map(_.split(,)).map(s = Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6))); val file1_schema = sq.createSchemaRDD(file1_recs); val file2_schema = sq.createSchemaRDD(file2_recs); file1_schema.registerAsTable(file1_tab); file2_schema.registerAsTable(file2_tab); val matched = sq.sql(select * from file1_tab l join file2_tab s on l.fld6=s.fld6 where l.fld3=s.fld3 and l.fld4=s.fld4 and l.fld5=s.fld5 and l.fld2=s.fld2); val count = matched.count(); System.out.println(Found + matched.count() + matching records); } When I run this program on a standalone spark cluster, it keeps running for long with no output or error. After waiting for few mins I'm forcibly killing it. But the same program is working well when executed from a spark shell. What is going wrong? What am I missing? ~Sarath
Re: Read all the columns from a file in spark sql
I think what you might be looking for is the ability to programmatically specify the schema, which is coming in 1.1. Here's the JIRA: SPARK-2179 https://issues.apache.org/jira/browse/SPARK-2179 On Wed, Jul 16, 2014 at 8:24 AM, pandees waran pande...@gmail.com wrote: Hi, I am a newbie to spark sql and I would like to know how to read all the columns from a file in spark sql. I have referred to the programming guide here: http://people.apache.org/~tdas/spark-1.0-docs/sql-programming-guide.html The example says: val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)) But, instead of explicitly specifying p(0), p(1), I would like to read all the columns from a file. It would be difficult if my source dataset has a large number of columns. Is there any shortcut for that? And instead of a single file, I would like to read multiple files which share a similar structure from a directory. Could you please share your thoughts on this? It would be great if you could share any documentation which has details on these. Thanks
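Until SPARK-2179 lands, a small sketch of what already works in 1.0 without naming every column (paths are placeholders, and this gives you plain RDDs rather than a SchemaRDD):

  // textFile accepts directories and globs, so several files with the same
  // layout can be read together in one call
  val rows = sc.textFile("hdfs:///data/people/*.csv").map(_.split(","))

  // rows is an RDD[Array[String]]; columns are addressed by index instead of
  // being listed one by one in a case class
  val firstTwo = rows.map(cols => (cols(0), cols(1).trim))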
Re: Ambiguous references to id : what does it mean ?
Yes, but if both tagCollection and selectedVideos have a column named id then Spark SQL does not know which one you are referring to in the where clause. Here's an example with aliases: val x = testData2.as('x) val y = testData2.as('y) val join = x.join(y, Inner, Some("x.a".attr === "y.a".attr)) On Wed, Jul 16, 2014 at 2:47 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: My query is just a simple query that uses the spark sql dsl: tagCollection.join(selectedVideos).where('videoId === 'id) On Tue, Jul 15, 2014 at 6:03 PM, Yin Huai huaiyin@gmail.com wrote: Hi Jao, Seems the SQL analyzer cannot resolve the references in the Join condition. What is your query? Did you use the Hive Parser (your query was submitted through hql(...)) or the basic SQL Parser (your query was submitted through sql(...))? Thanks, Yin On Tue, Jul 15, 2014 at 8:52 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, When running a join operation with Spark SQL I got the following error: Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Ambiguous references to id: (id#303,List()),(id#0,List()), tree: Filter ('videoId = 'id) Join Inner, None ParquetRelation data/tags.parquet Filter (name#1 = P1/cam1) ParquetRelation data/videos.parquet What does it mean? Cheers, jao
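Applied to the query in this thread, that would look roughly like the following (a sketch only; it assumes the catalyst DSL implicits the original snippet already relies on are in scope, plus the Inner join type, and that the ambiguous columns are videoId on one side and id on the other):

  import org.apache.spark.sql.catalyst.plans.Inner

  val t = tagCollection.as('t)
  val v = selectedVideos.as('v)
  // Qualifying both sides removes the ambiguity between the two id-like columns
  val joined = t.join(v, Inner, Some("t.videoId".attr === "v.id".attr))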
Re: Simple record matching using Spark SQL
Yes Soumya, I did it. First I tried with the example available in the documentation (example using people table and finding teenagers). After successfully running it, I moved on to this one which is starting point to a bigger requirement for which I'm evaluating Spark SQL. On Wed, Jul 16, 2014 at 7:59 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Can you try submitting a very simple job to the cluster. On Jul 16, 2014, at 10:25 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Yes it is appearing on the Spark UI, and remains there with state as RUNNING till I press Ctrl+C in the terminal to kill the execution. Barring the statements to create the spark context, if I copy paste the lines of my code in spark shell, runs perfectly giving the desired output. ~Sarath On Wed, Jul 16, 2014 at 7:48 PM, Soumya Simanta soumya.sima...@gmail.com wrote: When you submit your job, it should appear on the Spark UI. Same with the REPL. Make sure you job is submitted to the cluster properly. On Wed, Jul 16, 2014 at 10:08 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Soumya, Data is very small, 500+ lines in each file. Removed last 2 lines and placed this at the end matched.collect().foreach(println);. Still no luck. It's been more than 5min, the execution is still running. Checked logs, nothing in stdout. In stderr I don't see anything going wrong, all are info messages. What else do I need check? ~Sarath On Wed, Jul 16, 2014 at 7:23 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Check your executor logs for the output or if your data is not big collect it in the driver and print it. On Jul 16, 2014, at 9:21 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi All, I'm trying to do a simple record matching between 2 files and wrote following code - *import org.apache.spark.sql.SQLContext;* *import org.apache.spark.rdd.RDD* *object SqlTest {* * case class Test(fld1:String, fld2:String, fld3:String, fld4:String, fld4:String, fld5:Double, fld6:String);* * sc.addJar(test1-0.1.jar);* * val file1 = sc.textFile(hdfs://localhost:54310/user/hduser/file1.csv);* * val file2 = sc.textFile(hdfs://localhost:54310/user/hduser/file2.csv);* * val sq = new SQLContext(sc);* * val file1_recs: RDD[Test] = file1.map(_.split(,)).map(l = Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));* * val file2_recs: RDD[Test] = file2.map(_.split(,)).map(s = Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));* * val file1_schema = sq.createSchemaRDD(file1_recs);* * val file2_schema = sq.createSchemaRDD(file2_recs);* * file1_schema.registerAsTable(file1_tab);* * file2_schema.registerAsTable(file2_tab);* * val matched = sq.sql(select * from file1_tab l join file2_tab s on l.fld6=s.fld6 where l.fld3=s.fld3 and l.fld4=s.fld4 and l.fld5=s.fld5 and l.fld2=s.fld2);* * val count = matched.count();* * System.out.println(Found + matched.count() + matching records);* *}* When I run this program on a standalone spark cluster, it keeps running for long with no output or error. After waiting for few mins I'm forcibly killing it. But the same program is working well when executed from a spark shell. What is going wrong? What am I missing? ~Sarath
Re: Simple record matching using Spark SQL
What if you just run something like: *sc.textFile(hdfs://localhost:54310/user/hduser/file1.csv).count()* On Wed, Jul 16, 2014 at 10:37 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Yes Soumya, I did it. First I tried with the example available in the documentation (example using people table and finding teenagers). After successfully running it, I moved on to this one which is starting point to a bigger requirement for which I'm evaluating Spark SQL. On Wed, Jul 16, 2014 at 7:59 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Can you try submitting a very simple job to the cluster. On Jul 16, 2014, at 10:25 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Yes it is appearing on the Spark UI, and remains there with state as RUNNING till I press Ctrl+C in the terminal to kill the execution. Barring the statements to create the spark context, if I copy paste the lines of my code in spark shell, runs perfectly giving the desired output. ~Sarath On Wed, Jul 16, 2014 at 7:48 PM, Soumya Simanta soumya.sima...@gmail.com wrote: When you submit your job, it should appear on the Spark UI. Same with the REPL. Make sure you job is submitted to the cluster properly. On Wed, Jul 16, 2014 at 10:08 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Soumya, Data is very small, 500+ lines in each file. Removed last 2 lines and placed this at the end matched.collect().foreach(println);. Still no luck. It's been more than 5min, the execution is still running. Checked logs, nothing in stdout. In stderr I don't see anything going wrong, all are info messages. What else do I need check? ~Sarath On Wed, Jul 16, 2014 at 7:23 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Check your executor logs for the output or if your data is not big collect it in the driver and print it. On Jul 16, 2014, at 9:21 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi All, I'm trying to do a simple record matching between 2 files and wrote following code - *import org.apache.spark.sql.SQLContext;* *import org.apache.spark.rdd.RDD* *object SqlTest {* * case class Test(fld1:String, fld2:String, fld3:String, fld4:String, fld4:String, fld5:Double, fld6:String);* * sc.addJar(test1-0.1.jar);* * val file1 = sc.textFile(hdfs://localhost:54310/user/hduser/file1.csv);* * val file2 = sc.textFile(hdfs://localhost:54310/user/hduser/file2.csv);* * val sq = new SQLContext(sc);* * val file1_recs: RDD[Test] = file1.map(_.split(,)).map(l = Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));* * val file2_recs: RDD[Test] = file2.map(_.split(,)).map(s = Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));* * val file1_schema = sq.createSchemaRDD(file1_recs);* * val file2_schema = sq.createSchemaRDD(file2_recs);* * file1_schema.registerAsTable(file1_tab);* * file2_schema.registerAsTable(file2_tab);* * val matched = sq.sql(select * from file1_tab l join file2_tab s on l.fld6=s.fld6 where l.fld3=s.fld3 and l.fld4=s.fld4 and l.fld5=s.fld5 and l.fld2=s.fld2);* * val count = matched.count();* * System.out.println(Found + matched.count() + matching records);* *}* When I run this program on a standalone spark cluster, it keeps running for long with no output or error. After waiting for few mins I'm forcibly killing it. But the same program is working well when executed from a spark shell. What is going wrong? What am I missing? ~Sarath
Re: Simple record matching using Spark SQL
Hi Michael, Tried it. It's correctly printing the line counts of both the files. Here's what I tried -
Code:
package test
object Test4 {
  case class Test(fld1: String,
    fld2: String,
    fld3: String,
    fld4: String,
    fld5: String,
    fld6: Double,
    fld7: String);
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster(args(0))
      .setAppName("SQLTest")
      .setSparkHome(args(1))
      .set("spark.executor.memory", "2g");
    val sc = new SparkContext(conf);
    sc.addJar("test1-0.1.jar");
    val file1 = sc.textFile(args(2));
    println(file1.count());
    val file2 = sc.textFile(args(3));
    println(file2.count());
    //val sq = new SQLContext(sc);
    //import sq._
    //val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l => Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));
    //val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s => Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));
    //val file1_schema = sq.createSchemaRDD(file1_recs);
    //val file2_schema = sq.createSchemaRDD(file2_recs);
    //file1_schema.registerAsTable("file1_tab");
    //file2_schema.registerAsTable("file2_tab");
    //val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
    //  "l.fld7=s.fld7 where l.fld2=s.fld2 and " +
    //  "l.fld3=s.fld3 and l.fld4=s.fld4 and " +
    //  "l.fld6=s.fld6");
    //matched.collect().foreach(println);
  }
}
Execution:
export CLASSPATH=$HADOOP_PREFIX/conf:$SPARK_HOME/lib/*:test1-0.1.jar
export CONFIG_OPTS=-Dspark.jars=test1-0.1.jar
java -cp $CLASSPATH $CONFIG_OPTS test.Test4 spark://master:7077 /usr/local/spark-1.0.1-bin-hadoop1 hdfs://master:54310/user/hduser/file1.csv hdfs://master:54310/user/hduser/file2.csv
~Sarath On Wed, Jul 16, 2014 at 8:14 PM, Michael Armbrust mich...@databricks.com wrote: What if you just run something like: sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv").count()
Re: Error: No space left on device
Hi Xiangrui, I accidentally did not send df -i for the master node. Here it is at the moment of failure:
Filesystem       Inodes    IUsed      IFree IUse% Mounted on
/dev/xvda1       524288   280938     243350   54% /
tmpfs           3845409        1    3845408    1% /dev/shm
/dev/xvdb      10002432     1027   10001405    1% /mnt
/dev/xvdf      10002432       16   10002416    1% /mnt2
/dev/xvdv     524288000       13  524287987    1% /vol
I am using default settings now, but is there a way to make sure that the proper directories are being used? How many blocks/partitions do you recommend? Chris On Wed, Jul 16, 2014 at 1:09 AM, Chris DuBois chris.dub...@gmail.com wrote: Hi Xiangrui, Here is the result on the master node:
$ df -i
Filesystem       Inodes    IUsed      IFree IUse% Mounted on
/dev/xvda1       524288   273997     250291   53% /
tmpfs           1917974        1    1917973    1% /dev/shm
/dev/xvdv     524288000       30  524287970    1% /vol
I have reproduced the error while using the MovieLens 10M data set on a newly created cluster. Thanks for the help. Chris On Wed, Jul 16, 2014 at 12:22 AM, Xiangrui Meng men...@gmail.com wrote: Hi Chris, Could you also try `df -i` on the master node? How many blocks/partitions did you set? In the current implementation, ALS doesn't clean the shuffle data because the operations are chained together. But it shouldn't run out of disk space on the MovieLens dataset, which is small. spark-ec2 script sets /mnt/spark and /mnt/spark2 as the local.dir by default, I would recommend leaving this setting as the default value. Best, Xiangrui On Wed, Jul 16, 2014 at 12:02 AM, Chris DuBois chris.dub...@gmail.com wrote: Thanks for the quick responses! I used your final -Dspark.local.dir suggestion, but I see this during the initialization of the application: 14/07/16 06:56:08 INFO storage.DiskBlockManager: Created local directory at /vol/spark-local-20140716065608-7b2a I would have expected something in /mnt/spark/. Thanks, Chris On Tue, Jul 15, 2014 at 11:44 PM, Chris Gore cdg...@cdgore.com wrote: Hi Chris, I've encountered this error when running Spark's ALS methods too. In my case, it was because I set spark.local.dir improperly, and every time there was a shuffle, it would spill many GB of data onto the local drive. What fixed it was setting it to use the /mnt directory, where a network drive is mounted. For example, setting an environmental variable: export SPACE=$(mount | grep mnt | awk '{print $3"/spark/"}' | xargs | sed 's/ /,/g') Then adding -Dspark.local.dir=$SPACE or simply -Dspark.local.dir=/mnt/spark/,/mnt2/spark/ when you run your driver application Chris On Jul 15, 2014, at 11:39 PM, Xiangrui Meng men...@gmail.com wrote: Check the number of inodes (df -i). The assembly build may create many small files. -Xiangrui On Tue, Jul 15, 2014 at 11:35 PM, Chris DuBois chris.dub...@gmail.com wrote: Hi all, I am encountering the following error: INFO scheduler.TaskSetManager: Loss was due to java.io.IOException: No space left on device [duplicate 4] For each slave, df -h looks roughly like this, which makes the above error surprising:
Filesystem      Size  Used Avail Use% Mounted on
/dev/xvda1      7.9G  4.4G  3.5G  57% /
tmpfs           7.4G  4.0K  7.4G   1% /dev/shm
/dev/xvdb        37G  3.3G   32G  10% /mnt
/dev/xvdf        37G  2.0G   34G   6% /mnt2
/dev/xvdv       500G   33M  500G   1% /vol
I'm on an EC2 cluster (c3.xlarge + 5 x m3) that I launched using the spark-ec2 scripts and a clone of spark from today. The job I am running closely resembles the collaborative filtering example. This issue happens with the 1M version as well as the 10 million rating version of the MovieLens dataset. I have seen previous questions, but they haven't helped yet.
For example, I tried setting the Spark tmp directory to the EBS volume at /vol/, both by editing the spark conf file (and copy-dir'ing it to the slaves) as well as through the SparkConf. Yet I still get the above error. Here is my current Spark config below. Note that I'm launching via ~/spark/bin/spark-submit.
conf = SparkConf()
conf.setAppName("RecommendALS").set("spark.local.dir", "/vol/").set("spark.executor.memory", "7g").set("spark.akka.frameSize", "100").setExecutorEnv("SPARK_JAVA_OPTS", "-Dspark.akka.frameSize=100")
sc = SparkContext(conf=conf)
Thanks for any advice, Chris
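For anyone doing the same thing from a Scala driver, an equivalent sketch (values simply mirror the PySpark conf above; the conf has to be built before the SparkContext is created):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("RecommendALS")
    // comma-separated scratch directories used for shuffle spill files
    .set("spark.local.dir", "/vol/")
    .set("spark.executor.memory", "7g")
    .set("spark.akka.frameSize", "100")
  val sc = new SparkContext(conf)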
Re: Spark Streaming Json file groupby function
Hi TD, I defined the case class outside the main method and was able to compile the code successfully. But I am getting a runtime error when trying to process some json file from kafka. Here is the code I compiled:
import java.util.Properties
import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
import scala.util.parsing.json.JSON
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

case class Record(ID: String, name: String, score: String, school: String)

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    //StreamingExamples.setStreamingLogLevels()
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val sql = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sql)
    val timer = Time(1)
    // ssc.checkpoint("checkpoint")
    import sqlContext._
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    val fields = jsonf.map(data => Record(data("ID").toString, data("name").toString, data("score").toString, data("school").toString))
    fields.print()
    val results = fields.foreachRDD((recrdd, tt) => {
      recrdd.registerAsTable("table1")
      val results = sqlContext.sql("select type from table1")
      println(results)
      results.foreach(println)
      results.map(t => "Type: " + t(0)).collect().foreach(println)
    })
    //results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
and here is the error I am getting when trying to process some data: == Query Plan == Project ['type] ExistingRdd [ID#60,name#61,score#62,school#63], MapPartitionsRDD[111] at mapPartitions at basicOperators.scala:174) 14/07/16 14:34:10 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 14/07/16 14:34:10 INFO TaskSetManager: Starting task 1.0:0 as TID 1 on executor localhost: localhost (PROCESS_LOCAL) 14/07/16 14:34:10 INFO TaskSetManager: Serialized task 1.0:0 as 2710 bytes in 0 ms 14/07/16 14:34:10 INFO Executor: Running task ID 1 14/07/16 14:34:10 ERROR Executor: Exception in task ID 1 java.lang.Exception: Could not compute split, block input-0-1405521243800 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 14/07/16 14:34:10 WARN TaskSetManager: Lost TID 1 (task 1.0:0) 14/07/16 14:34:10 WARN TaskSetManager: Loss was due to java.lang.Exception java.lang.Exception: Could not compute split, block input-0-1405521243800 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at
Re: Trouble with spark-ec2 script: --ebs-vol-size
Should I take it from the lack of replies that the --ebs-vol-size feature doesn't work? -Ben -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Trouble-with-spark-ec2-script-ebs-vol-size-tp9619p9934.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Trouble with spark-ec2 script: --ebs-vol-size
please add From: Ben Horner [via Apache Spark User List] ml-node+s1001560n9934...@n3.nabble.com Date: Wednesday, July 16, 2014 at 8:47 AM To: Ben Horner ben.hor...@atigeo.com Subject: Re: Trouble with spark-ec2 script: --ebs-vol-size Should I take it from the lack of replies that the --ebs-vol-size feature doesn't work? -Ben -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Trouble-with-spark-ec2-script-ebs-vol-size-tp9619p9935.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Gradient Boosting Decision Trees
Hi there, I am looking for a GBM MLlib implementation. Does anyone know if there is a plan to roll it out soon? Thanks! Pedro
Re: Retrieve dataset of Big Data Benchmark
Hi Burak, Thank you for your pointer, it is really helping out. I do have some consecutive questions though. After looking at the Big Data Benchmark page https://amplab.cs.berkeley.edu/benchmark/ (Section Run this benchmark yourself), I was expecting the following combination of files: Sets: Uservisits, Rankings, Crawl Size: tiny, 1node, 5node Both in text and Sequence file. When looking at http://s3.amazonaws.com/big-data-benchmark/, I only see sequence-snappy/5nodes/_distcp_logs_44js2v part 0 to 103 sequence-snappy/5nodes/_distcp_logs_nclxhd part 0 to 102 sequence-snappy/5nodes/_distcp_logs_vnuhym part 0 to 24 sequence-snappy/5nodes/crawl part 0 to 743 As Crawl is the name of a set I am looking for, I started to download it. Since it was the end of the day and I was going to download it overnight, I just wrote a for loop from 0 to 999 with wget, expecting it to download until 7-something and 404 errors for the others. When I looked at it this morning, I noticed that it all completed downloading. The total Crawl set for 5 nodes should be ~30Gb, I am currently at part 1020 with a total set of 40G. This leads to my (sub)questions: Does anybody know what exactly is still hosted: - Are the tiny and 1node sets still available? - Are the Uservisits and Rankings still available? - Why is the crawl set bigger than expected, and how big is it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Retrieve-dataset-of-Big-Data-Benchmark-tp9821p9938.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Trouble with spark-ec2 script: --ebs-vol-size
Hi Ben, It worked for me, but only when using the default region. Using --region=us-west-2 resulted in errors about security groups. Chris On Wed, Jul 16, 2014 at 8:53 AM, Ben Horner ben.hor...@atigeo.com wrote: please add From: Ben Horner [via Apache Spark User List] Date: Wednesday, July 16, 2014 at 8:47 AM To: Ben Horner Subject: Re: Trouble with spark-ec2 script: --ebs-vol-size Should I take it from the lack of replies that the --ebs-vol-size feature doesn't work? -Ben -- View this message in context: Re: Trouble with spark-ec2 script: --ebs-vol-size http://apache-spark-user-list.1001560.n3.nabble.com/Trouble-with-spark-ec2-script-ebs-vol-size-tp9619p9935.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Terminal freeze during SVM
so I need to reconfigure my sparkcontext this way:
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("CountingSheep")
  .set("spark.executor.memory", "1g")
  .set("spark.akka.frameSize", "20")
val sc = new SparkContext(conf)
And start a new cluster with the setup scripts from Spark 1.0.1. Is this the right approach? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Terminal-freeze-during-SVM-Broken-pipe-tp9022p9941.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Gradient Boosting Decision Trees
Hi Pedro, Yes, although they will probably not be included in the next release (since the code freeze is ~2 weeks away), GBM (and other ensembles of decision trees) are currently under active development. We're hoping they'll make it into the subsequent release. -Ameet On Wed, Jul 16, 2014 at 9:08 AM, Pedro Silva jpedrosi...@gmail.com wrote: Hi there, I am looking for a GBM MLlib implementation. Does anyone know if there is a plan to roll it out soon? Thanks! Pedro
running Spark App on Yarn produces: Exception in thread main java.lang.NoSuchFieldException: DEFAULT_YARN_APPLICATION_CLASSPATH
Hello community, tried to run storm app on yarn, using cloudera hadoop and spark distro (from http://archive.cloudera.com/cdh5/cdh/5) hadoop version: hadoop-2.3.0-cdh5.0.3.tar.gz spark version: spark-0.9.0-cdh5.0.3.tar.gz DEFAULT_YARN_APPLICATION_CLASSPATH is part of hadoop-api-yarn jar ... thanks for any replies! [amilkowski@localhost spark-streaming]$ ./test-yarn.sh 14/07/16 12:47:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/16 12:47:17 INFO client.RMProxy: Connecting to ResourceManager at / 0.0.0.0:8032 14/07/16 12:47:17 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 1 14/07/16 12:47:17 INFO yarn.Client: Queue info ... queueName: root.default, queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0, queueApplicationCount = 0, queueChildQueueCount = 0 14/07/16 12:47:17 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 8192 14/07/16 12:47:17 INFO yarn.Client: Preparing Local resources 14/07/16 12:47:18 INFO yarn.Client: Uploading file:/opt/local/cloudera/spark/cdh5/spark-0.9.0-cdh5.0.3/examples/target/scala-2.10/spark-examples-assembly-0.9.0-cdh5.0.3.jar to hdfs://localhost:8020/user/amilkowski/.sparkStaging/application_1405528355264_0004/spark-examples-assembly-0.9.0-cdh5.0.3.jar 14/07/16 12:47:19 INFO yarn.Client: Uploading file:/opt/local/cloudera/spark/cdh5/spark-0.9.0-cdh5.0.3/assembly/target/scala-2.10/spark-assembly-0.9.0-cdh5.0.3-hadoop2.3.0-cdh5.0.3.jar to hdfs://localhost:8020/user/amilkowski/.sparkStaging/application_1405528355264_0004/spark-assembly-0.9.0-cdh5.0.3-hadoop2.3.0-cdh5.0.3.jar 14/07/16 12:47:19 INFO yarn.Client: Setting up the launch environment Exception in thread main java.lang.NoSuchFieldException: DEFAULT_YARN_APPLICATION_CLASSPATH at java.lang.Class.getField(Class.java:1579) at org.apache.spark.deploy.yarn.ClientBase$.getDefaultYarnApplicationClasspath(ClientBase.scala:403) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$5.apply(ClientBase.scala:386) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$5.apply(ClientBase.scala:386) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.deploy.yarn.ClientBase$.populateHadoopClasspath(ClientBase.scala:385) at org.apache.spark.deploy.yarn.ClientBase$.populateClasspath(ClientBase.scala:444) at org.apache.spark.deploy.yarn.ClientBase$class.setupLaunchEnv(ClientBase.scala:274) at org.apache.spark.deploy.yarn.Client.setupLaunchEnv(Client.scala:41) at org.apache.spark.deploy.yarn.Client.runApp(Client.scala:77) at org.apache.spark.deploy.yarn.Client.run(Client.scala:98) at org.apache.spark.deploy.yarn.Client$.main(Client.scala:183) at org.apache.spark.deploy.yarn.Client.main(Client.scala) [amilkowski@localhost spark-streaming]$
Re: Spark Streaming Json file groupby function
Hi Srinivas, Seems the query you used is val results =sqlContext.sql(select type from table1). However, table1 does not have a field called type. The schema of table1 is defined as the class definition of your case class Record (i.e. ID, name, score, and school are fields of your table1). Can you change your query and see if your program works? Thanks, Yin On Wed, Jul 16, 2014 at 8:25 AM, srinivas kusamsrini...@gmail.com wrote: Hi TD, I Defines the Case Class outside the main method and was able to compile the code successfully. But getting a run time error when trying to process some json file from kafka. here is the code i an to compile import java.util.Properties import kafka.producer._ import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.SparkConf import scala.util.parsing.json.JSON import org.apache.spark.sql.SQLContext import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ case class Record(ID:String,name:String,score:String,school:String) object KafkaWordCount { def main(args: Array[String]) { if (args.length 4) { System.err.println(Usage: KafkaWordCount zkQuorum group topics numThreads) System.exit(1) } //StreamingExamples.setStreamingLogLevels() val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName(KafkaWordCount) val ssc = new StreamingContext(sparkConf, Seconds(10)) val sql = new SparkContext(sparkConf) val sqlContext = new SQLContext(sql) val timer = Time(1) // ssc.checkpoint(checkpoint) import sqlContext._ val topicpMap = topics.split(,).map((_,numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String,Any]]) val fields = jsonf.map(data=Record(data(ID).toString,data(name).toString,data(score).toString,data(school).toString)) fields.print() val results = fields.foreachRDD((recrdd,tt) = { recrdd.registerAsTable(table1) val results =sqlContext.sql(select type from table1) println(results) results.foreach(println) results.map(t = Type: +t(0)).collect().foreach(println) }) //results.print() ssc.start() ssc.awaitTermination() } } and here is the error i am getting when trying to process some data == Query Plan == Project ['type] ExistingRdd [ID#60,name#61,score#62,school#63], MapPartitionsRDD[111] at mapPartitions at basicOperators.scala:174) 14/07/16 14:34:10 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 14/07/16 14:34:10 INFO TaskSetManager: Starting task 1.0:0 as TID 1 on executor localhost: localhost (PROCESS_LOCAL) 14/07/16 14:34:10 INFO TaskSetManager: Serialized task 1.0:0 as 2710 bytes in 0 ms 14/07/16 14:34:10 INFO Executor: Running task ID 1 14/07/16 14:34:10 ERROR Executor: Exception in task ID 1 java.lang.Exception: Could not compute split, block input-0-1405521243800 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at 
org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at
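Following Yin's suggestion, the inner query just needs to reference a column that actually exists on the registered table, e.g. (illustrative only; score is one of the fields defined on the Record case class above):

  recrdd.registerAsTable("table1")
  val results = sqlContext.sql("select score from table1")
  results.map(t => "Score: " + t(0)).collect().foreach(println)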
Re: Gradient Boosting Decision Trees
Hi Ameet, that's great news! Thanks, Pedro On Wed, Jul 16, 2014 at 9:33 AM, Ameet Talwalkar atalwal...@gmail.com wrote: Hi Pedro, Yes, although they will probably not be included in the next release (since the code freeze is ~2 weeks away), GBM (and other ensembles of decision trees) are currently under active development. We're hoping they'll make it into the subsequent release. -Ameet On Wed, Jul 16, 2014 at 9:08 AM, Pedro Silva jpedrosi...@gmail.com wrote: Hi there, I am looking for a GBM MLlib implementation. Does anyone know if there is a plan to roll it out soon? Thanks! Pedro
Re: running Spark App on Yarn produces: Exception in thread main java.lang.NoSuchFieldException: DEFAULT_YARN_APPLICATION_CLASSPATH
Somewhere in here, you are not actually running vs Hadoop 2 binaries. Your cluster is certainly Hadoop 2, but your client is not using the Hadoop libs you think it is (or your compiled binary is linking against Hadoop 1, which is the default for Spark -- did you change it?) On Wed, Jul 16, 2014 at 5:45 PM, Andrew Milkowski amgm2...@gmail.com wrote: Hello community, tried to run storm app on yarn, using cloudera hadoop and spark distro (from http://archive.cloudera.com/cdh5/cdh/5) hadoop version: hadoop-2.3.0-cdh5.0.3.tar.gz spark version: spark-0.9.0-cdh5.0.3.tar.gz DEFAULT_YARN_APPLICATION_CLASSPATH is part of hadoop-api-yarn jar ... thanks for any replies! [amilkowski@localhost spark-streaming]$ ./test-yarn.sh 14/07/16 12:47:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/16 12:47:17 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 14/07/16 12:47:17 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 1 14/07/16 12:47:17 INFO yarn.Client: Queue info ... queueName: root.default, queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0, queueApplicationCount = 0, queueChildQueueCount = 0 14/07/16 12:47:17 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 8192 14/07/16 12:47:17 INFO yarn.Client: Preparing Local resources 14/07/16 12:47:18 INFO yarn.Client: Uploading file:/opt/local/cloudera/spark/cdh5/spark-0.9.0-cdh5.0.3/examples/target/scala-2.10/spark-examples-assembly-0.9.0-cdh5.0.3.jar to hdfs://localhost:8020/user/amilkowski/.sparkStaging/application_1405528355264_0004/spark-examples-assembly-0.9.0-cdh5.0.3.jar 14/07/16 12:47:19 INFO yarn.Client: Uploading file:/opt/local/cloudera/spark/cdh5/spark-0.9.0-cdh5.0.3/assembly/target/scala-2.10/spark-assembly-0.9.0-cdh5.0.3-hadoop2.3.0-cdh5.0.3.jar to hdfs://localhost:8020/user/amilkowski/.sparkStaging/application_1405528355264_0004/spark-assembly-0.9.0-cdh5.0.3-hadoop2.3.0-cdh5.0.3.jar 14/07/16 12:47:19 INFO yarn.Client: Setting up the launch environment Exception in thread main java.lang.NoSuchFieldException: DEFAULT_YARN_APPLICATION_CLASSPATH at java.lang.Class.getField(Class.java:1579) at org.apache.spark.deploy.yarn.ClientBase$.getDefaultYarnApplicationClasspath(ClientBase.scala:403) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$5.apply(ClientBase.scala:386) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$5.apply(ClientBase.scala:386) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.deploy.yarn.ClientBase$.populateHadoopClasspath(ClientBase.scala:385) at org.apache.spark.deploy.yarn.ClientBase$.populateClasspath(ClientBase.scala:444) at org.apache.spark.deploy.yarn.ClientBase$class.setupLaunchEnv(ClientBase.scala:274) at org.apache.spark.deploy.yarn.Client.setupLaunchEnv(Client.scala:41) at org.apache.spark.deploy.yarn.Client.runApp(Client.scala:77) at org.apache.spark.deploy.yarn.Client.run(Client.scala:98) at org.apache.spark.deploy.yarn.Client$.main(Client.scala:183) at org.apache.spark.deploy.yarn.Client.main(Client.scala) [amilkowski@localhost spark-streaming]$
using multiple dstreams together (spark streaming)
Hi, My application has multiple dstreams on the same inputstream: dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window I want to write logic that deals with all three windows (e.g. when the 1 second window differs from the 2 second window by some delta ...) I've found some examples online (there's not much out there!), and I can only see people transforming a single dstream. In conventional spark, we'd do this sort of thing with a cartesian on RDDs. How can I deal with multiple Dstreams at once? Thanks
Re: running Spark App on Yarn produces: Exception in thread main java.lang.NoSuchFieldException: DEFAULT_YARN_APPLICATION_CLASSPATH
Andrew, Are you running on a CM-managed cluster? I just checked, and there is a bug here (fixed in 1.0), but it's avoided by having yarn.application.classpath defined in your yarn-site.xml. -Sandy On Wed, Jul 16, 2014 at 10:02 AM, Sean Owen so...@cloudera.com wrote: Somewhere in here, you are not actually running vs Hadoop 2 binaries. Your cluster is certainly Hadoop 2, but your client is not using the Hadoop libs you think it is (or your compiled binary is linking against Hadoop 1, which is the default for Spark -- did you change it?) On Wed, Jul 16, 2014 at 5:45 PM, Andrew Milkowski amgm2...@gmail.com wrote: Hello community, tried to run storm app on yarn, using cloudera hadoop and spark distro (from http://archive.cloudera.com/cdh5/cdh/5) hadoop version: hadoop-2.3.0-cdh5.0.3.tar.gz spark version: spark-0.9.0-cdh5.0.3.tar.gz DEFAULT_YARN_APPLICATION_CLASSPATH is part of hadoop-api-yarn jar ... thanks for any replies! [amilkowski@localhost spark-streaming]$ ./test-yarn.sh 14/07/16 12:47:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/16 12:47:17 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 14/07/16 12:47:17 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 1 14/07/16 12:47:17 INFO yarn.Client: Queue info ... queueName: root.default, queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0, queueApplicationCount = 0, queueChildQueueCount = 0 14/07/16 12:47:17 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 8192 14/07/16 12:47:17 INFO yarn.Client: Preparing Local resources 14/07/16 12:47:18 INFO yarn.Client: Uploading file:/opt/local/cloudera/spark/cdh5/spark-0.9.0-cdh5.0.3/examples/target/scala-2.10/spark-examples-assembly-0.9.0-cdh5.0.3.jar to hdfs://localhost:8020/user/amilkowski/.sparkStaging/application_1405528355264_0004/spark-examples-assembly-0.9.0-cdh5.0.3.jar 14/07/16 12:47:19 INFO yarn.Client: Uploading file:/opt/local/cloudera/spark/cdh5/spark-0.9.0-cdh5.0.3/assembly/target/scala-2.10/spark-assembly-0.9.0-cdh5.0.3-hadoop2.3.0-cdh5.0.3.jar to hdfs://localhost:8020/user/amilkowski/.sparkStaging/application_1405528355264_0004/spark-assembly-0.9.0-cdh5.0.3-hadoop2.3.0-cdh5.0.3.jar 14/07/16 12:47:19 INFO yarn.Client: Setting up the launch environment Exception in thread main java.lang.NoSuchFieldException: DEFAULT_YARN_APPLICATION_CLASSPATH at java.lang.Class.getField(Class.java:1579) at org.apache.spark.deploy.yarn.ClientBase$.getDefaultYarnApplicationClasspath(ClientBase.scala:403) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$5.apply(ClientBase.scala:386) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$5.apply(ClientBase.scala:386) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.deploy.yarn.ClientBase$.populateHadoopClasspath(ClientBase.scala:385) at org.apache.spark.deploy.yarn.ClientBase$.populateClasspath(ClientBase.scala:444) at org.apache.spark.deploy.yarn.ClientBase$class.setupLaunchEnv(ClientBase.scala:274) at org.apache.spark.deploy.yarn.Client.setupLaunchEnv(Client.scala:41) at org.apache.spark.deploy.yarn.Client.runApp(Client.scala:77) at org.apache.spark.deploy.yarn.Client.run(Client.scala:98) at org.apache.spark.deploy.yarn.Client$.main(Client.scala:183) at org.apache.spark.deploy.yarn.Client.main(Client.scala) [amilkowski@localhost spark-streaming]$
Re: using multiple dstreams together (spark streaming)
I'm joining several kafka dstreams using the join operation but you have the limitation that the duration of the batch has to be same,i.e. 1 second window for all dstreams... so it would not work for you. 2014-07-16 18:08 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Hi, My application has multiple dstreams on the same inputstream: dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window I want to write logic that deals with all three windows (e.g. when the 1 second window differs from the 2 second window by some delta ...) I've found some examples online (there's not much out there!), and I can only see people transforming a single dstream. In conventional spark, we'd do this sort of thing with a cartesian on RDDs. How can I deal with multiple Dstreams at once? Thanks
Re: running Spark App on Yarn produces: Exception in thread main java.lang.NoSuchFieldException: DEFAULT_YARN_APPLICATION_CLASSPATH
OK, if you're sure your binary has Hadoop 2 and/or your classpath has Hadoop 2, that's not it. I'd look at Sandy's suggestion then. On Wed, Jul 16, 2014 at 6:11 PM, Andrew Milkowski amgm2...@gmail.com wrote: thanks Sean! so what I did is in project/SparkBuild.scala I made it compile with 2.3.0-cdh5.0.3 (and I even did sbt clean before sbt/sbt assembly, this should have build example client with 2.3.0 object SparkBuild extends Build { // Hadoop version to build against. For example, 1.0.4 for Apache releases, or // 2.0.0-mr1-cdh4.2.0 for Cloudera Hadoop. Note that these variables can be set // through the environment variables SPARK_HADOOP_VERSION and SPARK_YARN. //val DEFAULT_HADOOP_VERSION = 1.0.4 val DEFAULT_HADOOP_VERSION = 2.3.0-cdh5.0.3 // Whether the Hadoop version to build against is 2.2.x, or a variant of it. This can be set // through the SPARK_IS_NEW_HADOOP environment variable. //val DEFAULT_IS_NEW_HADOOP = false val DEFAULT_IS_NEW_HADOOP = true //val DEFAULT_YARN = false val DEFAULT_YARN = true On Wed, Jul 16, 2014 at 1:02 PM, Sean Owen so...@cloudera.com wrote: Somewhere in here, you are not actually running vs Hadoop 2 binaries. Your cluster is certainly Hadoop 2, but your client is not using the Hadoop libs you think it is (or your compiled binary is linking against Hadoop 1, which is the default for Spark -- did you change it?) On Wed, Jul 16, 2014 at 5:45 PM, Andrew Milkowski amgm2...@gmail.com wrote: Hello community, tried to run storm app on yarn, using cloudera hadoop and spark distro (from http://archive.cloudera.com/cdh5/cdh/5) hadoop version: hadoop-2.3.0-cdh5.0.3.tar.gz spark version: spark-0.9.0-cdh5.0.3.tar.gz DEFAULT_YARN_APPLICATION_CLASSPATH is part of hadoop-api-yarn jar ... thanks for any replies! [amilkowski@localhost spark-streaming]$ ./test-yarn.sh 14/07/16 12:47:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/16 12:47:17 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 14/07/16 12:47:17 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 1 14/07/16 12:47:17 INFO yarn.Client: Queue info ... 
queueName: root.default, queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0, queueApplicationCount = 0, queueChildQueueCount = 0 14/07/16 12:47:17 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 8192 14/07/16 12:47:17 INFO yarn.Client: Preparing Local resources 14/07/16 12:47:18 INFO yarn.Client: Uploading file:/opt/local/cloudera/spark/cdh5/spark-0.9.0-cdh5.0.3/examples/target/scala-2.10/spark-examples-assembly-0.9.0-cdh5.0.3.jar to hdfs://localhost:8020/user/amilkowski/.sparkStaging/application_1405528355264_0004/spark-examples-assembly-0.9.0-cdh5.0.3.jar 14/07/16 12:47:19 INFO yarn.Client: Uploading file:/opt/local/cloudera/spark/cdh5/spark-0.9.0-cdh5.0.3/assembly/target/scala-2.10/spark-assembly-0.9.0-cdh5.0.3-hadoop2.3.0-cdh5.0.3.jar to hdfs://localhost:8020/user/amilkowski/.sparkStaging/application_1405528355264_0004/spark-assembly-0.9.0-cdh5.0.3-hadoop2.3.0-cdh5.0.3.jar 14/07/16 12:47:19 INFO yarn.Client: Setting up the launch environment Exception in thread main java.lang.NoSuchFieldException: DEFAULT_YARN_APPLICATION_CLASSPATH at java.lang.Class.getField(Class.java:1579) at org.apache.spark.deploy.yarn.ClientBase$.getDefaultYarnApplicationClasspath(ClientBase.scala:403) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$5.apply(ClientBase.scala:386) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$5.apply(ClientBase.scala:386) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.deploy.yarn.ClientBase$.populateHadoopClasspath(ClientBase.scala:385) at org.apache.spark.deploy.yarn.ClientBase$.populateClasspath(ClientBase.scala:444) at org.apache.spark.deploy.yarn.ClientBase$class.setupLaunchEnv(ClientBase.scala:274) at org.apache.spark.deploy.yarn.Client.setupLaunchEnv(Client.scala:41) at org.apache.spark.deploy.yarn.Client.runApp(Client.scala:77) at org.apache.spark.deploy.yarn.Client.run(Client.scala:98) at org.apache.spark.deploy.yarn.Client$.main(Client.scala:183) at org.apache.spark.deploy.yarn.Client.main(Client.scala) [amilkowski@localhost spark-streaming]$
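For readability, the SparkBuild.scala values quoted above with their string quotes restored (the archive stripped them); this is only Andrew's excerpt re-laid out, not a complete build file:

// Hadoop version to build against.
val DEFAULT_HADOOP_VERSION = "2.3.0-cdh5.0.3"
// Whether the Hadoop version to build against is 2.2.x, or a variant of it.
val DEFAULT_IS_NEW_HADOOP = true
val DEFAULT_YARN = true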
Re: running Spark App on Yarn produces: Exception in thread main java.lang.NoSuchFieldException: DEFAULT_YARN_APPLICATION_CLASSPATH
thanks Sandzy, no CM-managed cluster, straight from cloudera tar ( http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.3.0-cdh5.0.3.tar.gz) trying your suggestion immediate! thanks so much for taking time.. On Wed, Jul 16, 2014 at 1:10 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Andrew, Are you running on a CM-managed cluster? I just checked, and there is a bug here (fixed in 1.0), but it's avoided by having yarn.application.classpath defined in your yarn-site.xml. -Sandy On Wed, Jul 16, 2014 at 10:02 AM, Sean Owen so...@cloudera.com wrote: Somewhere in here, you are not actually running vs Hadoop 2 binaries. Your cluster is certainly Hadoop 2, but your client is not using the Hadoop libs you think it is (or your compiled binary is linking against Hadoop 1, which is the default for Spark -- did you change it?) On Wed, Jul 16, 2014 at 5:45 PM, Andrew Milkowski amgm2...@gmail.com wrote: Hello community, tried to run storm app on yarn, using cloudera hadoop and spark distro (from http://archive.cloudera.com/cdh5/cdh/5) hadoop version: hadoop-2.3.0-cdh5.0.3.tar.gz spark version: spark-0.9.0-cdh5.0.3.tar.gz DEFAULT_YARN_APPLICATION_CLASSPATH is part of hadoop-api-yarn jar ... thanks for any replies! [amilkowski@localhost spark-streaming]$ ./test-yarn.sh 14/07/16 12:47:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/16 12:47:17 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 14/07/16 12:47:17 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 1 14/07/16 12:47:17 INFO yarn.Client: Queue info ... queueName: root.default, queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0, queueApplicationCount = 0, queueChildQueueCount = 0 14/07/16 12:47:17 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 8192 14/07/16 12:47:17 INFO yarn.Client: Preparing Local resources 14/07/16 12:47:18 INFO yarn.Client: Uploading file:/opt/local/cloudera/spark/cdh5/spark-0.9.0-cdh5.0.3/examples/target/scala-2.10/spark-examples-assembly-0.9.0-cdh5.0.3.jar to hdfs://localhost:8020/user/amilkowski/.sparkStaging/application_1405528355264_0004/spark-examples-assembly-0.9.0-cdh5.0.3.jar 14/07/16 12:47:19 INFO yarn.Client: Uploading file:/opt/local/cloudera/spark/cdh5/spark-0.9.0-cdh5.0.3/assembly/target/scala-2.10/spark-assembly-0.9.0-cdh5.0.3-hadoop2.3.0-cdh5.0.3.jar to hdfs://localhost:8020/user/amilkowski/.sparkStaging/application_1405528355264_0004/spark-assembly-0.9.0-cdh5.0.3-hadoop2.3.0-cdh5.0.3.jar 14/07/16 12:47:19 INFO yarn.Client: Setting up the launch environment Exception in thread main java.lang.NoSuchFieldException: DEFAULT_YARN_APPLICATION_CLASSPATH at java.lang.Class.getField(Class.java:1579) at org.apache.spark.deploy.yarn.ClientBase$.getDefaultYarnApplicationClasspath(ClientBase.scala:403) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$5.apply(ClientBase.scala:386) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$5.apply(ClientBase.scala:386) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.deploy.yarn.ClientBase$.populateHadoopClasspath(ClientBase.scala:385) at org.apache.spark.deploy.yarn.ClientBase$.populateClasspath(ClientBase.scala:444) at org.apache.spark.deploy.yarn.ClientBase$class.setupLaunchEnv(ClientBase.scala:274) at org.apache.spark.deploy.yarn.Client.setupLaunchEnv(Client.scala:41) at org.apache.spark.deploy.yarn.Client.runApp(Client.scala:77) at 
org.apache.spark.deploy.yarn.Client.run(Client.scala:98) at org.apache.spark.deploy.yarn.Client$.main(Client.scala:183) at org.apache.spark.deploy.yarn.Client.main(Client.scala) [amilkowski@localhost spark-streaming]$
Re: Number of executors change during job running
Hi Tathagata, I have tried the repartition method. The reduce stage first had 2 executors and then it had around 85 executors. I specified repartition(300) and each executor was given 2 cores when I submitted the job. This shows that repartition does help use more executors. However, the running time was still around 50 seconds although I only did a simple groupby operation. I think repartition may consume part of the running time. Considering the input source of Kafka, is there a way to make the program even faster? Thanks! On Mon, Jul 14, 2014 at 3:22 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you give me a screen shot of the stages page in the web ui, the spark logs, and the code that is causing this behavior? This seems quite weird to me. TD On Mon, Jul 14, 2014 at 2:11 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, It seems repartition does not necessarily force Spark to distribute the data to different executors. I have launched a new job which uses repartition right after I received data from Kafka. For the first two batches, the reduce stage used more than 80 executors. Starting from the third batch, there were always only 2 executors in the reduce task (combineByKey). Even with the first batch which used more than 80 executors, it took 2.4 mins to finish the reduce stage for a very small amount of data. Bill On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das tathagata.das1...@gmail.com wrote: After using repartition(300), how many executors did it run on? By the way, repartition(300) means it will divide the shuffled data into 300 partitions. Since there are many cores on each of the 300 machines/executors, these partitions (each requiring a core) may not be spread across all 300 executors. Hence, if you really want to spread it across all 300 executors, you may have to bump up the partitions even more. However, increasing the number of partitions too much may not be beneficial, and you will have to play around with the number to figure out the sweet spot that reduces the time to process the stage / time to process the whole batch. TD On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Do you mean that the data is not shuffled until the reduce stage? That means groupBy still only uses 2 machines? I think I used repartition(300) after I read the data from Kafka into the DStream. It seems that it did not guarantee that the map or reduce stages will be run on 300 machines. I am currently trying to initiate 100 DStreams from KafkaUtils.createDStream and union them. Now the reduce stages had around 80 machines for all the batches. However, this method will introduce many dstreams. It will be good if we can control the number of executors in the groupBy operation because the calculation needs to be finished within 1 minute for different sizes of input data based on our production need. Thanks! Bill On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Aah, I get it now. That is because the input data stream is replicated on two machines, so by locality the data is processed on those two machines. So the map stage on the data uses 2 executors, but the reduce stage (after groupByKey) and the saveAsTextFiles would use 300 tasks. And the default parallelism takes effect only when the data is explicitly shuffled around. You can fix this by explicitly repartitioning the data.
inputDStream.repartition(partitions) This is covered in the streaming tuning guide http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving . TD On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi folks, I just ran another job that only received data from Kafka, did some filtering, and then saved the result as text files in HDFS. There was no reducing work involved. Surprisingly, the number of executors for the saveAsTextFiles stage was also 2 although I specified 300 executors in the job submission. As a result, the simple save-file action took more than 2 minutes. Do you have any idea how Spark determined the number of executors for different stages? Thanks! Bill On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Below is my main function. I omit some filtering and data conversion functions. These functions are just a one-to-one mapping, which should not noticeably increase the running time. The only reduce function I have here is groupByKey. There are 4 topics in my Kafka brokers and two of the topics have 240k lines each minute. The other two topics have less than 30k lines per minute. The batch size is one minute and I specified 300 executors in my spark-submit script. The default parallelism is 300. val parition = 300 val zkQuorum = "zk1,zk2,zk3" val group = "my-group-" + currentTime.toString val topics =
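Bill's snippet is cut off in the archive; as a stand-in, here is a minimal sketch of the pattern under discussion (topic names, group id, and partition counts are made up, not his actual code): repartition the Kafka input right after it is received so that later shuffle stages are not confined to the receiver machines:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("kafka-repartition-sketch")
val ssc = new StreamingContext(conf, Seconds(60))              // one-minute batches

val topicMap = Map("topicA" -> 1, "topicB" -> 1)               // illustrative topics
val lines = KafkaUtils.createStream(ssc, "zk1,zk2,zk3", "my-group", topicMap).map(_._2)

// Spread the received data across the cluster before the expensive stage;
// otherwise processing tends to stay on the few machines hosting the receivers.
val grouped = lines.repartition(300)
  .map(line => (line.split(",")(0), line))
  .groupByKey()
grouped.print()

ssc.start()
ssc.awaitTermination()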
ClassNotFoundException: $line11.$read$ when loading an HDFS text file with SparkQL in spark-shell
Hi all, I just installed a mesos 0.19 cluster. I am failing to execute basic SparkQL operations on text files with Spark 1.0.1 with the spark-shell. I have one Mesos master without zookeeper and 4 mesos slaves. All nodes are running JDK 1.7.51 and Scala 2.10.4. The spark package is uploaded to hdfs and the user running the mesos slave has permission to access it. I am running HDFS from the latest CDH5. I tried both with the pre-built CDH5 spark package available from http://spark.apache.org/downloads.html and by packaging spark with sbt 0.13.2, JDK 1.7.51 and scala 2.10.4 as explained here http://mesosphere.io/learn/run-spark-on-mesos/ No matter what I try, when I execute the following code in the spark-shell: The job fails with the following error reported by the mesos slave nodes: Note that running a simple map+reduce job on the same hdfs files with the same installation works fine: The hdfs files contain just plain csv files: spark-env.sh looks like this: Any help, comment or pointer would be greatly appreciated! Thanks in advance Svend -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ClassNotFoundException-line11-read-when-loading-an-HDFS-text-file-with-SparkQL-in-spark-shell-tp9954.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: using multiple dstreams together (spark streaming)
Yeah -- I tried the .union operation and it didn't work for that reason. Surely there has to be a way to do this, as I imagine this is a commonly desired goal in streaming applications? On Wed, Jul 16, 2014 at 10:10 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I'm joining several kafka dstreams using the join operation but you have the limitation that the duration of the batch has to be same,i.e. 1 second window for all dstreams... so it would not work for you. 2014-07-16 18:08 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Hi, My application has multiple dstreams on the same inputstream: dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window I want to write logic that deals with all three windows (e.g. when the 1 second window differs from the 2 second window by some delta ...) I've found some examples online (there's not much out there!), and I can only see people transforming a single dstream. In conventional spark, we'd do this sort of thing with a cartesian on RDDs. How can I deal with multiple Dstreams at once? Thanks
Re: using multiple dstreams together (spark streaming)
hum... maybe consuming all streams at the same time with an actor that would act as a new DStream source... but this is just a random idea... I don't really know if that would be a good idea or even possible. 2014-07-16 18:30 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Yeah -- I tried the .union operation and it didn't work for that reason. Surely there has to be a way to do this, as I imagine this is a commonly desired goal in streaming applications? On Wed, Jul 16, 2014 at 10:10 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I'm joining several kafka dstreams using the join operation but you have the limitation that the duration of the batch has to be same,i.e. 1 second window for all dstreams... so it would not work for you. 2014-07-16 18:08 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Hi, My application has multiple dstreams on the same inputstream: dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window I want to write logic that deals with all three windows (e.g. when the 1 second window differs from the 2 second window by some delta ...) I've found some examples online (there's not much out there!), and I can only see people transforming a single dstream. In conventional spark, we'd do this sort of thing with a cartesian on RDDs. How can I deal with multiple Dstreams at once? Thanks
Re: using multiple dstreams together (spark streaming)
Or, if not, is there a way to do this in terms of a single dstream? Keep in mind that dstream1, dstream2, and dstream3 have already had transformations applied. I tried creating the dstreams by calling .window on the first one, but that ends up with me having ... 3 dstreams... which is the same problem. On Wed, Jul 16, 2014 at 10:30 AM, Walrus theCat walrusthe...@gmail.com wrote: Yeah -- I tried the .union operation and it didn't work for that reason. Surely there has to be a way to do this, as I imagine this is a commonly desired goal in streaming applications? On Wed, Jul 16, 2014 at 10:10 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I'm joining several kafka dstreams using the join operation but you have the limitation that the duration of the batch has to be same,i.e. 1 second window for all dstreams... so it would not work for you. 2014-07-16 18:08 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Hi, My application has multiple dstreams on the same inputstream: dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window I want to write logic that deals with all three windows (e.g. when the 1 second window differs from the 2 second window by some delta ...) I've found some examples online (there's not much out there!), and I can only see people transforming a single dstream. In conventional spark, we'd do this sort of thing with a cartesian on RDDs. How can I deal with multiple Dstreams at once? Thanks
Re: using multiple dstreams together (spark streaming)
hey at least it's something (thanks!) ... not sure what i'm going to do if i can't find a solution (other than not use spark) as i really need these capabilities. anyone got anything else? On Wed, Jul 16, 2014 at 10:34 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: hum... maybe consuming all streams at the same time with an actor that would act as a new DStream source... but this is just a random idea... I don't really know if that would be a good idea or even possible. 2014-07-16 18:30 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Yeah -- I tried the .union operation and it didn't work for that reason. Surely there has to be a way to do this, as I imagine this is a commonly desired goal in streaming applications? On Wed, Jul 16, 2014 at 10:10 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I'm joining several kafka dstreams using the join operation but you have the limitation that the duration of the batch has to be same,i.e. 1 second window for all dstreams... so it would not work for you. 2014-07-16 18:08 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Hi, My application has multiple dstreams on the same inputstream: dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window I want to write logic that deals with all three windows (e.g. when the 1 second window differs from the 2 second window by some delta ...) I've found some examples online (there's not much out there!), and I can only see people transforming a single dstream. In conventional spark, we'd do this sort of thing with a cartesian on RDDs. How can I deal with multiple Dstreams at once? Thanks
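One possible workaround, sketched here rather than taken from the thread: derive every window from the same base DStream with a common slide duration, so the windowed streams keep compatible batch boundaries and can be joined (the key/value types and the threshold are assumptions):

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

// Assumes `base` is a DStream[(String, Long)] built on a 1-second batch interval.
def windowDelta(base: DStream[(String, Long)], threshold: Long): DStream[(String, (Long, Long))] = {
  // Both windows slide every second, so they produce RDDs at the same instants.
  val oneSecond  = base.window(Seconds(1), Seconds(1)).reduceByKey(_ + _)
  val twoSeconds = base.window(Seconds(2), Seconds(1)).reduceByKey(_ + _)
  // Keep keys whose 1-second and 2-second totals differ by more than `threshold`.
  oneSecond.join(twoSeconds).filter { case (_, (a, b)) => math.abs(a - b) > threshold }
}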
Re: running Spark App on Yarn produces: Exception in thread main java.lang.NoSuchFieldException: DEFAULT_YARN_APPLICATION_CLASSPATH
Sandy, perfect! you saved me tons of time! added this in yarn-site.xml job ran to completion Can you do me (us) a favor and push newest and patched spark/hadoop to cdh5 (tar's) if possible and thanks again for this (huge time saver) On Wed, Jul 16, 2014 at 1:10 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Andrew, Are you running on a CM-managed cluster? I just checked, and there is a bug here (fixed in 1.0), but it's avoided by having yarn.application.classpath defined in your yarn-site.xml. -Sandy On Wed, Jul 16, 2014 at 10:02 AM, Sean Owen so...@cloudera.com wrote: Somewhere in here, you are not actually running vs Hadoop 2 binaries. Your cluster is certainly Hadoop 2, but your client is not using the Hadoop libs you think it is (or your compiled binary is linking against Hadoop 1, which is the default for Spark -- did you change it?) On Wed, Jul 16, 2014 at 5:45 PM, Andrew Milkowski amgm2...@gmail.com wrote: Hello community, tried to run storm app on yarn, using cloudera hadoop and spark distro (from http://archive.cloudera.com/cdh5/cdh/5) hadoop version: hadoop-2.3.0-cdh5.0.3.tar.gz spark version: spark-0.9.0-cdh5.0.3.tar.gz DEFAULT_YARN_APPLICATION_CLASSPATH is part of hadoop-api-yarn jar ... thanks for any replies! [amilkowski@localhost spark-streaming]$ ./test-yarn.sh 14/07/16 12:47:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/16 12:47:17 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 14/07/16 12:47:17 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 1 14/07/16 12:47:17 INFO yarn.Client: Queue info ... queueName: root.default, queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0, queueApplicationCount = 0, queueChildQueueCount = 0 14/07/16 12:47:17 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 8192 14/07/16 12:47:17 INFO yarn.Client: Preparing Local resources 14/07/16 12:47:18 INFO yarn.Client: Uploading file:/opt/local/cloudera/spark/cdh5/spark-0.9.0-cdh5.0.3/examples/target/scala-2.10/spark-examples-assembly-0.9.0-cdh5.0.3.jar to hdfs://localhost:8020/user/amilkowski/.sparkStaging/application_1405528355264_0004/spark-examples-assembly-0.9.0-cdh5.0.3.jar 14/07/16 12:47:19 INFO yarn.Client: Uploading file:/opt/local/cloudera/spark/cdh5/spark-0.9.0-cdh5.0.3/assembly/target/scala-2.10/spark-assembly-0.9.0-cdh5.0.3-hadoop2.3.0-cdh5.0.3.jar to hdfs://localhost:8020/user/amilkowski/.sparkStaging/application_1405528355264_0004/spark-assembly-0.9.0-cdh5.0.3-hadoop2.3.0-cdh5.0.3.jar 14/07/16 12:47:19 INFO yarn.Client: Setting up the launch environment Exception in thread main java.lang.NoSuchFieldException: DEFAULT_YARN_APPLICATION_CLASSPATH at java.lang.Class.getField(Class.java:1579) at org.apache.spark.deploy.yarn.ClientBase$.getDefaultYarnApplicationClasspath(ClientBase.scala:403) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$5.apply(ClientBase.scala:386) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$5.apply(ClientBase.scala:386) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.deploy.yarn.ClientBase$.populateHadoopClasspath(ClientBase.scala:385) at org.apache.spark.deploy.yarn.ClientBase$.populateClasspath(ClientBase.scala:444) at org.apache.spark.deploy.yarn.ClientBase$class.setupLaunchEnv(ClientBase.scala:274) at org.apache.spark.deploy.yarn.Client.setupLaunchEnv(Client.scala:41) at org.apache.spark.deploy.yarn.Client.runApp(Client.scala:77) at 
org.apache.spark.deploy.yarn.Client.run(Client.scala:98) at org.apache.spark.deploy.yarn.Client$.main(Client.scala:183) at org.apache.spark.deploy.yarn.Client.main(Client.scala) [amilkowski@localhost spark-streaming]$
Re: running Spark App on Yarn produces: Exception in thread main java.lang.NoSuchFieldException: DEFAULT_YARN_APPLICATION_CLASSPATH
For others, to solve topic problem: in yarn-site.xml add: property nameyarn.application.classpath/name value$HADOOP_CONF_DIR, $HADOOP_COMMON_HOME/share/hadoop/common/*, $HADOOP_COMMON_HOME/share/hadoop/common/lib/*, $HADOOP_HDFS_HOME/share/hadoop/hdfs/*, $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*, $HADOOP_YARN_HOME/share/hadoop/yarn/*, $HADOOP_YARN_HOME/share/hadoop/yarn/lib/*/value /property On Wed, Jul 16, 2014 at 1:47 PM, Andrew Milkowski amgm2...@gmail.com wrote: Sandy, perfect! you saved me tons of time! added this in yarn-site.xml job ran to completion Can you do me (us) a favor and push newest and patched spark/hadoop to cdh5 (tar's) if possible and thanks again for this (huge time saver) On Wed, Jul 16, 2014 at 1:10 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Andrew, Are you running on a CM-managed cluster? I just checked, and there is a bug here (fixed in 1.0), but it's avoided by having yarn.application.classpath defined in your yarn-site.xml. -Sandy On Wed, Jul 16, 2014 at 10:02 AM, Sean Owen so...@cloudera.com wrote: Somewhere in here, you are not actually running vs Hadoop 2 binaries. Your cluster is certainly Hadoop 2, but your client is not using the Hadoop libs you think it is (or your compiled binary is linking against Hadoop 1, which is the default for Spark -- did you change it?) On Wed, Jul 16, 2014 at 5:45 PM, Andrew Milkowski amgm2...@gmail.com wrote: Hello community, tried to run storm app on yarn, using cloudera hadoop and spark distro (from http://archive.cloudera.com/cdh5/cdh/5) hadoop version: hadoop-2.3.0-cdh5.0.3.tar.gz spark version: spark-0.9.0-cdh5.0.3.tar.gz DEFAULT_YARN_APPLICATION_CLASSPATH is part of hadoop-api-yarn jar ... thanks for any replies! [amilkowski@localhost spark-streaming]$ ./test-yarn.sh 14/07/16 12:47:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/16 12:47:17 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 14/07/16 12:47:17 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 1 14/07/16 12:47:17 INFO yarn.Client: Queue info ... 
queueName: root.default, queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0, queueApplicationCount = 0, queueChildQueueCount = 0 14/07/16 12:47:17 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 8192 14/07/16 12:47:17 INFO yarn.Client: Preparing Local resources 14/07/16 12:47:18 INFO yarn.Client: Uploading file:/opt/local/cloudera/spark/cdh5/spark-0.9.0-cdh5.0.3/examples/target/scala-2.10/spark-examples-assembly-0.9.0-cdh5.0.3.jar to hdfs://localhost:8020/user/amilkowski/.sparkStaging/application_1405528355264_0004/spark-examples-assembly-0.9.0-cdh5.0.3.jar 14/07/16 12:47:19 INFO yarn.Client: Uploading file:/opt/local/cloudera/spark/cdh5/spark-0.9.0-cdh5.0.3/assembly/target/scala-2.10/spark-assembly-0.9.0-cdh5.0.3-hadoop2.3.0-cdh5.0.3.jar to hdfs://localhost:8020/user/amilkowski/.sparkStaging/application_1405528355264_0004/spark-assembly-0.9.0-cdh5.0.3-hadoop2.3.0-cdh5.0.3.jar 14/07/16 12:47:19 INFO yarn.Client: Setting up the launch environment Exception in thread main java.lang.NoSuchFieldException: DEFAULT_YARN_APPLICATION_CLASSPATH at java.lang.Class.getField(Class.java:1579) at org.apache.spark.deploy.yarn.ClientBase$.getDefaultYarnApplicationClasspath(ClientBase.scala:403) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$5.apply(ClientBase.scala:386) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$5.apply(ClientBase.scala:386) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.deploy.yarn.ClientBase$.populateHadoopClasspath(ClientBase.scala:385) at org.apache.spark.deploy.yarn.ClientBase$.populateClasspath(ClientBase.scala:444) at org.apache.spark.deploy.yarn.ClientBase$class.setupLaunchEnv(ClientBase.scala:274) at org.apache.spark.deploy.yarn.Client.setupLaunchEnv(Client.scala:41) at org.apache.spark.deploy.yarn.Client.runApp(Client.scala:77) at org.apache.spark.deploy.yarn.Client.run(Client.scala:98) at org.apache.spark.deploy.yarn.Client$.main(Client.scala:183) at org.apache.spark.deploy.yarn.Client.main(Client.scala) [amilkowski@localhost spark-streaming]$
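The yarn-site.xml snippet above lost its angle brackets in the archive; written out as XML it would look like this (the paths assume the tarball layout under the HADOOP_*_HOME variables):

<property>
  <name>yarn.application.classpath</name>
  <value>$HADOOP_CONF_DIR,
    $HADOOP_COMMON_HOME/share/hadoop/common/*,
    $HADOOP_COMMON_HOME/share/hadoop/common/lib/*,
    $HADOOP_HDFS_HOME/share/hadoop/hdfs/*,
    $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,
    $HADOOP_YARN_HOME/share/hadoop/yarn/*,
    $HADOOP_YARN_HOME/share/hadoop/yarn/lib/*</value>
</property>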
Re: Need help on spark Hbase
Hi Rajesh, I saw : Warning: Local jar /home/rajesh/hbase-0.96.1.1-hadoop2/lib/hbase -client-0.96.1.1-hadoop2.jar, does not exist, skipping. in your log. I believe this jar contains the HBaseConfiguration. I'm not sure what went wrong in your case but can you try without spaces in --jars i.e. --jars A.jar,B.jar,C.jar not --jars A.jar, B.jar, C.jar I'm just guessing because when I used --jars I never have spaces in it. HTH, Jerry On Wed, Jul 16, 2014 at 5:30 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Team, Now i've changed my code and reading configuration from hbase-site.xml file(this file is in classpath). When i run this program using : mvn exec:java -Dexec.mainClass=com.cisco.ana.accessavailability.AccessAvailability. It is working fine. But when i run this program from spark-submit i'm getting below exception Please find below exception : spark-submit command not able to found the HbaseConfiguration. How to resolve this issue? rajesh@rajesh-VirtualBox:~/Downloads/spark-1.0.0$ ./bin/spark-submit --master local --class com.cisco.ana.accessavailability.AccessAvailability --jars /home/rajesh/Downloads/MISC/ANA_Access/target/ANA_Access-0.0.1-SNAPSHOT.jar, /home/rajesh/hbase-0.96.1.1-hadoop2/lib/hbase-client-0.96.1.1-hadoop2.jar, /home/rajesh/hbase-0.96.1.1-hadoop2/lib/hbase-common-0.96.1.1-hadoop2.jar, /home/rajesh/hbase-0.96.1.1-hadoop2/lib/hbase-hadoop2-compat-0.96.1.1-hadoop2.jar, /home/rajesh/hbase-0.96.1.1-hadoop2/lib/hbase-it-0.96.1.1-hadoop2.jar, /home/rajesh/hbase-0.96.1.1-hadoop2/lib/hbase-protocol-0.96.1.1-hadoop2.jar, /home/rajesh/hbase-0.96.1.1-hadoop2/lib/hbase-server-0.96.1.1-hadoop2.jar, /home/rajesh/hbase-0.96.1.1-hadoop2/lib/htrace-core-2.01.jar, /home/rajesh/Downloads/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop2.2.0.jar Warning: Local jar /home/rajesh/hbase-0.96.1.1-hadoop2/lib/hbase-client-0.96.1.1-hadoop2.jar, does not exist, skipping. Before *Exception in thread main java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration* at com.cisco.ana.accessavailability.AccessAvailability.main(AccessAvailability.java:80) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.HBaseConfiguration at java.net.URLClassLoader$1.run(URLClassLoader.java:217) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:205) at java.lang.ClassLoader.loadClass(ClassLoader.java:323) at java.lang.ClassLoader.loadClass(ClassLoader.java:268) ... 
8 more Please find below code : public class AccessAvailability { public static void main(String[] args) throws Exception { System.out.println( Before); Configuration configuration = HBaseConfiguration.create(); System.out.println( After); SparkConf s = new SparkConf().setMaster(local); JavaStreamingContext ssc = new JavaStreamingContext(master,AccessAvailability, new Duration(4), sparkHome, ); JavaDStreamString lines_2 = ssc.textFileStream(hdfsfolderpath); } } Regards, Rajesh On Wed, Jul 16, 2014 at 5:39 AM, Krishna Sankar ksanka...@gmail.com wrote: Good catch. I thought the largest port number is 65535. Cheers k/ On Tue, Jul 15, 2014 at 4:33 PM, Spark DevUser spark.devu...@gmail.com wrote: Are you able to launch *hbase shell* and run some commands (list, describe, scan, etc)? Seems *configuration.set(hbase.**master, localhost:60)* is wrong. On Tue, Jul 15, 2014 at 3:00 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Also, it helps if you post us logs, stacktraces, exceptions, etc. TD On Tue, Jul 15, 2014 at 10:07 AM, Jerry Lam chiling...@gmail.com wrote: Hi Rajesh, I have a feeling that this is not directly related to spark but I might be wrong. The reason why is that when you do: Configuration configuration = HBaseConfiguration.create(); by default, it reads the configuration files hbase-site.xml in your classpath and ... (I don't remember all the configuration files hbase has). I noticed that you overwrote some configuration settings in the code but I'm not if you have other configurations that might have conflicted with those. Could you try the following, remove anything that is spark specific leaving only hbase related codes. uber jar it and run it just like any other simple java program. If you still
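To make Jerry's point concrete, here is a shortened, hypothetical version of the command (the /path/to placeholders are not the real paths from the thread): the whole --jars value has to be one comma-separated argument with no spaces, otherwise the shell splits it and spark-submit treats the remainder as separate arguments, which matches the "does not exist, skipping" warning above:

./bin/spark-submit --master local \
  --class com.cisco.ana.accessavailability.AccessAvailability \
  --jars /path/to/hbase-client-0.96.1.1-hadoop2.jar,/path/to/hbase-common-0.96.1.1-hadoop2.jar,/path/to/htrace-core-2.01.jar \
  /path/to/ANA_Access-0.0.1-SNAPSHOT.jar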
Difference among batchDuration, windowDuration, slideDuration
When I'm reading the API of Spark Streaming, I'm confused by the 3 different durations:
StreamingContext(conf: SparkConf, batchDuration: Duration)
DStream.window(windowDuration: Duration, slideDuration: Duration): DStream[T]
Can anyone please explain these 3 different durations? Best, Siyuan
RE: executor-cores vs. num-executors
Thanks for sharing your experience. I got the same experience -- multiple moderate JVMs beat a single huge JVM. Besides the minor JVM starting overhead, is it always better to have multiple JVMs rather than a single one? Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan From: innowireless TaeYun Kim taeyun@innowireless.co.kr To: user@spark.apache.org, Date: 07/16/2014 05:04 AM Subject:RE: executor-cores vs. num-executors Thanks. Really, now I compare a stage data of the two jobs, ‘core7-exec3’ spends about 12.5 minutes more than ‘core2-exec12’ on GC. From: Nishkam Ravi [mailto:nr...@cloudera.com] Sent: Wednesday, July 16, 2014 5:28 PM To: user@spark.apache.org Subject: Re: executor-cores vs. num-executors I think two small JVMs would often beat a large one due to lower GC overhead.
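For readers unfamiliar with the shorthand: 'core7-exec3' and 'core2-exec12' correspond, on YARN for example, to submit flags along these lines (the memory figures are only illustrative):

# few large executors: 3 executors x 7 cores each
spark-submit --num-executors 3 --executor-cores 7 --executor-memory 21g ...
# many small executors: 12 executors x 2 cores each
spark-submit --num-executors 12 --executor-cores 2 --executor-memory 6g ...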
Re: Kyro deserialisation error
Is the class that is not found in the wikipediapagerank jar? TD On Wed, Jul 16, 2014 at 12:32 AM, Hao Wang wh.s...@gmail.com wrote: Thanks for your reply. The SparkContext is configured as below: sparkConf.setAppName(WikipediaPageRank) sparkConf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer) sparkConf.set(spark.kryo.registrator, classOf[PRKryoRegistrator].getName) val inputFile = args(0) val threshold = args(1).toDouble val numPartitions = args(2).toInt val usePartitioner = args(3).toBoolean sparkConf.setAppName(WikipediaPageRank) sparkConf.set(spark.executor.memory, 60g) sparkConf.set(spark.cores.max, 48) sparkConf.set(spark.kryoserializer.buffer.mb, 24) val sc = new SparkContext(sparkConf) sc.addJar(~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar) And I use spark-submit to run the application: ./bin/spark-submit --master spark://sing12:7077 --total-executor-cores 40 --executor-memory 40g --class org.apache.spark.examples.bagel.WikipediaPageRank ~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar hdfs://192.168.1.12:9000/freebase-26G 1 200 True Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address:800 Dongchuan Road, Minhang District, Shanghai, 200240 Email:wh.s...@gmail.com On Wed, Jul 16, 2014 at 1:41 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using classes from external libraries that have not been added to the sparkContext, using sparkcontext.addJar()? TD On Tue, Jul 15, 2014 at 8:36 PM, Hao Wang wh.s...@gmail.com wrote: I am running the WikipediaPageRank in Spark example and share the same problem with you: 4/07/16 11:31:06 DEBUG DAGScheduler: submitStage(Stage 6) 14/07/16 11:31:06 ERROR TaskSetManager: Task 6.0:450 failed 4 times; aborting job 14/07/16 11:31:06 INFO DAGScheduler: Failed to run foreach at Bagel.scala:251 Exception in thread main 14/07/16 11:31:06 INFO TaskSchedulerImpl: Cancelling stage 6 org.apache.spark.SparkException: Job aborted due to stage failure: Task 6.0:450 failed 4 times, most recent failure: Exception failure in TID 1330 on host sing11: com.esotericsoftware.kryo.KryoException: Unable to find class: arl Fridtjof Rode com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721) com.twitter.chill.TraversableSerializer.read(Traversable.scala:44) com.twitter.chill.TraversableSerializer.read(Traversable.scala:21) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:115) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58) org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96) org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95) org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) Anyone cloud help? 
Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address:800 Dongchuan Road, Minhang District, Shanghai, 200240 Email:wh.s...@gmail.com On Tue, Jun 3, 2014 at 8:02 PM, Denes te...@outlook.com wrote: I tried to use Kryo as a serialiser isn spark streaming, did everything according to the guide posted on the spark website, i.e. added the following lines: conf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer); conf.set(spark.kryo.registrator, MyKryoRegistrator); I also added the necessary classes to the MyKryoRegistrator. However I get the following strange error, can someone help me out where to look for a solution? 14/06/03 09:00:49 ERROR scheduler.JobScheduler: Error running job streaming job 140177880 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Exception while deserializing and fetching task: com.esotericsoftware.kryo.KryoException: Unable to find class: J Serialization trace: id (org.apache.spark.storage.GetBlock) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at
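The Kryo configuration snippets in this thread lost their string quotes in the archive; with the quotes restored, a typical setup looks like this (the class and package names are illustrative, not taken from the thread):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

case class PageRankVertex(id: Long, rank: Double)      // illustrative payload class

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Register every class that ends up inside shuffled or serialized data,
    // otherwise Kryo has to write and later resolve class names itself.
    kryo.register(classOf[PageRankVertex])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "mypackage.MyKryoRegistrator")
  .set("spark.kryoserializer.buffer.mb", "24")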
Re: Retrieve dataset of Big Data Benchmark
Hi Tom, Actually I was mistaken, sorry about that. Indeed on the website, the keys for the datasets you mention are not showing up. However, they are still accessible through the spark-shell, which means that they are there. So in order to answer your questions: - Are the tiny and 1node sets still available? Yes, they are. - Are the Uservisits and Rankings still available? Yes, they are. - Why is the crawl set bigger than expected, and how big is it? It says on the website that it is ~30 GB per node. Since you're downloading the 5nodes version, the total size should be 150 GB. Coming to other ways on you can download them: I propose using the spark-shell would be easiest (At least for me it was :). Once you start the spark-shell, you can access the files as (example for the tiny crawl dataset, exchange with 1node, 5nodes uservisits, rankings as desired. Mind the lowercase): val dataset = sc.textFile(s3n://big-data-benchmark/pavlo/text/tiny/crawl) dataset.saveAsTextFile(your/local/relative/path/here) The file will be saved relative to where you run the spark-shell from. Hope this helps! Burak - Original Message - From: Tom thubregt...@gmail.com To: u...@spark.incubator.apache.org Sent: Wednesday, July 16, 2014 9:10:58 AM Subject: Re: Retrieve dataset of Big Data Benchmark Hi Burak, Thank you for your pointer, it is really helping out. I do have some consecutive questions though. After looking at the Big Data Benchmark page https://amplab.cs.berkeley.edu/benchmark/ (Section Run this benchmark yourself), I was expecting the following combination of files: Sets: Uservisits, Rankings, Crawl Size: tiny, 1node, 5node Both in text and Sequence file. When looking at http://s3.amazonaws.com/big-data-benchmark/, I only see sequence-snappy/5nodes/_distcp_logs_44js2v part 0 to 103 sequence-snappy/5nodes/_distcp_logs_nclxhd part 0 to 102 sequence-snappy/5nodes/_distcp_logs_vnuhym part 0 to 24 sequence-snappy/5nodes/crawl part 0 to 743 As Crawl is the name of a set I am looking for, I started to download it. Since it was the end of the day and I was going to download it overnight, I just wrote a for loop from 0 to 999 with wget, expecting it to download until 7-something and 404 errors for the others. When I looked at it this morning, I noticed that it all completed downloading. The total Crawl set for 5 nodes should be ~30Gb, I am currently at part 1020 with a total set of 40G. This leads to my (sub)questions: Does anybody know what exactly is still hosted: - Are the tiny and 1node sets still available? - Are the Uservisits and Rankings still available? - Why is the crawl set bigger than expected, and how big is it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Retrieve-dataset-of-Big-Data-Benchmark-tp9821p9938.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
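Burak's spark-shell snippet with the string quotes restored (the archive stripped them); depending on the environment, the s3n:// scheme may also need AWS credentials configured in Hadoop:

val dataset = sc.textFile("s3n://big-data-benchmark/pavlo/text/tiny/crawl")
dataset.saveAsTextFile("your/local/relative/path/here")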
Re: Repeated data item search with Spark SQL(1.0.1)
Mostly true. The execution of two equivalent logical plans will be exactly the same, independent of the dialect. Resolution can be slightly different, as SQLContext defaults to case sensitive and HiveContext defaults to case insensitive. One other very technical detail: the actual planning done by HiveContext and SQLContext is slightly different, as SQLContext does not have strategies for reading data from Hive tables. All other operators should be the same though. This is not a difference, though, that has anything to do with the dialect. On Wed, Jul 16, 2014 at 2:13 PM, Jerry Lam chiling...@gmail.com wrote: Hi Michael, Thank you for the explanation. Can you validate whether the following statement is true/incomplete/false: hql uses Hive to parse and to construct the logical plan, whereas sql is a pure Spark implementation of parsing and logical plan construction. Once Spark obtains the logical plan, it is executed in Spark regardless of the dialect, although the execution might be different for the same query. Best Regards, Jerry On Tue, Jul 15, 2014 at 6:22 PM, Michael Armbrust mich...@databricks.com wrote: hql and sql are just two different dialects for interacting with data. After parsing is complete and the logical plan is constructed, the execution is exactly the same. On Tue, Jul 15, 2014 at 2:50 PM, Jerry Lam chiling...@gmail.com wrote: Hi Michael, I don't understand the difference between hql (HiveContext) and sql (SQLContext). My previous understanding was that hql is Hive-specific: unless the table is managed by Hive, we should use sql. For instance, an RDD (hdfsRDD) created from files in HDFS and registered as a table should use sql. However, my current understanding after trying your suggestion above is that I can also query the hdfsRDD using hql via LocalHiveContext. I just tested it, and the lateral view explode(schools) works with the hdfsRDD. It seems to me that HiveContext and SQLContext are the same except that HiveContext needs a metastore and has more powerful SQL support borrowed from Hive. Can you shed some light on this when you get a minute? Thanks, Jerry On Tue, Jul 15, 2014 at 4:32 PM, Michael Armbrust mich...@databricks.com wrote: No, that is why I included the link to SPARK-2096 https://issues.apache.org/jira/browse/SPARK-2096 as well. You'll need to use HiveQL at this time. Is it possible or planned to support the schools.time > 2 form to filter the records where some element inside the array of schools satisfies time > 2? It would be great to support something like this, but it's going to take a while to hammer out the correct semantics, as SQL does not in general have great support for nested structures. I think different people might interpret that query to mean there is SOME school.time > 2 vs. ALL school.time > 2, etc. You can get what you want now using a lateral view: hql("SELECT DISTINCT name FROM people LATERAL VIEW explode(schools) s as school WHERE school.time > 2")
Re: SPARK_WORKER_PORT (standalone cluster)
Now I see the answer to this. Spark slaves are started on random ports and tell the master where they are; the master then acknowledges them. (worker logs) Starting Spark worker :43282 (master logs) Registering worker on :43282 with 8 cores, 16.5 GB RAM Thus, the port is random because the slaves can be ephemeral. Since the master is fixed, though, a new slave can reconnect at any time. On Mon, Jul 14, 2014 at 10:01 PM, jay vyas jayunit100.apa...@gmail.com wrote: Hi spark ! What is the purpose of the randomly assigned SPARK_WORKER_PORT? From the documentation it says it is used to join a cluster, but it's not clear to me how a random port could be used to communicate with other members of a spark pool. This question might be grounded in my ignorance ... if so please just point me to the right documentation if I'm missing something obvious :) thanks ! -- jay vyas -- jay vyas
Re: Difference among batchDuration, windowDuration, slideDuration
The only other thing to keep in mind is that window duration and slide duration have to be multiples of batch duration, IDK if you made that fully clear -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Difference-among-batchDuration-windowDuration-slideDuration-tp9966p9973.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
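A small sketch tying the three together (the numbers are only examples): batchDuration is fixed when the StreamingContext is created, and every windowDuration and slideDuration must be a multiple of it, as noted above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setAppName("durations-sketch")
val ssc = new StreamingContext(conf, Seconds(2))   // batchDuration: input is cut into RDDs every 2 seconds
val words = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map((_, 1L))
// windowDuration = 30s of data per result, slideDuration = a new result every 10s;
// both are multiples of the 2-second batchDuration.
val counts = words.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
counts.print()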
Re: can't get jobs to run on cluster (enough memory and cpus are available on worker)
Thanks Marcelo, I'm not seeing anything in the logs that clearly explains what's causing this to break. One interesting point that we just discovered is that if we run the driver and the slave (worker) on the same host it runs, but if we run the driver on a separate host it does not run. Anyways, this is all I see on the worker: 14/07/16 19:32:27 INFO Worker: Asked to launch executor app-20140716193227-/0 for Spark Pi 14/07/16 19:32:27 WARN CommandUtils: SPARK_JAVA_OPTS was set on the worker. It is deprecated in Spark 1.0. 14/07/16 19:32:27 WARN CommandUtils: Set SPARK_LOCAL_DIRS for node-specific storage locations. Spark assembly has been built with Hive, including Datanucleus jars on classpath 14/07/16 19:32:27 INFO ExecutorRunner: Launch command: /cask/jdk/bin/java -cp ::/cask/spark/conf:/cask/spark/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/cask/spark/lib/datanucleus-api-jdo-3.2.1.jar:/cask/spark/lib/datanucleus-rdbms-3.2.1.jar:/cask/spark/lib/datanucleus-core-3.2.2.jar -XX:MaxPermSize=128m -Dspark.akka.frameSize=100 -Dspark.akka.frameSize=100 -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@ip-10-202-11-191.ec2.internal:47740/user/CoarseGrainedScheduler 0 ip-10-202-8-45.ec2.internal 8 akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker app-20140716193227- And on the driver I see this: 14/07/16 19:32:26 INFO SparkContext: Added JAR file:/cask/spark/lib/spark-examples-1.0.0-hadoop2.2.0.jar at http://10.202.11.191:39642/jars/spark-examples-1.0.0-hadoop2.2.0.jar with timestamp 1405539146752 14/07/16 19:32:26 INFO AppClient$ClientActor: Connecting to master spark://ip-10-202-9-195.ec2.internal:7077... 14/07/16 19:32:26 INFO SparkContext: Starting job: reduce at SparkPi.scala:35 14/07/16 19:32:26 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:35) with 2 output partitions (allowLocal=false) 14/07/16 19:32:26 INFO DAGScheduler: Final stage: Stage 0(reduce at SparkPi.scala:35) 14/07/16 19:32:26 INFO DAGScheduler: Parents of final stage: List() 14/07/16 19:32:26 INFO DAGScheduler: Missing parents: List() 14/07/16 19:32:26 DEBUG DAGScheduler: submitStage(Stage 0) 14/07/16 19:32:26 DEBUG DAGScheduler: missing: List() 14/07/16 19:32:26 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map at SparkPi.scala:31), which has no missing parents 14/07/16 19:32:26 DEBUG DAGScheduler: submitMissingTasks(Stage 0) 14/07/16 19:32:26 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[1] at map at SparkPi.scala:31) 14/07/16 19:32:26 DEBUG DAGScheduler: New pending tasks: Set(ResultTask(0, 0), ResultTask(0, 1)) 14/07/16 19:32:26 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 14/07/16 19:32:27 DEBUG TaskSetManager: Epoch for TaskSet 0.0: 0 14/07/16 19:32:27 DEBUG TaskSetManager: Valid locality levels for TaskSet 0.0: ANY 14/07/16 19:32:27 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 0 14/07/16 19:32:27 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140716193227- 14/07/16 19:32:27 INFO AppClient$ClientActor: Executor added: app-20140716193227-/0 on worker-20140716193059-ip-10-202-8-45.ec2.internal-7101 (ip-10-202-8-45.ec2.internal:7101) with 8 cores 14/07/16 19:32:27 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140716193227-/0 on hostPort ip-10-202-8-45.ec2.internal:7101 with 8 cores, 512.0 MB RAM 14/07/16 19:32:27 INFO AppClient$ClientActor: Executor updated: app-20140716193227-/0 is now RUNNING If I wait long enough and see several inital job has 
not accepted any resources messages on the driver, this shows up in the worker: 14/07/16 19:34:09 INFO Worker: Executor app-20140716193227-/0 finished with state FAILED message Command exited with code 1 exitStatus 1 14/07/16 19:34:09 INFO Worker: Asked to launch executor app-20140716193227-/1 for Spark Pi 14/07/16 19:34:09 WARN CommandUtils: SPARK_JAVA_OPTS was set on the worker. It is deprecated in Spark 1.0. 14/07/16 19:34:09 WARN CommandUtils: Set SPARK_LOCAL_DIRS for node-specific storage locations. 14/07/16 19:34:09 INFO LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.202.8.45%3A46568-2#593829151] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 14/07/16 19:34:09 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101] - [akka.tcp://sparkExecutor@ip-10-202-8-45.ec2.internal:46848]: Error [Association failed with [akka.tcp://sparkExecutor@ip-10-202-8-45.ec2.internal:46848]] [ akka.remote.EndpointAssociationException: Association failed with
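Not something established in this thread, just a common thing to check when the driver sits on a different host (a hedged sketch, with a placeholder hostname): the executors launch and then fail if they cannot reach the address the driver advertises, which matches the "executor added ... then FAILED" pattern above. Making the driver's address explicit can rule that out:

import org.apache.spark.SparkConf

// In the driver's configuration:
val conf = new SparkConf()
  .set("spark.driver.host", "driver-host.example.internal")  // must be resolvable and reachable from the workers
  .set("spark.driver.port", "47740")                         // pin the port if firewalls restrict traffic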
Re: ClassNotFoundException: $line11.$read$ when loading an HDFS text file with SparkQL in spark-shell
Note that runnning a simple map+reduce job on the same hdfs files with the same installation works fine: Did you call collect() on the totalLength? Otherwise nothing has actually executed.
Re: ClassNotFoundException: $line11.$read$ when loading an HDFS text file with SparkQL in spark-shell
Oh, I'm sorry... reduce is also an operation On Wed, Jul 16, 2014 at 3:37 PM, Michael Armbrust mich...@databricks.com wrote: Note that runnning a simple map+reduce job on the same hdfs files with the same installation works fine: Did you call collect() on the totalLength? Otherwise nothing has actually executed.
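A tiny illustration of the point about laziness (the path is hypothetical): transformations only build up a plan, and nothing executes until an action such as reduce or collect runs:

val lineLengths = sc.textFile("hdfs:///some/path").map(_.length)  // lazy: no job has run yet
val totalLength = lineLengths.reduce(_ + _)                       // reduce is an action, so the job executes here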
SaveAsTextFile of RDD taking much time
Hi All, I am new to Spark. I have written a program that reads data from a local big file, sorts it using Spark SQL, and then filters it based on some validation rules. I have tested this program with a file of 23,860,746 lines, and it took 39 secs (2 cores and Xmx set to 6gb). But when I serialize the result to a local file, it takes much longer (I stopped the execution). For a 100K-line file, the program took 8 secs without saveAsTextFile, whereas writing to a file took 20 mins. BTW, I am using Kryo serialization and the StorageLevel.MEMORY_ONLY_SER option for persisting into RAM. The program is almost as provided below:

package main.scala

import scala.reflect.runtime.universe
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import com.esotericsoftware.kryo.Kryo

object SparkSqlApplication extends App {
  val txtFile = "/home/admin/scala/bigfile.txt"
  val outputDir = "file:///home/admin/scala/spark-poc/sample_data1_spark-sql"
  val conf = new SparkConf().setMaster("local")
    .setAppName("Spark App").setSparkHome("$SPARK_HOME")
    .setJars(List("target/scala-2.10/spark-project_2.10-1.0.jar"))
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "main.scala.ReconRegistrator")
  val sc = new SparkContext(conf)
  val sqlCtx = new SQLContext(sc)
  import sqlCtx.createSchemaRDD

  val patient = sc.textFile(txtFile)
    .persist(StorageLevel.MEMORY_ONLY_SER)
    .map(_.split(","))
    .map(arr => Patient(arr(0).trim(), arr(1), arr(2)))
    .registerAsTable("patient")
  val sortedPat = sqlCtx.sql("select * from patient order by pcode")
  val validator = new GroovyIntegrator()
  val filteredInvalidPat = sortedPat.filter(patientRow => !validator.applyRules("" + patientRow(0)))
  filteredInvalidPat.coalesce(1, false).saveAsTextFile(outputDir)
}

case class Patient(pcode: String, disease: String, dcategory: String)

class ReconRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Patient])
  }
}

Can anyone help on this? Thanks, Sudip -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SaveAsTextFile-of-RDD-taking-much-time-tp9979.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming, external windowing?
One way to do that is currently possible is given here http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAMwrk0=b38dewysliwyc6hmze8tty8innbw6ixatnd1ue2-...@mail.gmail.com%3E On Wed, Jul 16, 2014 at 1:16 AM, Gerard Maas gerard.m...@gmail.com wrote: Hi Sargun, There have been few discussions on the list recently about the topic. The short answer is that this is not supported at the moment. This is a particularly good thread as it discusses the current state and limitations: http://apache-spark-developers-list.1001551.n3.nabble.com/brainsotrming-Generalization-of-DStream-a-ContinuousRDD-td7349.html -kr, Gerard. On Wed, Jul 16, 2014 at 9:56 AM, Sargun Dhillon sar...@sargun.me wrote: Does anyone here have a way to do Spark Streaming with external timing for windows? Right now, it relies on the wall clock of the driver to determine the amount of time that each batch read lasts. We have a Kafka, and HDFS ingress into our Spark Streaming pipeline where the events are annotated by the timestamps that they happened (in real time) in. We would like to keep our windows based on those timestamps, as opposed to based on the driver time. Does anyone have any ideas how to do this?
Re: can't print DStream after reduce
Yeah. I have been wondering how to check this in the general case, across all deployment modes, but that's a hard problem. Last week I realized that even if we can do it just for local, we can get the biggest bang for the buck. TD On Tue, Jul 15, 2014 at 9:31 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, thanks for creating the issue. It feels like in the last week, more or less half of the questions about Spark Streaming rooted in setting the master to local ;-) Tobias On Wed, Jul 16, 2014 at 11:03 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Aah, right, copied from the wrong browser tab i guess. Thanks! TD On Tue, Jul 15, 2014 at 5:57 PM, Michael Campbell michael.campb...@gmail.com wrote: I think you typo'd the jira id; it should be https://issues.apache.org/jira/browse/SPARK-2475 Check whether #cores > #receivers in local mode On Mon, Jul 14, 2014 at 3:57 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The problem is not really for local[1] or local. The problem arises when there are more input streams than there are cores. But I agree, for people who are just beginning to use it by running it locally, there should be a check addressing this. I made a JIRA for this. https://issues.apache.org/jira/browse/SPARK-2464 TD On Sun, Jul 13, 2014 at 4:26 PM, Sean Owen so...@cloudera.com wrote: How about a PR that rejects a context configured for local or local[1]? As I understand it is not intended to work and has bitten several people. On Jul 14, 2014 12:24 AM, Michael Campbell michael.campb...@gmail.com wrote: This almost had me not using Spark; I couldn't get any output. It is not at all obvious what's going on here to the layman (and to the best of my knowledge, not documented anywhere), but now you know you'll be able to answer this question for the numerous people that will also have it. On Sun, Jul 13, 2014 at 5:24 PM, Walrus theCat walrusthe...@gmail.com wrote: Great success! I was able to get output to the driver console by changing the construction of the Streaming Spark Context from: val ssc = new StreamingContext("local" /**TODO change once a cluster is up **/, "AppName", Seconds(1)) to: val ssc = new StreamingContext("local[2]" /**TODO change once a cluster is up **/, "AppName", Seconds(1)) I found something that tipped me off that this might work by digging through this mailing list. On Sun, Jul 13, 2014 at 2:00 PM, Walrus theCat walrusthe...@gmail.com wrote: More strange behavior: lines.foreachRDD(x => println(x.first)) // works lines.foreachRDD(x => println((x.count,x.first))) // no output is printed to driver console On Sun, Jul 13, 2014 at 11:47 AM, Walrus theCat walrusthe...@gmail.com wrote: Thanks for your interest. lines.foreachRDD(x => println(x.count)) And I got 0 every once in a while (which I think is strange, because lines.print prints the input I'm giving it over the socket.) When I tried: lines.map(_ -> 1).reduceByKey(_+_).foreachRDD(x => println(x.count)) I got no count. Thanks On Sun, Jul 13, 2014 at 11:34 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Try doing DStream.foreachRDD and then printing the RDD count and further inspecting the RDD. On Jul 13, 2014 1:03 AM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I have a DStream that works just fine when I say: dstream.print If I say: dstream.map((_, 1)).print that works, too. However, if I do the following: dstream.reduce{ case (x, y) => x }.print I don't get anything on my console. What's going on? Thanks
Re: Multiple streams at the same time
I hope it all works :) On Wed, Jul 16, 2014 at 9:08 AM, gorenuru goren...@gmail.com wrote: Hi and thank you for your reply. Looks like it's possible. It seems like a hack to me because we are specifying the batch duration when creating the context. This means that if we specify a batch duration of 10 seconds, our time windows must be at least 10 seconds long, or we will not get results in adequate time. On the other hand, specifying a batch duration of 1 second and creating time windows with duration >= batch duration will work. Also, it covers almost all our needs (i hope so :)) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-streams-at-the-same-time-tp9819p9936.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
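To make that concrete, a minimal sketch (my own, assuming an existing SparkConf named conf and a socket source as a stand-in for the real input) of one context with a 1-second batch interval carrying several window sizes over the same stream; window durations must be multiples of the batch duration:

import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val ssc = new StreamingContext(conf, Seconds(1))      // batch duration = smallest window needed
val lines = ssc.socketTextStream("localhost", 9999)   // placeholder source

val oneSec  = lines.window(Seconds(1))                // same as the batch itself
val twoSec  = lines.window(Seconds(2), Seconds(1))    // 2 s window, recomputed every second
val fiveMin = lines.window(Minutes(5), Seconds(1))    // 5 min window, sliding every second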
Re: Spark Streaming Json file groupby function
I think I know what the problem is. Spark Streaming is constantly doing garbage cleanup by throwing away data that it does not need, based on the operations in the DStream. Here the DStream operations are not aware of the Spark SQL queries that are happening asynchronously to Spark Streaming. So data is being cleared before the SQL queries have completed, hence the block-not-found error. There is an easy fix. You can call streamingContext.remember() to specify how long to keep all the data around (a small sketch of this call is included after the quoted code and error log below). If you keep that sufficiently long, longer than what the SQL queries may require to run, then things should run fine. Let me know if this helps. TD On Wed, Jul 16, 2014 at 9:50 AM, Yin Huai yh...@databricks.com wrote: Hi Srinivas, Seems the query you used is val results = sqlContext.sql("select type from table1"). However, table1 does not have a field called type. The schema of table1 is defined as the class definition of your case class Record (i.e. ID, name, score, and school are fields of your table1). Can you change your query and see if your program works? Thanks, Yin On Wed, Jul 16, 2014 at 8:25 AM, srinivas kusamsrini...@gmail.com wrote: Hi TD, I defined the case class outside the main method and was able to compile the code successfully. But I am getting a run time error when trying to process some json file from kafka. Here is the code I was able to compile:

import java.util.Properties
import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
import scala.util.parsing.json.JSON
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

case class Record(ID: String, name: String, score: String, school: String)

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    //StreamingExamples.setStreamingLogLevels()
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val sql = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sql)
    val timer = Time(1)
    // ssc.checkpoint("checkpoint")
    import sqlContext._
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val jsonf = lines.map(JSON.parseFull(_))
      .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    val fields = jsonf.map(data => Record(data("ID").toString, data("name").toString,
      data("score").toString, data("school").toString))
    fields.print()
    val results = fields.foreachRDD((recrdd, tt) => {
      recrdd.registerAsTable("table1")
      val results = sqlContext.sql("select type from table1")
      println(results)
      results.foreach(println)
      results.map(t => "Type: " + t(0)).collect().foreach(println)
    })
    //results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

and here is the error I am getting when trying to process some data == Query Plan == Project ['type] ExistingRdd [ID#60,name#61,score#62,school#63], MapPartitionsRDD[111] at mapPartitions at basicOperators.scala:174) 14/07/16 14:34:10 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 14/07/16 14:34:10 INFO TaskSetManager: Starting task 1.0:0 as TID 1 on executor localhost: localhost (PROCESS_LOCAL) 14/07/16 14:34:10 INFO TaskSetManager: Serialized task 1.0:0 as 2710 bytes in 0 ms 14/07/16 14:34:10 INFO Executor: Running task ID 1 14/07/16 14:34:10 
ERROR Executor: Exception in task ID 1 java.lang.Exception: Could not compute split, block input-0-1405521243800 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at
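For reference, the remember() call mentioned above would look roughly like this on the ssc from the code in the quoted message (a sketch; the five-minute retention is an arbitrary value to tune to however long the SQL queries take):

import org.apache.spark.streaming.Minutes

// Keep the received blocks of roughly the last 5 minutes of batches around,
// so the asynchronous SQL queries can still find their input RDDs.
ssc.remember(Minutes(5))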
Re: ClassNotFoundException: $line11.$read$ when loading an HDFS text file with SparkQL in spark-shell
Hi Michael, Thanks for your reply. Yes, the reduce triggered the actual execution, I got a total length (totalLength: 95068762, for the record). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ClassNotFoundException-line11-read-when-loading-an-HDFS-text-file-with-SparkQL-in-spark-shell-tp9954p9984.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Release date for new pyspark
Dear List, The version of pyspark on master has a lot of nice new features, e.g. SequenceFile reading, pickle i/o, etc: https://github.com/apache/spark/blob/master/python/pyspark/context.py#L353 I downloaded the recent 1.0.1 release and was surprised to see the distribution did not include these changes in master. (I've tried pulling master [ 9c249743ea ] and compiling from source, but I get a build failure in TestSQLContext.scala FWIW). Is an updated pyspark scheduled for the next release? (Also, am I wrong in expecting HEAD on master should probably compile and run?) Best Regards, -Paul Wais
Re: ClassNotFoundException: $line11.$read$ when loading an HDFS text file with SparkQL in spark-shell
Hmm, it could be some weirdness with classloaders / Mesos / spark sql? I'm curious if you would hit an error if there were no lambda functions involved. Perhaps if you load the data using jsonFile or parquetFile. Either way, I'd file a JIRA. Thanks! On Jul 16, 2014 6:48 PM, Svend svend.vanderve...@gmail.com wrote: Hi Michael, Thanks for your reply. Yes, the reduce triggered the actual execution, I got a total length (totalLength: 95068762, for the record). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ClassNotFoundException-line11-read-when-loading-an-HDFS-text-file-with-SparkQL-in-spark-shell-tp9954p9984.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Possible bug in ClientBase.scala?
Hi Ron, I just checked and this bug is fixed in recent releases of Spark. -Sandy On Sun, Jul 13, 2014 at 8:15 PM, Chester Chen ches...@alpinenow.com wrote: Ron, Which distribution and version of Hadoop are you using? I just looked at CDH5 (hadoop-mapreduce-client-core-2.3.0-cdh5.0.0), MRJobConfig does have the field: java.lang.String DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH; Chester On Sun, Jul 13, 2014 at 6:49 PM, Ron Gonzalez zlgonza...@yahoo.com wrote: Hi, I was doing programmatic submission of Spark yarn jobs and I saw code in ClientBase.getDefaultYarnApplicationClasspath(): val field = classOf[MRJobConfig].getField("DEFAULT_YARN_APPLICATION_CLASSPATH") MRJobConfig doesn't have this field so the created launch env is incomplete. Workaround is to set yarn.application.classpath with the value from YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH. This results in having the spark job hang if the submission config is different from the default config. For example, if my resource manager port is 8050 instead of 8030, then the spark app is not able to register itself and stays in ACCEPTED state. I can easily fix this by changing this to YarnConfiguration instead of MRJobConfig but was wondering what the steps are for submitting a fix. Thanks, Ron Sent from my iPhone
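For anyone hitting this on an older release, the workaround Ron describes (setting yarn.application.classpath explicitly so the reflection on MRJobConfig is never needed) might look roughly like this; a sketch only, and the right classpath values ultimately depend on the cluster:

import org.apache.hadoop.yarn.conf.YarnConfiguration

val yarnConf = new YarnConfiguration()
// Populate yarn.application.classpath from YarnConfiguration's defaults so that
// ClientBase does not have to look up DEFAULT_YARN_APPLICATION_CLASSPATH on MRJobConfig.
yarnConf.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
  YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH: _*)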
Re: Release date for new pyspark
You should expect master to compile and run: patches aren't merged unless they build and pass tests on Jenkins. You shouldn't expect new features to be added to stable code in maintenance releases (e.g. 1.0.1). AFAIK, we're still on track with Spark 1.1.0 development, which means that it should be released sometime in the second half of next month (or shortly thereafter). On Wed, Jul 16, 2014 at 4:03 PM, Paul Wais pw...@yelp.com wrote: Dear List, The version of pyspark on master has a lot of nice new features, e.g. SequenceFile reading, pickle i/o, etc: https://github.com/apache/spark/blob/master/python/pyspark/context.py#L353 I downloaded the recent 1.0.1 release and was surprised to see the distribution did not include these changes in master. (I've tried pulling master [ 9c249743ea ] and compiling from source, but I get a build failure in TestSQLContext.scala FWIW). Is an updated pyspark scheduled for the next release? (Also, am I wrong in expecting HEAD on master should probably compile and run?) Best Regards, -Paul Wais
Re: Cassandra driver Spark question
Thanks to both for the comments and the debugging suggestion, which I will try to use. Regarding your comment, yes, I do agree the current solution was not efficient, but for using the saveToCassandra method I need an RDD, thus the parallelize method. I finally got directed by Piotr to use the CassandraConnector and got this fixed in the meantime. Bottom line is I started using the new Cassandra Spark driver with async calls, prepared statements and batch executions on the node transformation, and performance improved greatly. Thanks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-driver-Spark-question-tp9177p9990.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
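For readers finding this thread later, the CassandraConnector pattern being referred to looks roughly like the sketch below (against the DataStax spark-cassandra-connector; the keyspace, table and columns are placeholders):

import com.datastax.spark.connector.cql.CassandraConnector

val connector = CassandraConnector(sc.getConf)
connector.withSessionDo { session =>
  // Prepare once, then reuse the statement for every row handled in this block.
  val stmt = session.prepare("INSERT INTO my_ks.my_table (id, value) VALUES (?, ?)")
  session.execute(stmt.bind("some-id", "some-value"))
}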
Re: Memory compute-intensive tasks
Matei - I tried using coalesce(numNodes, true), but it then seemed to run too few SNAP tasks - only 2 or 3 when I had specified 46. The job failed, perhaps for unrelated reasons, with some odd exceptions in the log (at the end of this message). But I really don't want to force data movement between nodes. The input data is in HDFS and should already be somewhat balanced among the nodes. We've run this scenario using the simple hadoop jar runner and a custom format jar to break the input into 8-line chunks (paired FASTQ). Ideally I'd like Spark to do the minimum data movement to balance the work, feeding each task mostly from data local to that node. Daniel - that's a good thought, I could invoke a small stub for each task that talks to a single local daemon process over a socket, and serializes all the tasks on a given machine. Thanks, Ravi P.S. Log exceptions: 14/07/15 17:02:00 WARN yarn.ApplicationMaster: Unable to retrieve SparkContext in spite of waiting for 10, maxNumTries = 10 Exception in thread main java.lang.NullPointerException at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:233) at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:110) ...and later... 14/07/15 17:11:07 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM 14/07/15 17:11:07 INFO yarn.ApplicationMaster: AppMaster received a signal. 14/07/15 17:11:07 WARN rdd.NewHadoopRDD: Exception in RecordReader.close() java.io.IOException: Filesystem closed at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:707) at org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:619) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Memory-compute-intensive-tasks-tp9643p9991.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Release date for new pyspark
Yeah, we try to have a regular 3 month release cycle; see https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage for the current window. Matei On Jul 16, 2014, at 4:21 PM, Mark Hamstra m...@clearstorydata.com wrote: You should expect master to compile and run: patches aren't merged unless they build and pass tests on Jenkins. You shouldn't expect new features to be added to stable code in maintenance releases (e.g. 1.0.1). AFAIK, we're still on track with Spark 1.1.0 development, which means that it should be released sometime in the second half of next month (or shortly thereafter). On Wed, Jul 16, 2014 at 4:03 PM, Paul Wais pw...@yelp.com wrote: Dear List, The version of pyspark on master has a lot of nice new features, e.g. SequenceFile reading, pickle i/o, etc: https://github.com/apache/spark/blob/master/python/pyspark/context.py#L353 I downloaded the recent 1.0.1 release and was surprised to see the distribution did not include these changes in master. (I've tried pulling master [ 9c249743ea ] and compiling from source, but I get a build failure in TestSQLContext.scala FWIW). Is an updated pyspark scheduled for the next release? (Also, am I wrong in expecting HEAD on master should probably compile and run?) Best Regards, -Paul Wais
Re: Memory compute-intensive tasks
Hi Ravi, I have seen a similar issue before. You can try to set fs.hdfs.impl.disable.cache to true in your hadoop configuration. For example, suppose your Hadoop Configuration object is hadoopConf; you can use hadoopConf.setBoolean("fs.hdfs.impl.disable.cache", true) Let me know if that helps. Best, Liquan On Wed, Jul 16, 2014 at 4:56 PM, rpandya r...@iecommerce.com wrote: Matei - I tried using coalesce(numNodes, true), but it then seemed to run too few SNAP tasks - only 2 or 3 when I had specified 46. The job failed, perhaps for unrelated reasons, with some odd exceptions in the log (at the end of this message). But I really don't want to force data movement between nodes. The input data is in HDFS and should already be somewhat balanced among the nodes. We've run this scenario using the simple hadoop jar runner and a custom format jar to break the input into 8-line chunks (paired FASTQ). Ideally I'd like Spark to do the minimum data movement to balance the work, feeding each task mostly from data local to that node. Daniel - that's a good thought, I could invoke a small stub for each task that talks to a single local daemon process over a socket, and serializes all the tasks on a given machine. Thanks, Ravi P.S. Log exceptions: 14/07/15 17:02:00 WARN yarn.ApplicationMaster: Unable to retrieve SparkContext in spite of waiting for 10, maxNumTries = 10 Exception in thread main java.lang.NullPointerException at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:233) at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:110) ...and later... 14/07/15 17:11:07 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM 14/07/15 17:11:07 INFO yarn.ApplicationMaster: AppMaster received a signal. 14/07/15 17:11:07 WARN rdd.NewHadoopRDD: Exception in RecordReader.close() java.io.IOException: Filesystem closed at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:707) at org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:619) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Memory-compute-intensive-tasks-tp9643p9991.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Liquan Pei Department of Physics University of Massachusetts Amherst
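Inside a Spark job, that same setting can be applied through the context's Hadoop configuration, roughly like this (a sketch):

// Disable the shared HDFS FileSystem cache so that a close() issued in one task
// cannot invalidate the handle other tasks are still reading from.
sc.hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true)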
Spark Streaming timestamps
Hi all, I am currently using Spark Streaming to conduct real-time data analytics. We receive data from Kafka. We want to generate output files that contain results based on the data we receive in a specific time interval. I have several questions on Spark Streaming's timestamps: 1) If I use saveAsTextFiles, it seems Spark Streaming will generate files in complete minutes, such as 5:00:01, 5:00:01 (converted from Unix time), etc. Does this mean the results are based on the data from 5:00:01 to 5:00:02, 5:00:02 to 5:00:03, etc.? Or do the time stamps just mean the time the files are generated? 2) If I do not use saveAsTextFiles, how do I get the exact time interval of the RDD when I use foreachRDD to do custom output of the results? 3) How can we specify the starting time of the batches? Thanks! Bill
spark-ec2 script with Tachyon
Hi, It seems that the spark-ec2 script deploys the Tachyon module along with the other setup. I am trying to use .persist(OFF_HEAP) for RDD persistence, but on a worker I see this error -- Failed to connect (2) to master localhost/127.0.0.1:19998 : java.net.ConnectException: Connection refused -- From netstat I see that the worker is connected to the master node on port 19998 -- Proto Recv-Q Send-Q Local Address Foreign Address State tcp 0 0 ip-10-16-132-190.ec2.:49239 ip-10-158-45-248.ec2.:19998 ESTABLISHED -- Does Tachyon on EC2 work out of the box? Or does it require further configuration? Am I supposed to set spark.tachyonStore.url to the master's IP? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-script-with-Tachyon-tp9996.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
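If pointing the store at the Tachyon master is indeed what is missing (the error above shows the worker still trying localhost), the setting would look roughly like this; a sketch only, with a placeholder address and Tachyon's default master port:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.tachyonStore.url", "tachyon://<tachyon-master-ip>:19998")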
Re: using multiple dstreams together (spark streaming)
Have you taken a look at DStream.transformWith( ... )? That allows you to apply an arbitrary transformation between RDDs (of the same timestamp) of two different streams. So you can do something like this: 2s-window-stream.transformWith(1s-window-stream, (rdd1: RDD[...], rdd2: RDD[...]) => { ... // return a new RDD }) And streamingContext.transform() extends it to N DStreams. :) Hope this helps! TD On Wed, Jul 16, 2014 at 10:42 AM, Walrus theCat walrusthe...@gmail.com wrote: hey at least it's something (thanks!) ... not sure what i'm going to do if i can't find a solution (other than not use spark) as i really need these capabilities. anyone got anything else? On Wed, Jul 16, 2014 at 10:34 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: hum... maybe consuming all streams at the same time with an actor that would act as a new DStream source... but this is just a random idea... I don't really know if that would be a good idea or even possible. 2014-07-16 18:30 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Yeah -- I tried the .union operation and it didn't work for that reason. Surely there has to be a way to do this, as I imagine this is a commonly desired goal in streaming applications? On Wed, Jul 16, 2014 at 10:10 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I'm joining several kafka dstreams using the join operation, but you have the limitation that the duration of the batch has to be the same, i.e. a 1 second window for all dstreams... so it would not work for you. 2014-07-16 18:08 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Hi, My application has multiple dstreams on the same inputstream: dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window I want to write logic that deals with all three windows (e.g. when the 1 second window differs from the 2 second window by some delta ...) I've found some examples online (there's not much out there!), and I can only see people transforming a single dstream. In conventional spark, we'd do this sort of thing with a cartesian on RDDs. How can I deal with multiple Dstreams at once? Thanks
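A rough sketch of the N-stream variant mentioned above, using the dstream1/dstream2/dstream3 names from the question and assuming they are DStream[String] on a context named ssc (the combining logic is a placeholder):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

// Combine the corresponding batches (same timestamp) of all three streams.
val combined = ssc.transform(
  Seq(dstream1, dstream2, dstream3),
  (rdds: Seq[RDD[_]], time: Time) =>
    // rdds(0), rdds(1), rdds(2) line up with dstream1..3 for this batch;
    // here they are simply unioned, but any pairwise comparison could go here.
    rdds.map(_.asInstanceOf[RDD[String]]).reduce(_ union _)
)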
Use Spark with HBase' HFileOutputFormat
Hi, I want to use Spark with HBase and I'm confused about how to ingest my data using HBase's HFileOutputFormat. It recommends calling configureIncrementalLoad, which does the following:
- Inspects the table to configure a total order partitioner
- Uploads the partitions file to the cluster and adds it to the DistributedCache
- Sets the number of reduce tasks to match the current number of regions
- Sets the output key/value class to match HFileOutputFormat2's requirements
- Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or PutSortReducer)
But in Spark, it seems I have to do the sorting and partitioning myself, right? Can anyone show me how to do it properly? Is there a better way to ingest data fast into HBase from Spark? Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
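Not an authoritative recipe, but a rough sketch of the manual route being asked about (sort by row key in Spark, then write HFiles with saveAsNewAPIHadoopFile); it deliberately ignores the region-aligned partitioning that configureIncrementalLoad would set up, so treat it only as an illustration of the shape of the code, with placeholder column family, qualifier, data and output path:

import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.util.Bytes

val hbaseConf = HBaseConfiguration.create()

// Placeholder (rowKey, value) pairs; in practice this comes from the real data set.
val rows = sc.parallelize(Seq(("row1", "v1"), ("row2", "v2")))

// HFiles must be written in row-key order, so sort before converting to KeyValues.
val kvs = rows.sortByKey().map { case (rowKey, value) =>
  val kv = new KeyValue(Bytes.toBytes(rowKey), Bytes.toBytes("cf"),
    Bytes.toBytes("col"), Bytes.toBytes(value))
  (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), kv)
}

kvs.saveAsNewAPIHadoopFile(
  "/tmp/hfiles",                       // staging directory for a later bulk load
  classOf[ImmutableBytesWritable],
  classOf[KeyValue],
  classOf[HFileOutputFormat],
  hbaseConf)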
Re: Spark Streaming timestamps
Answers inline. On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am currently using Spark Streaming to conduct real-time data analytics. We receive data from Kafka. We want to generate output files that contain results based on the data we receive in a specific time interval. I have several questions on Spark Streaming's timestamps: 1) If I use saveAsTextFiles, it seems Spark Streaming will generate files in complete minutes, such as 5:00:01, 5:00:01 (converted from Unix time), etc. Does this mean the results are based on the data from 5:00:01 to 5:00:02, 5:00:02 to 5:00:03, etc.? Or do the time stamps just mean the time the files are generated? The file named 5:00:01 contains results from data received between 5:00:00 and 5:00:01 (based on the system time of the cluster). 2) If I do not use saveAsTextFiles, how do I get the exact time interval of the RDD when I use foreachRDD to do custom output of the results? There is a version of foreachRDD which allows you to specify a function that takes in a Time object. 3) How can we specify the starting time of the batches? What do you mean? Batches are timed based on the system time of the cluster. Thanks! Bill
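The two-argument form of foreachRDD mentioned above looks roughly like this (a sketch, assuming a DStream[String] named stream; what is done with each batch is a placeholder):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

stream.foreachRDD { (rdd: RDD[String], time: Time) =>
  // time.milliseconds is the batch time of this RDD, so it can be used
  // to name output files or tag results with their interval.
  println("batch at " + time.milliseconds + ": " + rdd.count() + " records")
}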
Kmeans
Can anyone explain to me what the difference is between kmeans in MLlib and kmeans in examples/src/main/python/kmeans.py? Best Regards ... Amin Mohebbi PhD candidate in Software Engineering at University of Malaysia H/P : +60 18 2040 017 E-Mail : tp025...@ex.apiit.edu.my amin_...@me.com
Re: Release date for new pyspark
You should try cleaning and then building. We have recently hit a bug in the scala compiler that sometimes causes non-clean builds to fail. On Wed, Jul 16, 2014 at 7:56 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Yeah, we try to have a regular 3 month release cycle; see https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage for the current window. Matei On Jul 16, 2014, at 4:21 PM, Mark Hamstra m...@clearstorydata.com wrote: You should expect master to compile and run: patches aren't merged unless they build and pass tests on Jenkins. You shouldn't expect new features to be added to stable code in maintenance releases (e.g. 1.0.1). AFAIK, we're still on track with Spark 1.1.0 development, which means that it should be released sometime in the second half of next month (or shortly thereafter). On Wed, Jul 16, 2014 at 4:03 PM, Paul Wais pw...@yelp.com wrote: Dear List, The version of pyspark on master has a lot of nice new features, e.g. SequenceFile reading, pickle i/o, etc: https://github.com/apache/spark/blob/master/python/pyspark/context.py#L353 I downloaded the recent 1.0.1 release and was surprised to see the distribution did not include these changes in master. (I've tried pulling master [ 9c249743ea ] and compiling from source, but I get a build failure in TestSQLContext.scala FWIW). Is an updated pyspark scheduled for the next release? (Also, am I wrong in expecting HEAD on master should probably compile and run?) Best Regards, -Paul Wais