Re: Problems with GC and time to execute with different number of executors.
I'm not caching the data. By "each iteration" I mean each 128 MB block that an executor has to process. The code is pretty simple:

final Conversor c = new Conversor(null, null, null, longFields, typeFields);
SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<byte[]> rdd = sc.binaryRecords(path, c.calculaLongBlock());
JavaRDD<String> rddString = rdd.map(new Function<byte[], String>() {
    @Override
    public String call(byte[] arg0) throws Exception {
        String result = c.parse(arg0).toString();
        return result;
    }
});
rddString.saveAsTextFile(url + "/output/" + System.currentTimeMillis() + "/");

The parse function just takes an array of bytes and applies some transformations: [0..3] an integer, [4..20] a String, [21..27] another String, and so on. It's just test code; I'd like to understand what is happening. 2015-02-04 18:57 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com: Hi Guillermo, What exactly do you mean by each iteration? Are you caching data in memory? -Sandy On Wed, Feb 4, 2015 at 5:02 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I execute a job in Spark where I'm processing a file of 80 GB in HDFS. I have 5 slaves: (32 cores / 256 GB / 7 physical disks) x 5. I have been trying many different configurations with YARN: yarn.nodemanager.resource.memory-mb 196Gb, yarn.nodemanager.resource.cpu-vcores 24. I have tried to execute the job with different numbers of executors and memory (1-4g). With 20 executors each iteration (128 MB) takes 25s, and there is never a really long wait because of GC. When I execute around 60 executors, the processing time is about 45s, and some tasks take up to one minute because of GC. I have no idea why GC kicks in when I execute more executors simultaneously. The other question is why it takes more time to process each block. My theory is that there are only 7 physical disks, and 5 processes writing is not the same as 20. The code is pretty simple: just a map function which parses a line and writes the output to HDFS. There are a lot of substring operations inside the function, which could cause GC. Any theories?
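One way to see what the collector is actually doing in a case like this is to enable GC logging on the executors. A minimal Scala sketch, not from the thread itself; the app name is hypothetical and the flags are standard HotSpot options:

import org.apache.spark.SparkConf

// Surface executor GC activity so long pauses can be attributed to specific collections.
val conf = new SparkConf()
  .setAppName("gc-diagnosis") // hypothetical app name
  .set("spark.executor.extraJavaOptions",
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")

The resulting GC lines show up in each executor's stderr, where they can be lined up against the slow tasks in the web UI.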
Re: Tableau beta connector
Okay. So the queries Tableau runs on the persisted data will go through Spark SQL, taking advantage of its performance. Thanks again, Denny. From: Denny Lee denny.g@gmail.com Sent: Thursday, February 5, 2015 1:27 PM To: Ashutosh Trivedi (MT2013030); İsmail Keskin Cc: user@spark.apache.org Subject: Re: Tableau beta connector The context is that you would create your RDDs and then persist them in Hive. Once in Hive, the data is accessible from the Tableau extract through the Spark thrift server. On Wed, Feb 4, 2015 at 23:36 Ashutosh Trivedi (MT2013030) ashutosh.triv...@iiitb.org wrote: Thanks Denny and Ismail. Denny, I went through your blog; it was a great help. I guess the Tableau beta connector also follows the same procedure you described in the blog. I am building Spark now. Basically, what I don't get is where to put my data so that Tableau can extract it. So Ismail, it's just Spark SQL, no RDDs. I think I am getting it now. We use Spark for our big data processing, and we want the processed data (RDDs) in Tableau. So we should put our data in the Hive metastore, and Tableau will extract it from there using this connector? Correct me if I am wrong. I guess I have to look at how the thrift server works. From: Denny Lee denny.g@gmail.com Sent: Thursday, February 5, 2015 12:20 PM To: İsmail Keskin; Ashutosh Trivedi (MT2013030) Cc: user@spark.apache.org Subject: Re: Tableau beta connector Some quick context behind how Tableau interacts with Spark / Hive can also be found at https://www.concur.com/blog/en-us/connect-tableau-to-sparksql - it's about how to connect from Tableau to the thrift server before the official Tableau beta connector, but it should provide some of the additional context called out. HTH! On Wed Feb 04 2015 at 10:47:23 PM İsmail Keskin ismail.kes...@dilisim.com wrote: Tableau connects to the Spark Thrift Server via an ODBC driver, so none of the RDD stuff applies; you just issue SQL queries from Tableau. The table metadata can come from the Hive metastore if you place your hive-site.xml in the configuration directory of Spark. On Thu, Feb 5, 2015 at 8:11 AM, ashu ashutosh.triv...@iiitb.org wrote: Hi, I am trying out the Tableau beta connector to Spark SQL. I have a few basic questions: Will this connector be able to fetch the schemaRDDs into Tableau? Will all the schemaRDDs be exposed to Tableau? Basically, I am not getting what Tableau will fetch as the data source: is it existing files in HDFS, RDDs, or something else? The question may be naive, but I did not get an answer anywhere else. I would really appreciate it if someone who has already tried it can help me with this. Thanks, Ashutosh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Tableau-beta-connector-tp21512.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
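To make the flow above concrete, here is a minimal Scala sketch of persisting processed data into the Hive metastore so the thrift server (and hence Tableau) can see it. The input path and table name are hypothetical, and any SchemaRDD would work in place of the jsonFile source:

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc) // sc: an existing SparkContext
// jsonFile is just one convenient way to get a SchemaRDD with a schema attached.
val processed = hiveCtx.jsonFile("hdfs:///data/processed.json")
// saveAsTable registers the data in the Hive metastore, where the
// Spark thrift server (and thus the Tableau connector) can query it.
processed.saveAsTable("processed_for_tableau")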
Re: pyspark - gzip output compression
I'm getting "SequenceFile doesn't work with GzipCodec without native-hadoop code!" Where can I get those libs, and where do I put them in Spark? Also, can I save a plain text file (like saveAsTextFile) as gzip? Thanks. On Wed, Feb 4, 2015 at 11:10 PM, Kane Kim kane.ist...@gmail.com wrote: How do I save an RDD with gzip compression? Thanks.
Re: spark on ec2
I don't see anything that says you must explicitly restart them to load the new settings, but usually there is some sort of signal trapped [or brute-force full restart] to get a configuration reload for most daemons. I'd take a guess and use the $SPARK_HOME/sbin/{stop,start}-slaves.sh scripts on your master node and see. ( http://spark.apache.org/docs/1.2.0/spark-standalone.html#cluster-launch-scripts ) I just tested this out on my integration EC2 cluster and got odd results for stopping the workers (no workers found), but the start script... seemed to work. My integration cluster was running and functioning after executing both scripts, but I also didn't make any changes to spark-env either. On Thu Feb 05 2015 at 9:49:49 PM Kane Kim kane.ist...@gmail.com wrote: Hi, I'm trying to change a setting as described here: http://spark.apache.org/docs/1.2.0/ec2-scripts.html export SPARK_WORKER_CORES=6 Then I ran ~/spark-ec2/copy-dir /root/spark/conf to distribute it to the slaves, but it had no effect. Do I have to restart the workers? How do I do that with spark-ec2? Thanks.
Re: spark driver behind firewall
Yes, the driver has to be able to accept incoming connections. All the executors connect back to the driver to send heartbeats, map statuses, and metrics. It is critical, and I don't know of a way around it. You could look into using something like https://github.com/spark-jobserver/spark-jobserver, which could run outside the firewall; then from inside the firewall you can make REST calls to the server. On Thu, Feb 5, 2015 at 5:03 PM, Kane Kim kane.ist...@gmail.com wrote: I submit my Spark job from a machine behind a firewall, and I can't open any incoming connections to that box. Does the driver absolutely need to accept incoming connections? Is there any workaround for that case? Thanks.
Re: Reg Job Server
I read somewhere about Gatling. Can that be used to profile Spark jobs? On Fri, Feb 6, 2015 at 10:27 AM, Kostas Sakellis kos...@cloudera.com wrote: Which Spark Job server are you talking about? On Thu, Feb 5, 2015 at 8:28 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, Can Spark Job Server be used for profiling Spark jobs?
Re: Can't access remote Hive table from spark
Not sure about Spark standalone mode, but on Spark-on-YARN it should work. You can check the following link: http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ Thanks. Zhan Zhang On Feb 5, 2015, at 5:02 PM, Cheng Lian lian.cs@gmail.com wrote: Please note that Spark 1.2.0 only supports Hive 0.13.1 or 0.12.0; no other versions are supported. Best, Cheng On 1/25/15 12:18 AM, guxiaobo1982 wrote: Hi, I built and started a single-node standalone Spark 1.2.0 cluster along with a single-node Hive 0.14.0 instance installed by Ambari 1.17.0. On the Spark and Hive node I can create and query tables inside Hive, and from remote machines I can submit the SparkPi example to the Spark master. But I failed to run the following example code:

public class SparkTest {
    public static void main(String[] args) {
        String appName = "This is a test application";
        String master = "spark://lix1.bh.com:7077";
        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaHiveContext sqlCtx = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc);
        //sqlCtx.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
        //sqlCtx.sql("LOAD DATA LOCAL INPATH '/opt/spark/examples/src/main/resources/kv1.txt' INTO TABLE src");
        // Queries are expressed in HiveQL.
        List<Row> rows = sqlCtx.sql("FROM src SELECT key, value").collect();
        System.out.print("I got " + rows.size() + " rows\r\n");
        sc.close();
    }
}

Exception in thread main org.apache.hadoop.hive.ql.metadata.InvalidTableException: Table not found src
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:980)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70)
at org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141)
at org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:253)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
at ...
spark streaming from kafka real time + batch processing in java
I want to write a Spark Streaming consumer for Kafka in Java. I want to process the data in real time and also store the data in HDFS in year/month/day/hour/ format. I am not sure how to achieve this. Should I write separate Kafka consumers, one for writing data to HDFS and one for Spark Streaming? Also, I would like to ask what people generally do with the result of Spark streams after aggregating over it: is it okay to update a NoSQL DB with aggregated counts per batch interval, or is it generally stored in HDFS? Is it possible to store the mini-batch data from Spark Streaming in HDFS in a way that the data is aggregated hourly and put into its hour folder? I would not want a lot of small files, one per mini-batch, per hour; that would be inefficient for running Hadoop jobs later. Is anyone working on the same problem? Any help and comments would be great. Regards, Mohit
Re: Reg Job Server
Which Spark Job server are you talking about? On Thu, Feb 5, 2015 at 8:28 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, Can Spark Job Server be used for profiling Spark jobs?
Re: Reg Job Server
When you say profiling, what are you trying to figure out? Why your Spark job is slow? Gatling seems to be a load-generating framework, so I'm not sure how you'd use it (I've never used it before). Spark runs on the JVM, so you can use any JVM profiling tool, like YourKit. Kostas On Thu, Feb 5, 2015 at 9:03 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: I read somewhere about Gatling. Can that be used to profile Spark jobs? On Fri, Feb 6, 2015 at 10:27 AM, Kostas Sakellis kos...@cloudera.com wrote: Which Spark Job server are you talking about? On Thu, Feb 5, 2015 at 8:28 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, Can Spark Job Server be used for profiling Spark jobs?
Re: Spark stalls or hangs: is this a clue? remote fetches seem to never return?
What does the jstack dump show? Yours, Xuefeng Wu (吴雪峰) On Feb 6, 2015, at 10:20 AM, Michael Albert m_albert...@yahoo.com.INVALID wrote: My apologies for following up my own post, but I thought this might be of interest. I terminated the java process corresponding to the executor which had opened the stderr file mentioned below (kill pid). Then my Spark job completed without error (it was actually almost finished). Now I am completely confused :-). Thanks! -Mike From: Michael Albert m_albert...@yahoo.com.INVALID To: user@spark.apache.org user@spark.apache.org Sent: Thursday, February 5, 2015 9:04 PM Subject: Spark stalls or hangs: is this a clue? remote fetches seem to never return? Greetings! Again, thanks to all who have given suggestions. I am still trying to diagnose a problem where I have processes that run for one or several hours but intermittently stall or hang. By "stall" I mean that there is no CPU usage on the workers or the driver, nor network activity, nor do I see disk activity. It just hangs. Using the Application Master to find which workers still had active tasks, I then went to that machine and looked in the user logs. One of the stderr files ends with "Started 50 remote fetches". Should there be a message saying that the fetch was completed? Any suggestions as to how I might diagnose why the fetch was not completed? Thanks! -Mike Here is the last part of the log:

15/02/06 01:33:46 INFO storage.MemoryStore: ensureFreeSpace(5368) called with curMem=875861, maxMem=2315649024
15/02/06 01:33:46 INFO storage.MemoryStore: Block broadcast_10 stored as values in memory (estimated size 5.2 KB, free 2.2 GB)
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 5, fetching them
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-171-0-208.ec2.internal:44124/user/MapOutputTracker#-878402310]
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 5, fetching them
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Got the output locations
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Started 50 remote fetches in 47 ms
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Started 50 remote fetches in 48 ms

It's been like that for half an hour. Thanks! -Mike
Re: how to debug this kind of error, e.g. lost executor?
Could you find the shuffle files? Or were the files deleted by another process? Yours, Xuefeng Wu (吴雪峰) On Feb 5, 2015, at 11:14 PM, Yifan LI iamyifa...@gmail.com wrote: Hi, I am running a graphx application with heavy memory/CPU overhead. I think the memory is sufficient, and I set the RDDs' StorageLevel to MEMORY_AND_DISK. But I found that some tasks failed due to the following errors: java.io.FileNotFoundException: /data/spark/local/spark-local-20150205151711-9700/09/rdd_3_275 (No files or folders of this type) ExecutorLostFailure (executor 11 lost) So, finally, that stage failed: org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /data/spark/local/spark-local-20150205151711-587a/16/shuffle_11_219_0.index Anyone have pointers? Where can I get more details on this issue? Best, Yifan LI
Re: Reg Job Server
Yes, I want to know the reason the job is slow. I will look at YourKit. Can you point me to a tutorial on how to use it? Thank You On Fri, Feb 6, 2015 at 10:44 AM, Kostas Sakellis kos...@cloudera.com wrote: When you say profiling, what are you trying to figure out? Why your Spark job is slow? Gatling seems to be a load-generating framework, so I'm not sure how you'd use it (I've never used it before). Spark runs on the JVM, so you can use any JVM profiling tool, like YourKit. Kostas On Thu, Feb 5, 2015 at 9:03 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: I read somewhere about Gatling. Can that be used to profile Spark jobs? On Fri, Feb 6, 2015 at 10:27 AM, Kostas Sakellis kos...@cloudera.com wrote: Which Spark Job server are you talking about? On Thu, Feb 5, 2015 at 8:28 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, Can Spark Job Server be used for profiling Spark jobs?
Re: RE: Can't access remote Hive table from spark
Hi, My spark-env.sh has the following entries with respect to the classpath: export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/lib/hive/lib/*:/etc/hive/conf/ -Skanda On Sun, Feb 1, 2015 at 11:45 AM, guxiaobo1982 guxiaobo1...@qq.com wrote: Hi Skanda, How do you set up your SPARK_CLASSPATH? I added the following line to my SPARK_HOME/conf/spark-env.sh and still got the same error: export SPARK_CLASSPATH=${SPARK_CLASSPATH}:/etc/hive/conf -- Original -- From: Skanda Prasad; skanda.ganapa...@gmail.com; Send time: Monday, Jan 26, 2015 7:41 AM To: guxiaobo1...@qq.com; user@spark.apache.org user@spark.apache.org; Subject: RE: Can't access remote Hive table from spark This happened to me as well; putting hive-site.xml inside conf doesn't seem to work. Instead I added /etc/hive/conf to SPARK_CLASSPATH and it worked. You can try this approach. -Skanda -- From: guxiaobo1982 guxiaobo1...@qq.com Sent: 25-01-2015 13:50 To: user@spark.apache.org Subject: Can't access remote Hive table from spark Hi, I built and started a single-node standalone Spark 1.2.0 cluster along with a single-node Hive 0.14.0 instance installed by Ambari 1.17.0. On the Spark and Hive node I can create and query tables inside Hive, and from remote machines I can submit the SparkPi example to the Spark master. But I failed to run the following example code:

public class SparkTest {
    public static void main(String[] args) {
        String appName = "This is a test application";
        String master = "spark://lix1.bh.com:7077";
        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaHiveContext sqlCtx = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc);
        //sqlCtx.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
        //sqlCtx.sql("LOAD DATA LOCAL INPATH '/opt/spark/examples/src/main/resources/kv1.txt' INTO TABLE src");
        // Queries are expressed in HiveQL.
        List<Row> rows = sqlCtx.sql("FROM src SELECT key, value").collect();
        System.out.print("I got " + rows.size() + " rows\r\n");
        sc.close();
    }
}

Exception in thread main org.apache.hadoop.hive.ql.metadata.InvalidTableException: Table not found src
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:980)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70)
at org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141)
at org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:253)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(...
Re: Get filename in Spark Streaming
Hello, Did you check the following? http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/ http://apache-spark-user-list.1001560.n3.nabble.com/access-hdfs-file-name-in-map-td6551.html -- Emre Sevinç On Fri, Feb 6, 2015 at 2:16 AM, Subacini B subac...@gmail.com wrote: Hi All, We have filenames with timestamps, say ABC_1421893256000.txt, and the timestamp needs to be extracted from the file name for further processing. Is there a way to get the input file name picked up by the Spark streaming job? Thanks in advance, Subacini -- Emre Sevinc
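For reference, the approach in the first link boils down to dropping down to HadoopRDD. A Scala sketch with a hypothetical input path (mapPartitionsWithInputSplit is a developer API, so check it against your Spark version):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD

val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///input")
val withFileNames = raw.asInstanceOf[HadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit { (split, iter) =>
    // e.g. ABC_1421893256000.txt -- the timestamp can then be parsed out of this name
    val name = split.asInstanceOf[FileSplit].getPath.getName
    iter.map { case (_, line) => (name, line.toString) }
  }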
Can we execute create table and load data commands against Hive inside HiveContext?
Hi, I am playing with the following example code:

public class SparkTest {
    public static void main(String[] args) {
        String appName = "This is a test application";
        String master = "spark://lix1.bh.com:7077";
        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaHiveContext sqlCtx = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc);
        //sqlCtx.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
        //sqlCtx.sql("LOAD DATA LOCAL INPATH '/opt/spark/examples/src/main/resources/kv1.txt' INTO TABLE src");
        // Queries are expressed in HiveQL.
        List<Row> rows = sqlCtx.sql("FROM src SELECT key, value").collect();
        //List<Row> rows = sqlCtx.sql("show tables").collect();
        System.out.print("I got " + rows.size() + " rows\r\n");
        sc.close();
    }
}

With the create table and load data commands commented out, the query command executes successfully, but I get ClassNotFoundExceptions (with different error messages) if either of these two commands is executed inside HiveContext. The create table command causes the following:

Exception in thread main org.apache.spark.sql.execution.QueryExecutionException: FAILED: Hive Internal Error: java.lang.ClassNotFoundException(org.apache.hadoop.hive.ql.hooks.ATSHook)
at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:309)
at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.api.java.JavaSchemaRDD.<init>(JavaSchemaRDD.scala:42)
at org.apache.spark.sql.hive.api.java.JavaHiveContext.sql(JavaHiveContext.scala:37)
at com.blackhorse.SparkTest.main(SparkTest.java:24)
[delete Spark temp dirs] DEBUG org.apache.spark.util.Utils - Shutdown hook called
[delete Spark local dirs] DEBUG org.apache.spark.storage.DiskBlockManager - Shutdown hook called

The load data command causes the following:

Exception in thread main org.apache.spark.sql.execution.QueryExecutionException: FAILED: RuntimeException org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory
at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:309)
at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.api.java.JavaSchemaRDD.<init>(JavaSchemaRDD.scala:42)
at org.apache.spark.sql.hive.api.java.JavaHiveContext.sql(JavaHiveContext.scala:37)
at com.blackhorse.SparkTest.main(SparkTest.java:25)
[delete Spark local dirs] DEBUG org.apache.spark.storage.DiskBlockManager - Shutdown hook called
[delete Spark temp dirs] DEBUG org.apache.spark.util.Utils - Shutdown hook called
Re: Shuffle read/write issue in spark 1.2
I observed the same issue as well. On Fri, Feb 6, 2015 at 12:19 AM, Praveen Garg praveen.g...@guavus.com wrote: Hi, While moving from Spark 1.1 to Spark 1.2, we are facing an issue where shuffle read/write has increased significantly. We also tried running the job with the Spark 1.1 configuration, setting spark.shuffle.manager to hash and spark.shuffle.blockTransferService to nio. It did improve the performance a bit, but it was still much worse than Spark 1.1. The scenario seems similar to the bug raised some time back: https://issues.apache.org/jira/browse/SPARK-5081. Has anyone come across a similar issue? Please tell us if any configuration change can help. Regards, Praveen
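For anyone trying the same rollback, the 1.1-style shuffle settings mentioned above look like this in code (a sketch; the same keys can equally go into spark-defaults.conf):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.manager", "hash")             // Spark 1.2 default is "sort"
  .set("spark.shuffle.blockTransferService", "nio") // Spark 1.2 default is "netty"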
Re: How many stages in my application?
Yes, there is no way right now to automatically know how many stages a job will generate. Like Mark said, RDD#toDebugString will give you some info about the RDD DAG, and from that you can determine, based on the dependency types (wide vs. narrow), where the stage boundaries are. On Thu, Feb 5, 2015 at 1:41 AM, Mark Hamstra m...@clearstorydata.com wrote: And the Job page of the web UI will give you an idea of stages completed out of the total number of stages for the job. That same information is also available as JSON. Statically determining how many stages a job logically comprises is one thing, but dynamically determining how many stages remain to be run to complete a job is a surprisingly tricky problem -- take a look at the discussion that went into Josh's Job page PR to get an idea of the issues and subtleties involved: https://github.com/apache/spark/pull/3009 On Thu, Feb 5, 2015 at 1:27 AM, Mark Hamstra m...@clearstorydata.com wrote: RDD#toDebugString will help. On Thu, Feb 5, 2015 at 1:15 AM, Joe Wass jw...@crossref.org wrote: Thanks Akhil and Mark. I can of course count events (assuming I can deduce the shuffle boundaries), but like I said the program isn't simple and I'd have to do this manually every time I change the code. So I'd rather find a way of doing this automatically if possible. On 4 February 2015 at 19:41, Mark Hamstra m...@clearstorydata.com wrote: But there isn't a 1-1 mapping from operations to stages, since multiple operations will be pipelined into a single stage if no shuffle is required. To determine the number of stages in a job you really need to be looking for shuffle boundaries. On Wed, Feb 4, 2015 at 11:27 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can easily understand the flow by looking at the operations in your program (like map, groupBy, join etc.). First list the operations happening in your application, and then from the web UI you will be able to see how many operations have happened so far. Thanks, Best Regards On Wed, Feb 4, 2015 at 4:33 PM, Joe Wass jw...@crossref.org wrote: I'm sitting here looking at my application crunching gigabytes of data on a cluster, and I have no idea if it's an hour away from completion or a minute. The web UI shows progress through each stage, but not how many stages are remaining. How can I work out how many stages my program will take automatically? My application has a slightly interesting DAG (re-use of functions that contain Spark transformations, persistent RDDs). Not that complex, but not 'step 1, step 2, step 3'. I'm guessing that if the driver program runs sequentially sending messages to Spark, then Spark has no knowledge of the structure of the driver program. Therefore it's necessary to execute it on a small test dataset and see how many stages result? When I set spark.eventLog.enabled = true and run on (very small) test data I don't get any stage messages in my STDOUT or in the log file. This is on a `local` instance. Did I miss something obvious? Thanks! Joe
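As a small illustration of the advice above (hypothetical input path): reduceByKey introduces a wide dependency, so the printed lineage shows exactly one stage boundary for this pipeline.

val counts = sc.textFile("hdfs:///logs")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _) // shuffle => new stage
// The indentation in the output marks where each shuffle (stage boundary) sits.
println(counts.toDebugString)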
Re: Reg Job Server
https://cwiki.apache.org/confluence/display/SPARK/Profiling+Spark+Applications+Using+YourKit On Thu, Feb 5, 2015 at 9:18 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I want to know the reason the job is slow. I will look at YourKit. Can you point me to a tutorial on how to use it? Thank You On Fri, Feb 6, 2015 at 10:44 AM, Kostas Sakellis kos...@cloudera.com wrote: When you say profiling, what are you trying to figure out? Why your Spark job is slow? Gatling seems to be a load-generating framework, so I'm not sure how you'd use it (I've never used it before). Spark runs on the JVM, so you can use any JVM profiling tool, like YourKit. Kostas On Thu, Feb 5, 2015 at 9:03 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: I read somewhere about Gatling. Can that be used to profile Spark jobs? On Fri, Feb 6, 2015 at 10:27 AM, Kostas Sakellis kos...@cloudera.com wrote: Which Spark Job server are you talking about? On Thu, Feb 5, 2015 at 8:28 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, Can Spark Job Server be used for profiling Spark jobs?
Re: Whether standalone spark support kerberos?
Standalone mode does not support talking to a kerberized HDFS. If you want to talk to a kerberized (secure) HDFS cluster, I suggest you use Spark on YARN. On Wed, Feb 4, 2015 at 2:29 AM, Jander g jande...@gmail.com wrote: Hope someone helps me. Thanks. On Wed, Feb 4, 2015 at 6:14 PM, Jander g jande...@gmail.com wrote: We have a standalone Spark cluster for a Kerberos test, but when reading from HDFS I get the error output: Can't get Master Kerberos principal for use as renewer. So does standalone Spark support Kerberos? Can anyone confirm it, or tell me what I missed? Thanks in advance. -- Thanks, Jander -- Thanks, Jander
Re: spark on ec2
Oh yeah, they picked up the changes after a restart, thanks! On Thu, Feb 5, 2015 at 8:13 PM, Charles Feduke charles.fed...@gmail.com wrote: I don't see anything that says you must explicitly restart them to load the new settings, but usually there is some sort of signal trapped [or brute-force full restart] to get a configuration reload for most daemons. I'd take a guess and use the $SPARK_HOME/sbin/{stop,start}-slaves.sh scripts on your master node and see. ( http://spark.apache.org/docs/1.2.0/spark-standalone.html#cluster-launch-scripts ) I just tested this out on my integration EC2 cluster and got odd results for stopping the workers (no workers found), but the start script... seemed to work. My integration cluster was running and functioning after executing both scripts, but I also didn't make any changes to spark-env either. On Thu Feb 05 2015 at 9:49:49 PM Kane Kim kane.ist...@gmail.com wrote: Hi, I'm trying to change a setting as described here: http://spark.apache.org/docs/1.2.0/ec2-scripts.html export SPARK_WORKER_CORES=6 Then I ran ~/spark-ec2/copy-dir /root/spark/conf to distribute it to the slaves, but it had no effect. Do I have to restart the workers? How do I do that with spark-ec2? Thanks.
RE: Error KafkaStream
Hi, If your message is a String, you will have to change the encoder and decoder to StringEncoder and StringDecoder. If your message is byte[], you can use DefaultEncoder/DefaultDecoder. Also, don't forget to add the import statements matching your encoder and decoder:

import kafka.serializer.StringEncoder;
import kafka.serializer.StringDecoder;

Regards, Jishnu Prathap

-----Original Message----- From: Shao, Saisai [mailto:saisai.s...@intel.com] Sent: Friday, February 06, 2015 6:41 AM To: Eduardo Costa Alfaia; Sean Owen Cc: user@spark.apache.org Subject: RE: Error KafkaStream Hi, I think you should change the `DefaultDecoder` of your type parameter into `StringDecoder`; it seems you want to decode the message into String. `DefaultDecoder` returns Array[Byte], not String, so the class cast will fail here. Thanks, Jerry

-----Original Message----- From: Eduardo Costa Alfaia [mailto:e.costaalf...@unibs.it] Sent: Friday, February 6, 2015 12:04 AM To: Sean Owen Cc: user@spark.apache.org Subject: Re: Error KafkaStream I don't think so, Sean. On Feb 5, 2015, at 16:57, Sean Owen so...@cloudera.com wrote: Is SPARK-4905 / https://github.com/apache/spark/pull/4371/files the same issue? On Thu, Feb 5, 2015 at 7:03 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, I'm getting this error in KafkaWordCount: TaskSetManager: Lost task 0.0 in stage 4095.0 (TID 1281, 10.20.10.234): java.lang.ClassCastException: [B cannot be cast to java.lang.String at org.apache.spark.examples.streaming.KafkaWordCount$$anonfun$4$$anonfun$apply$1.apply(KafkaWordCount.scala:7... Any idea what it could be? Below is the piece of code:

val kafkaStream = {
  val kafkaParams = Map[String, String](
    "zookeeper.connect" -> "achab3:2181",
    "group.id" -> "mygroup",
    "zookeeper.connect.timeout.ms" -> "1",
    "kafka.fetch.message.max.bytes" -> "400",
    "auto.offset.reset" -> "largest")
  val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
  //val lines = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap,
  //  storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
  val KafkaDStreams = (1 to numStreams).map { _ =>
    KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap,
      storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
  }
  val unifiedStream = ssc.union(KafkaDStreams)
  unifiedStream.repartition(sparkProcessingParallelism)
}

Thanks Guys
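Putting the two replies together, the stream creation for string messages would look roughly like this (a sketch reusing ssc, kafkaParams, and topicpMap from the code above):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// StringDecoder makes the stream carry String values, so no [B -> String cast is needed.
val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicpMap, StorageLevel.MEMORY_ONLY_SER).map(_._2)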
Re: Reg Job Server
I have a single-node Spark standalone cluster. Will this also work for my cluster? Thank You On Fri, Feb 6, 2015 at 11:02 AM, Mark Hamstra m...@clearstorydata.com wrote: https://cwiki.apache.org/confluence/display/SPARK/Profiling+Spark+Applications+Using+YourKit On Thu, Feb 5, 2015 at 9:18 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I want to know the reason the job is slow. I will look at YourKit. Can you point me to a tutorial on how to use it? Thank You On Fri, Feb 6, 2015 at 10:44 AM, Kostas Sakellis kos...@cloudera.com wrote: When you say profiling, what are you trying to figure out? Why your Spark job is slow? Gatling seems to be a load-generating framework, so I'm not sure how you'd use it (I've never used it before). Spark runs on the JVM, so you can use any JVM profiling tool, like YourKit. Kostas On Thu, Feb 5, 2015 at 9:03 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: I read somewhere about Gatling. Can that be used to profile Spark jobs? On Fri, Feb 6, 2015 at 10:27 AM, Kostas Sakellis kos...@cloudera.com wrote: Which Spark Job server are you talking about? On Thu, Feb 5, 2015 at 8:28 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, Can Spark Job Server be used for profiling Spark jobs?
Reg Job Server
Hi, Can Spark Job Server be used for profiling Spark jobs?
Spark Metrics Servlet for driver and executor
Hi all, I'm looking at the Spark MetricsServlet. What is the URL exposing the driver and executor JSON responses? I found the master and worker URLs successfully, but I can't find the URLs that return JSON for the other two sources. Thanks! Judy
Error KafkaStream
Hi Guys, I'm getting this error in KafkaWordCount: TaskSetManager: Lost task 0.0 in stage 4095.0 (TID 1281, 10.20.10.234): java.lang.ClassCastException: [B cannot be cast to java.lang.String at org.apache.spark.examples.streaming.KafkaWordCount$$anonfun$4$$anonfun$apply$1.apply(KafkaWordCount.scala:7... Any idea what it could be? Below is the piece of code:

val kafkaStream = {
  val kafkaParams = Map[String, String](
    "zookeeper.connect" -> "achab3:2181",
    "group.id" -> "mygroup",
    "zookeeper.connect.timeout.ms" -> "1",
    "kafka.fetch.message.max.bytes" -> "400",
    "auto.offset.reset" -> "largest")
  val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
  //val lines = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap,
  //  storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
  val KafkaDStreams = (1 to numStreams).map { _ =>
    KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap,
      storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
  }
  val unifiedStream = ssc.union(KafkaDStreams)
  unifiedStream.repartition(sparkProcessingParallelism)
}

Thanks Guys
Re: Shuffle write increases in spark 1.2
Hi Kevin, We seem to be facing the same problem as well. Were you able to find anything after that? The ticket does not seem to have progressed anywhere. Regards, Anubhav On 5 January 2015 at 10:37, 정재부 itsjb.j...@samsung.com wrote: Sure, here is a ticket: https://issues.apache.org/jira/browse/SPARK-5081 --- Original Message --- Sender: Josh Rosen rosenvi...@gmail.com Date: 2015-01-05 06:14 (GMT+09:00) Title: Re: Shuffle write increases in spark 1.2 If you have a small reproduction for this issue, can you open a ticket at https://issues.apache.org/jira/browse/SPARK ? On December 29, 2014 at 7:10:02 PM, Kevin Jung (itsjb.j...@samsung.com) wrote: Hi all, The size of shuffle write shown in the Spark web UI is much different when I execute the same Spark job on the same input data (100GB) in Spark 1.1 and Spark 1.2. At the same sortBy stage, the size of shuffle write is 39.7GB in Spark 1.1 but 91.0GB in Spark 1.2. I set the spark.shuffle.manager option to hash because its default value changed, but Spark 1.2 still writes larger files than Spark 1.1. Can anyone tell me why this happens? Thanks, Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Reading from CSV file with spark-csv_2.10
Hi Florin, I might be wrong, but TimeStamp looks like a keyword in SQL that the engine gets confused with. If it is a column name of your table, you might want to change it. ( https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types) I'm constantly working with CSV files in Spark; however, I did the parsing manually and didn't use the spark-csv package, so I cannot comment on spark-csv. HTH, Jerry On Thu, Feb 5, 2015 at 9:32 AM, Spico Florin spicoflo...@gmail.com wrote: Hello! I'm using spark-csv 2.10 with Java from the maven repository:

<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>0.1.1</version>

I would like to use Spark SQL to filter out my data. I'm using the following code:

JavaSchemaRDD cars = new JavaCsvParser().withUseHeader(true).csvFile(sqlContext, logFile);
cars.registerAsTable("mytable");
JavaSchemaRDD doll = sqlContext.sql("SELECT TimeStamp FROM mytable");
doll.saveAsTextFile("dolly.csv");

but I'm getting the following error:

Exception in thread main java.lang.RuntimeException: [1.8] failure: ``UNION'' expected but `TimeStamp' found
SELECT TimeStamp FROM mytable
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)

Can you please tell me the best approach to filter CSV data with SQL? Thank you. Regards, Florin
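One way to sidestep the reserved word entirely is the Scala SchemaRDD DSL, which does not go through the SQL parser at all. A sketch, assuming `cars` is the SchemaRDD loaded from the CSV file (the Java API differs slightly):

import sqlContext._ // brings the Symbol -> column implicit conversions into scope

// Select the column by Symbol instead of via a SQL string, so no keyword clash occurs.
val doll = cars.select('TimeStamp)
doll.saveAsTextFile("dolly.csv")

If you need to stay in SQL, renaming the column in the CSV header avoids the clash.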
Re: Error KafkaStream
Is SPARK-4905 / https://github.com/apache/spark/pull/4371/files the same issue? On Thu, Feb 5, 2015 at 7:03 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, I'm getting this error in KafkaWordCount: TaskSetManager: Lost task 0.0 in stage 4095.0 (TID 1281, 10.20.10.234): java.lang.ClassCastException: [B cannot be cast to java.lang.String at org.apache.spark.examples.streaming.KafkaWordCount$$anonfun$4$$anonfun$apply$1.apply(KafkaWordCount.scala:7... Any idea what it could be? Below is the piece of code:

val kafkaStream = {
  val kafkaParams = Map[String, String](
    "zookeeper.connect" -> "achab3:2181",
    "group.id" -> "mygroup",
    "zookeeper.connect.timeout.ms" -> "1",
    "kafka.fetch.message.max.bytes" -> "400",
    "auto.offset.reset" -> "largest")
  val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
  //val lines = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap,
  //  storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
  val KafkaDStreams = (1 to numStreams).map { _ =>
    KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap,
      storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
  }
  val unifiedStream = ssc.union(KafkaDStreams)
  unifiedStream.repartition(sparkProcessingParallelism)
}

Thanks Guys
Re: Error KafkaStream
I don't think so, Sean. On Feb 5, 2015, at 16:57, Sean Owen so...@cloudera.com wrote: Is SPARK-4905 / https://github.com/apache/spark/pull/4371/files the same issue? On Thu, Feb 5, 2015 at 7:03 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, I'm getting this error in KafkaWordCount: TaskSetManager: Lost task 0.0 in stage 4095.0 (TID 1281, 10.20.10.234): java.lang.ClassCastException: [B cannot be cast to java.lang.String at org.apache.spark.examples.streaming.KafkaWordCount$$anonfun$4$$anonfun$apply$1.apply(KafkaWordCount.scala:7... Any idea what it could be? Below is the piece of code:

val kafkaStream = {
  val kafkaParams = Map[String, String](
    "zookeeper.connect" -> "achab3:2181",
    "group.id" -> "mygroup",
    "zookeeper.connect.timeout.ms" -> "1",
    "kafka.fetch.message.max.bytes" -> "400",
    "auto.offset.reset" -> "largest")
  val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
  //val lines = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap,
  //  storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
  val KafkaDStreams = (1 to numStreams).map { _ =>
    KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap,
      storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
  }
  val unifiedStream = ssc.union(KafkaDStreams)
  unifiedStream.repartition(sparkProcessingParallelism)
}

Thanks Guys
Re: pyspark - gzip output compression
No, you can compress SequenceFiles with gzip. If you are reading outside Hadoop, then SequenceFile may not be a great choice. You can use the gzip codec with TextOutputFormat if you need to. On Feb 5, 2015 8:28 AM, Kane Kim kane.ist...@gmail.com wrote: I'm getting "SequenceFile doesn't work with GzipCodec without native-hadoop code!" Where can I get those libs, and where do I put them in Spark? Also, can I save a plain text file (like saveAsTextFile) as gzip? Thanks. On Wed, Feb 4, 2015 at 11:10 PM, Kane Kim kane.ist...@gmail.com wrote: How do I save an RDD with gzip compression? Thanks.
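Concretely, in the Scala API this looks as follows (a sketch with a hypothetical RDD and output path; recent PySpark versions accept an equivalent compressionCodecClass argument on saveAsTextFile):

import org.apache.hadoop.io.compress.GzipCodec

val rdd = sc.parallelize(Seq("a", "b", "c")) // any RDD of strings
// The codec class tells TextOutputFormat to gzip each output part file.
rdd.saveAsTextFile("hdfs:///out/text-gz", classOf[GzipCodec])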
Re: Use Spark as multi-threading library and deprecate web UI
Do you mean disable the web UI? spark.ui.enabled=false Sure, it's useful with master = local[*] too. On Thu, Feb 5, 2015 at 9:30 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, It might sound weird, but I think Spark is perfect for use as a multi-threading library in some cases. Local mode will naturally use multiple threads when required. The code is more constrained, so there is less chance of introducing a bug (because it is data-oriented, not thread-oriented). Of course, it cannot be used for all cases, but for most of my applications (90%) it is enough. I want to hear other people's ideas about this. BTW: if I run Spark in local mode, how do I disable the web UI (which listens on 4040 by default)? I don't want to start the UI every time I use Spark as a local library. Regards, Shuai
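So an embedded, local-mode use could look like this minimal sketch (the app name is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[*]")              // one worker thread per local core
  .setAppName("embedded-spark")       // hypothetical name
  .set("spark.ui.enabled", "false")   // don't start the web UI on port 4040
val sc = new SparkContext(conf)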
ZeroMQ and pyspark.streaming
Does pyspark support ZeroMQ? I see that Java supports it, but I am not sure about Python. Regards -- Aleksandar Kacanski
Re: MLlib - Show an element in RDD[(Int, Iterable[Array[Double]])]
I solved the question with this code:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("/opt/testAppSpark/data/height-weight.txt").map { line =>
  Vectors.dense(line.split(' ').map(_.toDouble))
}.cache()
val cluster = KMeans.train(data, 3, 20)
val vectorsAndClusterIdx = data.map { point =>
  val prediction = cluster.predict(point)
  (point.toString, prediction)
}
vectorsAndClusterIdx.collect

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Show-an-element-in-RDD-Int-Iterable-Array-Double-tp21521p21522.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark job ends abruptly during setup without error message
I'm submitting this on a cluster, with my usual setting of export YARN_CONF_DIR=/etc/hadoop/conf. It is working again after a small change to the code, so I will see if I can reproduce the error (later today). On Thu, Feb 5, 2015 at 9:17 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Are you submitting the job from your local machine or from the driver machine? Have you set YARN_CONF_DIR? On Thu, Feb 5, 2015 at 10:43 PM, Arun Luthra arun.lut...@gmail.com wrote: While a spark-submit job is setting up, the yarnAppState goes into Running mode, then I get a flurry of typical-looking INFO-level messages such as INFO BlockManagerMasterActor: ... INFO YarnClientSchedulerBackend: Registered executor: ... Then spark-submit quits without any error message and I'm back at the command line. What could be causing this? Arun -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
My first experience with Spark
I am evaluating Spark for our production usage. Our production cluster is Hadoop 2.2.0 without YARN, so I want to test Spark with a standalone deployment running alongside Hadoop. What I have in mind is to test a very complex Hive query, which joins 6 tables, with lots of nested structure and exploding, and which currently takes 8 hours daily in our production. All the data for this query is in Avro + Snappy. I set up one box (24 cores + 64G memory), installed the same version of Hadoop as our production, and put 5% of the data on it (about 60G of Snappy-compressed Avro files). Running the same query in Hive took 6 rounds of MR jobs and finished in around 30 hours on this one box. Now, I start to have fun with Spark. I checked out Spark 1.2.0, built it following the Spark build instructions, and installed it on this one box. Since the test data is all in Avro format, I also built the latest development version of spark-avro, from https://github.com/databricks/spark-avro

1) First, I had some problems using the Avro data in spark-avro. It turns out that the Spark 1.2.0 build process bundles mismatched versions of the Avro core and Avro mapred jars. I manually fixed it; see the issue here: https://github.com/databricks/spark-avro/issues/24

2) After that, I am impressed: the Avro files just work from HDFS in Spark 1.2, and the complex query (about 200 lines) just starts to run in Spark 1.2 using org.apache.spark.sql.hive.HiveContext without any problem. This HiveContext just works in Spark SQL 1.2. Very nice.

3) I got several OOMs, which is reasonable. I finally changed the memory settings to:

export SPARK_WORKER_MEMORY=8g
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=8g

as 4g just doesn't work for the test data volume. After I set them to 8g, the job no longer fails due to OOM.

4) It looks like Spark generates 8 stages for the big query.
It finishes stage 1 and stage 2, then fails on stage 3 twice with the following error:

FetchFailed(null, shuffleId=7, mapId=-1, reduceId=7, message=org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 7
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:853)
)

During the whole test, the CPU load average is about 16, and still
word2vec more distributed
I was wondering if there is any chance of getting a more distributed word2vec implementation. I seem to be running out of memory from big local data structures such as val syn1Global = new Array[Float](vocabSize * vectorSize) Is there any chance of getting a version where these are all put in RDDs? Thanks,
Errors in the workers machines
Hello! I received the following errors in the workerLog.log files: ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@stream4:33660] - [akka.tcp://sparkExecutor@stream4:47929]: Error [Association failed with [akka.tcp://sparkExecutor@stream4:47929]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@stream4:47929] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: stream4/x.x.x.x:47929 ] (For security reasons I have masked the IP as x.x.x.x.) The same errors occur for different ports (42395, 39761). Even though I get these errors, the application still finishes successfully. I have the following questions: 1. For what reason is Spark using the above ports? What internal component is triggering them? 2. How can I get rid of these errors? 3. Why does the application still finish successfully? 4. Why does it try more ports? I look forward to your answers. Regards, Florin
Re: Zipping RDDs of equal size not possible
Hi Xiangrui, I'm sorry, I didn't recognize your mail. What I did is a workaround that only works for my special case. It does not scale and only works for small data sets, but that is fine for me so far. Kind Regards, Niklas

def securlyZipRdds[A, B: ClassTag](rdd1: RDD[A], rdd2: RDD[B]): RDD[(A, B)] = {
  val rdd1Repartitioned = rdd1.repartition(1)
  val rdd2Repartitioned = rdd2.repartition(1)
  val (rdd1Balanced, rdd2Balanced) = balanceRddSizes(rdd1Repartitioned, rdd2Repartitioned)
  rdd1Balanced.zip(rdd2Balanced)
}

def balanceRddSizes[A, B](rdd1: RDD[A], rdd2: RDD[B]): (RDD[A], RDD[B]) = {
  val rdd1count = rdd1.count()
  val rdd2count = rdd2.count()
  val difference = math.abs(rdd1count - rdd2count).toInt
  if (rdd1count > rdd2count) {
    (removeRandomElements(rdd1, difference), rdd2)
  } else if (rdd2count > rdd1count) {
    (rdd1, removeRandomElements(rdd2, difference))
  } else {
    (rdd1, rdd2)
  }
}

def removeRandomElements[A](rdd: RDD[A], numberOfElements: Int): RDD[A] = {
  val sample: Array[A] = rdd.takeSample(false, numberOfElements)
  val set: Set[A] = Set(sample: _*)
  rdd.filter(x => !set.contains(x))
}

On 10.01.2015 06:56, Xiangrui Meng wrote: sample 2 * n tuples, split them into two parts, balance the sizes of these parts by filtering some tuples out How do you guarantee that the two RDDs have the same size? -Xiangrui On Fri, Jan 9, 2015 at 3:40 AM, Niklas Wilcke 1wil...@informatik.uni-hamburg.de wrote: Hi Spark community, I have a problem with zipping two RDDs of the same size and same number of partitions. The error message says that zipping is only allowed on RDDs which are partitioned into chunks of exactly the same sizes. How can I assure this? My workaround at the moment is to repartition both RDDs to only one partition, but that obviously does not scale. This problem originates from my problem of drawing n random tuple pairs (Tuple, Tuple) from an RDD[Tuple]. What I do is sample 2 * n tuples, split them into two parts, balance the sizes of these parts by filtering some tuples out, and zip them together. I would appreciate reading better approaches for both problems. Thanks in advance, Niklas - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
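For reference, a sketch of an alternative that scales past one partition, assuming the two RDDs already have equal counts (not from the thread; zipWithIndex pairs elements by position, then a join on the index replaces the requirement of identical partitioning):

import org.apache.spark.SparkContext._   // pair-RDD implicits (pre-1.3)
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

def zipByIndex[A: ClassTag, B: ClassTag](rdd1: RDD[A], rdd2: RDD[B]): RDD[(A, B)] = {
  val indexed1 = rdd1.zipWithIndex().map(_.swap)  // (position, element)
  val indexed2 = rdd2.zipWithIndex().map(_.swap)
  indexed1.join(indexed2).values                  // elements matched by position
}

This costs a shuffle for the join, but avoids collapsing everything into a single partition.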
How to design a long-lived Spark application
Hi All, I want to develop a server-side application: user submits a request → server runs a Spark application and returns (this might take a few seconds). So I want to host the server to keep a long-lived context; I don't know whether this is reasonable or not. Basically I try to have a global JavaSparkContext instance and keep it there, and initialize some RDDs. Then my Java application will use it to submit jobs. So now I have some questions: 1) If I don't close it, is there any timeout I need to configure on the Spark server? 2) In theory I want to design something similar to the Spark shell (which also hosts a default sc), just not shell-based. Any suggestion? I think my request is very common for application development; someone must have done it before. Regards, Shawn
Reading from CSV file with spark-csv_2.10
Hello! I'm using spark-csv 2.10 with Java from the Maven repository:

<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>0.1.1</version>

I would like to use Spark SQL to filter out my data. I'm using the following code:

JavaSchemaRDD cars = new JavaCsvParser().withUseHeader(true).csvFile(sqlContext, logFile);
cars.registerAsTable("mytable");
JavaSchemaRDD doll = sqlContext.sql("SELECT TimeStamp FROM mytable");
doll.saveAsTextFile("dolly.csv");

but I'm getting the following error:

Exception in thread "main" java.lang.RuntimeException: [1.8] failure: ``UNION'' expected but `TimeStamp' found
SELECT TimeStamp FROM mytable
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)

Can you please tell me what is the best approach to filter the CSV data with SQL? Thank you. Regards, Florin
Resources not uploaded when submitting job in yarn-client mode
Hi, I am trying to submit a job from a Windows system to a YARN cluster running on Linux (the HDP2.2 sandbox). I have copied the relevant Hadoop directories as well as the yarn-site.xml and mapred-site.xml to the Windows file system. Further, I have added winutils.exe to $HADOOP_HOME/bin. I can tell that the ApplicationMaster is properly created on YARN (it's visible in the ResourceManager UI) but the job fails with the following error: Diagnostics: File file:/D:/tools/spark-1.2.0-bin-hadoop2.4/lib/spark-assembly-1.2.0-hadoop2.4.0.jar does not exist java.io.FileNotFoundException: File file:/D:/tools/spark-1.2.0-bin-hadoop2.4/lib/spark-assembly-1.2.0-hadoop2.4.0.jar does not exist at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:534) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747) When I try to run the same job from the cluster itself, it works fine. When I tried this, I noticed a difference: on the cluster one log entry was "yarn.Client: Uploading resource...", but on the Windows machine it was "Client: Source and destination file systems are the same. Not copying". Looking at the source code (in org.apache.spark.deploy.yarn.Client) I can see that this happens because the client is led to believe that my Windows machine and the destination (the cluster running in a VM) use the same file system. Clearly this is not the case, and so the above error message is not surprising. But I can't figure out how to adjust the configuration to get this to work. Also I'm surprised that when doing the same thing on the cluster, the Client uploads the resource (because in that case it really is on the same FS). Is there something in the mapred-site.xml or yarn-site.xml files that I need to adjust on my Windows machine? What am I missing? Thanks, Stefan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Resources-not-uploaded-when-submitting-job-in-yarn-client-mode-tp21516.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
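Not something confirmed in this thread, but one hedged way to sidestep the upload logic entirely is to pre-stage the assembly jar on the cluster's HDFS yourself and point spark.yarn.jar at it, so the client never needs to upload (or wrongly skip uploading) the local copy. The HDFS path below is illustrative:

// hedged sketch; spark.yarn.jar is a documented 1.2-era config key
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.jar",
       "hdfs:///apps/spark/spark-assembly-1.2.0-hadoop2.4.0.jar")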
how to debug this kind of error, e.g. lost executor?
Hi, I am running a graphx application with heavy memory/CPU overhead. I think the memory is sufficient, and I set the RDDs' StorageLevel to MEMORY_AND_DISK. But I found there were some tasks that failed due to the following errors:

java.io.FileNotFoundException: /data/spark/local/spark-local-20150205151711-9700/09/rdd_3_275 (No files or folders of this type)
ExecutorLostFailure (executor 11 lost)

So, finally that stage failed:

org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /data/spark/local/spark-local-20150205151711-587a/16/shuffle_11_219_0.index

Does anyone have pointers? Where can I get more details on this issue? Best, Yifan LI
Use Spark as multi-threading library and deprecate web UI
Hi All, It might sound weird, but I think Spark is perfect to use as a multi-threading library in some cases. Local mode will naturally spawn multiple threads when required. Because it is more restrictive, there is less chance of a potential bug in the code (because it is more data-oriented, not thread-oriented). Of course, it cannot be used for all cases, but in most of my applications it is enough (90%). I want to hear other people's ideas about this. BTW: if I run Spark in local mode, how do I deprecate the web UI (which by default listens on 4040)? I don't want to start the UI every time if I use Spark as a local library. Regards, Shuai
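For reference, a minimal sketch of the idea; spark.ui.enabled is the config key confirmed later in this thread, while the master and workload are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object LocalParallelism {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("local-parallelism")
      .setMaster("local[*]")              // one task thread per core
      .set("spark.ui.enabled", "false")   // do not start the web UI on 4040
    val sc = new SparkContext(conf)
    // a CPU-bound job that local mode fans out across threads
    val sum = sc.parallelize(1L to 1000000L, 8).map(x => x * x).reduce(_ + _)
    println(sum)
    sc.stop()
  }
}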
streaming joining multiple streams
The challenge I have is this. There are two streams of data where an event might look like this in stream1: (time, hashkey, foo1), and in stream2: (time, hashkey, foo2). The result after joining should be (time, hashkey, foo1, foo2). The join happens on hashkey, and the time difference between events can be ~30 minutes. The amount of data is enormous: hundreds of billions of events per month. I need to not only join the existing historical data but continue to do so with incoming data (it comes in batches, not really streamed). For now I was thinking to implement this in MapReduce with sliding windows. I'm wondering if Spark can actually help me with this sort of challenge? How would a join of two huge streams of historical data actually happen internally within Spark, and would it be more efficient than, say, a Hive MapReduce stream join of two big tables? I also saw that Spark Streaming has windowing support, but it seems you cannot provide your own timer? As in, I cannot make the time be derived from the events themselves rather than having an actual clock running. Thanks,
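For reference, a hedged sketch of what a windowed stream-stream join looks like in Spark Streaming of this era. Note the windows are wall-clock based, so this does not answer the event-time question; hosts, ports, and parsing are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits

object TwoStreamJoin {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("two-stream-join").setMaster("local[4]"), Seconds(60))

    // (hashkey, foo) pairs parsed from two hypothetical text sources
    val s1 = ssc.socketTextStream("host1", 9991).map(_.split(",")).map(a => (a(1), a(2)))
    val s2 = ssc.socketTextStream("host2", 9992).map(_.split(",")).map(a => (a(1), a(2)))

    // keep 30 minutes of each stream around and join by hashkey every batch
    val joined = s1.window(Minutes(30)).join(s2.window(Minutes(30)))
    joined.print()

    ssc.start()
    ssc.awaitTermination()
  }
}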
K-Means final cluster centers
Hi, I am trying to get the final cluster centers after running the KMeans algorithm in MLlib in order to characterize the clusters. But the KMeansModel does not have any public method to retrieve this info. There appears to be only a private method called clusterCentersWithNorm. I guess I could call predict() to get the final cluster assignment for the dataset and write my own code to compute the means based on this final assignment. But I would like to know if there is a way to get this info from MLLib API directly after running KMeans? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/K-Means-final-cluster-centers-tp21523.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: K-Means final cluster centers
Unless I misunderstood your question, you’re looking for the val clusterCenters in http://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.mllib.clustering.KMeansModel, no? Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Feb 5, 2015, at 2:35 PM, SK skrishna...@gmail.com wrote: Hi, I am trying to get the final cluster centers after running the KMeans algorithm in MLlib in order to characterize the clusters. But the KMeansModel does not have any public method to retrieve this info. There appears to be only a private method called clusterCentersWithNorm. I guess I could call predict() to get the final cluster assignment for the dataset and write my own code to compute the means based on this final assignment. But I would like to know if there is a way to get this info from MLLib API directly after running KMeans? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/K-Means-final-cluster-centers-tp21523.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
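For example, the val is public on the returned model (toy data; k and iteration count illustrative; assumes an existing SparkContext sc):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
  Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.1)))
val model = KMeans.train(data, 2, 20)   // k = 2, 20 iterations
model.clusterCenters.foreach(println)   // Array[Vector] of the final centers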
Re: How to design a long-lived Spark application
Yes, you can submit multiple actions from different threads to the same SparkContext. It is safe. Indeed, what you want to achieve is quite common: expose some operations over a SparkContext through HTTP. I have used spray for this and it just worked fine. At bootstrap of your web app, start a SparkContext, maybe preprocess some data and cache it, then start accepting requests against this sc. Depending on where you place the initialization code, you can block the server from initializing until your context is ready. This is nice if you don't want to accept requests while the context is being prepared. Eugen 2015-02-05 23:22 GMT+01:00 Shuai Zheng szheng.c...@gmail.com: This example helps a lot :) But I am thinking of the below case: assume I have a SparkContext as a global variable. Then if I use multiple threads to access/use it, will it mess up? For example, my code:

public static List<Tuple2<Integer, Double>> run(JavaSparkContext sparkContext, Map<Integer, List<ExposureInfo>> cache, Properties prop, List<EghInfo> el) throws IOException, InterruptedException {
  JavaRDD<EghInfo> lines = sparkContext.parallelize(el, 100);
  lines.map(…)
  …
  lines.count()
}

If I have two threads call this method at the same time and pass in the same SparkContext, will the SparkContext be thread-safe? I am a bit worried here; in traditional Java it should be, but in the Spark context I am not 100% sure. Basically the SparkContext needs to be smart enough to differentiate the different method contexts (RDDs added to it from different methods), so it creates two different DAGs for the different methods. Can anyone confirm this? This is not something I can easily test with code. Thanks! Regards, Shuai *From:* Corey Nolet [mailto:cjno...@gmail.com] *Sent:* Thursday, February 05, 2015 11:55 AM *To:* Charles Feduke *Cc:* Shuai Zheng; user@spark.apache.org *Subject:* Re: How to design a long-lived Spark application Here's another lightweight example of running a SparkContext in a common Java servlet container: https://github.com/calrissian/spark-jetty-server On Thu, Feb 5, 2015 at 11:46 AM, Charles Feduke charles.fed...@gmail.com wrote: If you want to design something like the Spark shell have a look at: http://zeppelin-project.org/ It's open source and may already do what you need. If not, its source code will be helpful in answering the questions you have about how to integrate with long running jobs. On Thu Feb 05 2015 at 11:42:56 AM Boromir Widas vcsub...@gmail.com wrote: You can check out https://github.com/spark-jobserver/spark-jobserver - this allows several users to upload their jars and run jobs with a REST interface. However, if all users are using the same functionality, you can write a simple spray server which will act as the driver and host the Spark context+RDDs, launched in client mode. On Thu, Feb 5, 2015 at 10:25 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I want to develop a server-side application: user submits a request → server runs a Spark application and returns (this might take a few seconds). So I want to host the server to keep a long-lived context; I don't know whether this is reasonable or not. Basically I try to have a global JavaSparkContext instance and keep it there, and initialize some RDDs. Then my Java application will use it to submit jobs. So now I have some questions: 1) If I don't close it, is there any timeout I need to configure on the Spark server? 2) In theory I want to design something similar to the Spark shell (which also hosts a default sc), just not shell-based. Any suggestion? I think my request is very common for application development; someone must have done it before? Regards, Shawn
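For reference, a minimal sketch of the pattern Eugen describes: one long-lived context with jobs submitted concurrently from two threads (all names and the workload are illustrative; a real server would keep sc alive instead of stopping it):

import org.apache.spark.{SparkConf, SparkContext}
import java.util.concurrent.{Executors, TimeUnit}

object SharedContext {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("shared-sc").setMaster("local[*]"))
    val pool = Executors.newFixedThreadPool(2)
    for (i <- 1 to 2) pool.submit(new Runnable {
      def run(): Unit = {
        // each thread submits its own action against the shared context
        val n = sc.parallelize(1 to 1000000).filter(_ % (i + 1) == 0).count()
        println("job " + i + " counted " + n)
      }
    })
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.MINUTES)
    sc.stop()
  }
}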
RE: How to design a long-lived Spark application
This example helps a lot :) But I am thinking of the below case: assume I have a SparkContext as a global variable. Then if I use multiple threads to access/use it, will it mess up? For example, my code:

public static List<Tuple2<Integer, Double>> run(JavaSparkContext sparkContext, Map<Integer, List<ExposureInfo>> cache, Properties prop, List<EghInfo> el) throws IOException, InterruptedException {
  JavaRDD<EghInfo> lines = sparkContext.parallelize(el, 100);
  lines.map(…)
  …
  lines.count()
}

If I have two threads call this method at the same time and pass in the same SparkContext, will the SparkContext be thread-safe? I am a bit worried here; in traditional Java it should be, but in the Spark context I am not 100% sure. Basically the SparkContext needs to be smart enough to differentiate the different method contexts (RDDs added to it from different methods), so it creates two different DAGs for the different methods. Can anyone confirm this? This is not something I can easily test with code. Thanks! Regards, Shuai From: Corey Nolet [mailto:cjno...@gmail.com] Sent: Thursday, February 05, 2015 11:55 AM To: Charles Feduke Cc: Shuai Zheng; user@spark.apache.org Subject: Re: How to design a long-lived Spark application Here's another lightweight example of running a SparkContext in a common Java servlet container: https://github.com/calrissian/spark-jetty-server On Thu, Feb 5, 2015 at 11:46 AM, Charles Feduke charles.fed...@gmail.com wrote: If you want to design something like the Spark shell have a look at: http://zeppelin-project.org/ It's open source and may already do what you need. If not, its source code will be helpful in answering the questions you have about how to integrate with long running jobs. On Thu Feb 05 2015 at 11:42:56 AM Boromir Widas vcsub...@gmail.com wrote: You can check out https://github.com/spark-jobserver/spark-jobserver - this allows several users to upload their jars and run jobs with a REST interface. However, if all users are using the same functionality, you can write a simple spray server which will act as the driver and host the Spark context+RDDs, launched in client mode. On Thu, Feb 5, 2015 at 10:25 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I want to develop a server-side application: user submits a request → server runs a Spark application and returns (this might take a few seconds). So I want to host the server to keep a long-lived context; I don't know whether this is reasonable or not. Basically I try to have a global JavaSparkContext instance and keep it there, and initialize some RDDs. Then my Java application will use it to submit jobs. So now I have some questions: 1) If I don't close it, is there any timeout I need to configure on the Spark server? 2) In theory I want to design something similar to the Spark shell (which also hosts a default sc), just not shell-based. Any suggestion? I think my request is very common for application development; someone must have done it before? Regards, Shawn
RE: My first experience with Spark
Finally I gave up after there were too many failed retries. From the log on the worker side, it looks like it failed with a JVM OOM, as below:

15/02/05 17:02:03 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Driver Heartbeater,5,main]
java.lang.OutOfMemoryError: Java heap space
at java.lang.StringBuilder.toString(StringBuilder.java:812)
at scala.collection.mutable.StringBuilder.toString(StringBuilder.scala:427)
at scala.concurrent.duration.FiniteDuration.unitString(Duration.scala:583)
at scala.concurrent.duration.FiniteDuration.toString(Duration.scala:584)
at java.lang.String.valueOf(String.java:1675)
at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)

15/02/05 17:02:03 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[org.apache.hadoop.hdfs.PeerCache@43fe286e,5,main]
java.lang.OutOfMemoryError: Java heap space
at org.spark-project.guava.common.collect.LinkedListMultimap$5.listIterator(LinkedListMultimap.java:912)
at java.util.AbstractList.listIterator(AbstractList.java:310)
at java.util.AbstractSequentialList.iterator(AbstractSequentialList.java:250)
at org.apache.hadoop.hdfs.PeerCache.evictExpired(PeerCache.java:213)
at org.apache.hadoop.hdfs.PeerCache.run(PeerCache.java:255)
at org.apache.hadoop.hdfs.PeerCache.access$000(PeerCache.java:39)
at org.apache.hadoop.hdfs.PeerCache$1.run(PeerCache.java:135)
at java.lang.Thread.run(Thread.java:853)

15/02/05 17:02:03 ERROR executor.Executor: Exception in task 5.0 in stage 3.2 (TID 2618)

Is this due to OOM in the shuffle stage? I already set SPARK_WORKER_MEMORY=8g, and I can see from the web UI it is 8g. Is there any configuration I can change to avoid the above OOM? Thanks, Yong

From: java8...@hotmail.com To: user@spark.apache.org Subject: My first experience with Spark Date: Thu, 5 Feb 2015 16:03:33 -0500

I am evaluating Spark for our production usage. Our production cluster is Hadoop 2.2.0 without YARN. So I want to test Spark with standalone deployment running with Hadoop. What I have in mind is to test a very complex Hive query, which joins 6 tables, with lots of nested structures and exploding, and currently takes 8 hours to run daily in our production. All the data for this query is in AVRO + Snappy. I set up one box (24 cores + 64G memory), installed the same version of Hadoop as our production, and put 5% of the data on it (which is about 60G of Snappy-compressed AVRO files). I am running the same query in Hive. It took 6 rounds of MR jobs, finishing in around 30 hours on this one box. Now, I start to have fun with Spark. I checked out Spark 1.2.0, built it following the Spark build instructions, and installed it on this one box. Since the test data is all in AVRO format, I also built the latest development version of spark-avro, from https://github.com/databricks/spark-avro 1) First, I got some problems using the AVRO data in spark-avro. It turns out that the Spark 1.2.0 build process will merge mismatched versions of the AVRO core and AVRO mapred jars. I manually fixed it.
See the issue here: https://github.com/databricks/spark-avro/issues/24
2) After that, I am impressed, because: the AVRO files just work from HDFS in Spark 1.2, and the complex query (about 200 lines) just starts to run in Spark 1.2 using org.apache.spark.sql.hive.HiveContext without any problem. This HiveContext just works in Spark SQL 1.2. Very nice.
3) I got several OOMs, which is reasonable. I finally changed the memory settings to:
export SPARK_WORKER_MEMORY=8g
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=8g
as 4g just doesn't work for the test data volume. After I set them to 8g, the job won't fail due to OOM.
4) It looks like Spark generates 8 stages for the big query. It finishes stage 1 and stage 2, then failed on stage 3 twice with the following error:
FetchFailed(null, shuffleId=7, mapId=-1, reduceId=7, message=org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 7
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
Re: K-Means final cluster centers
There's a kMeansModel.clusterCenters() available if you are looking to get the centers from a KMeansModel. From: SK skrishna...@gmail.com To: user@spark.apache.org Sent: Thursday, February 5, 2015 5:35 PM Subject: K-Means final cluster centers Hi, I am trying to get the final cluster centers after running the KMeans algorithm in MLlib in order to characterize the clusters. But the KMeansModel does not have any public method to retrieve this info. There appears to be only a private method called clusterCentersWithNorm. I guess I could call predict() to get the final cluster assignment for the dataset and write my own code to compute the means based on this final assignment. But I would like to know if there is a way to get this info from the MLlib API directly after running KMeans? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/K-Means-final-cluster-centers-tp21523.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to broadcast a variable read from a file in yarn-cluster mode?
I have a file, badFullIPs.csv, of bad IP addresses used for filtering. In yarn-client mode, I simply read it off the edge node, transform it, and then broadcast it:

val badIPs = fromFile(edgeDir + "badfullIPs.csv")
val badIPsLines = badIPs.getLines
val badIpSet = badIPsLines.toSet
val badIPsBC = sc.broadcast(badIpSet)
badIPs.close

How can I accomplish this in yarn-cluster mode? Jon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
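One hedged way to do it (not from the thread): ship the file with spark-submit --files badfullIPs.csv. In yarn-cluster mode, YARN localizes it into the driver container's working directory, so a relative path should resolve; this relies on YARN file localization behaviour and assumes an existing SparkContext sc:

import scala.io.Source

val src = Source.fromFile("badfullIPs.csv")
val badIpSet = try src.getLines().toSet finally src.close()
val badIPsBC = sc.broadcast(badIpSet)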
StreamingContext getOrCreate with queueStream
I am trying to use the StreamingContext getOrCreate method in my app. I started by running the example (RecoverableNetworkWordCount https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala), which worked as expected. However, when I modified that example to use queueStream rather than socketTextStream for its input, things broke down. I first ran it with an empty checkpoint directory, then restarted the app and got an NPE (copied below). Is this a known limitation of using queueStream? Am I assuming something by using it? Thanks in advance for any advice! FYI, I changed line 73 in the example to be: -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StreamingContext-getOrCreate-with-queueStream-tp21528.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Best tools for visualizing Spark Streaming data?
Hello Everyone, I wanted to hear the community's thoughts on what (open-source) tools have been used to visualize data from Spark/Spark Streaming? I've taken a look at Zeppelin, but had some trouble working with it. A couple of questions: 1) I've looked at a couple of blog posts and it seems like the Spark job server is necessary for visualizing data? 2) Is the best way to visualize data to do it through Spark, or to write to a database and then visualize it from there? 3) If I'm still a bit vague: ideally what I would like to do is create a real-time visualization for the Java Kafka word count app that's on the Spark GitHub page. Thanks for the help!
Re: maven doesn't build dependencies with Scala 2.11
Now that Kafka 0.8.2.0 has been released, adding the external/kafka module works. FYI On Sun, Jan 18, 2015 at 7:36 PM, Ted Yu yuzhih...@gmail.com wrote: bq. there was no 2.11 Kafka available That's right. Adding the external/kafka module resulted in: [ERROR] Failed to execute goal on project spark-streaming-kafka_2.11: Could not resolve dependencies for project org.apache.spark:spark-streaming-kafka_2.11:jar:1.3.0-SNAPSHOT: Could not find artifact org.apache.kafka:kafka_2.11:jar:0.8.0 in central ( https://repo1.maven.org/maven2) - [Help 1] Cheers On Sun, Jan 18, 2015 at 10:41 AM, Sean Owen so...@cloudera.com wrote: I could be wrong, but I thought this was on purpose. At the time it was set up, there was no 2.11 Kafka available? or one of its dependencies wouldn't work with 2.11? But I'm not sure what the OP means by maven doesn't build Spark's dependencies because Ted indicates it does, and of course you can see that these artifacts are published. On Sun, Jan 18, 2015 at 2:46 AM, Ted Yu yuzhih...@gmail.com wrote: There're 3 jars under the lib_managed/jars directory with and without the -Dscala-2.11 flag. The difference between the scala-2.10 and scala-2.11 profiles is that the scala-2.10 profile has the following:

<modules>
  <module>external/kafka</module>
</modules>

FYI On Sat, Jan 17, 2015 at 4:07 PM, Ted Yu yuzhih...@gmail.com wrote: I did the following: 1655 dev/change-version-to-2.11.sh 1657 mvn -DHADOOP_PROFILE=hadoop-2.4 -Pyarn,hive -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package And the mvn command passed. Did you see any cross-compilation errors? Cheers BTW the two links you mentioned are consistent in terms of building for Scala 2.11 On Sat, Jan 17, 2015 at 3:43 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, When I run this: dev/change-version-to-2.11.sh mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package as per here, maven doesn't build Spark's dependencies. Only when I run: dev/change-version-to-2.11.sh sbt/sbt -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package as gathered from here, do I get Spark's dependencies built without any cross-compilation errors. Question: - How can I make maven do this? - How can I specify the use of Scala 2.11 in my own .pom files? Thanks
NaiveBayes classifier causes ShuffleDependency class cast exception
I have the following code:

SparkConf conf = new SparkConf().setAppName("streamer").setMaster("local[2]");
conf.set("spark.driver.allowMultipleContexts", "true");
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(batch_interval));
ssc.checkpoint("/tmp/spark/checkpoint");

SparkConf conf2 = new SparkConf().setAppName("classifier").setMaster("local[1]");
conf2.set("spark.driver.allowMultipleContexts", "true");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaReceiverInputDStream<String> stream = ssc.socketTextStream("localhost", );

// String to Tuple3 conversion
JavaDStream<Tuple3<Long, String, String>> tuple_stream =
    stream.map(new Function<String, Tuple3<Long, String, String>>() { ... });

JavaPairDStream<Integer, DictionaryEntry> raw_dictionary_stream =
    tuple_stream.filter(new Function<Tuple3<Long, String, String>, Boolean>() {
      @Override
      public Boolean call(Tuple3<Long, String, String> tuple) throws Exception {
        if ((tuple._1() / Time.scaling_factor % training_interval) < training_dur)
          NaiveBayes.train(sc.parallelize(training_set).rdd());
        return true;
      }
    });

I am working on a text mining project and I want to use the NaiveBayes classifier of MLlib to classify some stream items. So, I have two Spark contexts, one of which is a streaming context. The call to NaiveBayes.train causes the following exception. Any ideas?

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: org.apache.spark.SparkContext$$anonfun$runJob$4 cannot be cast to org.apache.spark.ShuffleDependency
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:60)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NaiveBayes-classifier-causes-ShuffleDependency-class-cast-exception-tp21529.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
no option to add intercepts for StreamingLinearAlgorithm
hi all, just wondering if there is a reason why it is not possible to add intercepts for streaming regression models? I understand that the run method in the underlying GeneralizedLinearModel does not take an intercept as a parameter either. Any reason for that? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/no-option-to-add-intercepts-for-StreamingLinearAlgorithm-tp21526.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is there a way to access Hive UDFs in a HiveContext?
Hi, My guess is that Spark is not picking up the jar where the function is stored. You might have to add it to the SparkContext or the classpath manually. You can also register the function, hc.registerFunction("myfunct", myfunct), and then use it in the query. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-access-Hive-UDFs-in-a-HiveContext-tp21510p21527.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
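For reference, a hedged sketch of both routes; the function body, jar path, and class name are illustrative, and this assumes a HiveContext named hc:

// register a plain Scala function for use in SQL
hc.registerFunction("myfunct", (s: String) => s.toUpperCase)
// or expose an existing Hive UDF from its jar
hc.sql("ADD JAR /path/to/my-udfs.jar")
hc.sql("CREATE TEMPORARY FUNCTION myfunct2 AS 'com.example.MyFunct'")
hc.sql("SELECT myfunct2(value) FROM mytable").collect()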
Re: Tableau beta connector
Could you clarify what you mean by build another Spark and work through spark-submit? If you are referring to utilizing Spark with the thrift server, you could start the Spark service and then have your spark-shell, spark-submit, and/or thrift service aim at the master you have started. On Thu Feb 05 2015 at 2:02:04 AM Ashutosh Trivedi (MT2013030) ashutosh.triv...@iiitb.org wrote: Hi Denny, Ismail, one last question: is it necessary to build another Spark and work through spark-submit? I work in IntelliJ using SBT as the build script. I have Hive set up with postgres as the metastore, and I can run the Hive services using the commands:

hive --service metastore
hive --service hiveserver2

After that, can I use a HiveContext in my code (val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)), do some processing on an RDD, and persist it to Hive using registerTempTable, so that Tableau can extract that persisted RDD? Regards, Ashutosh -- From: Denny Lee denny.g@gmail.com Sent: Thursday, February 5, 2015 1:27 PM To: Ashutosh Trivedi (MT2013030); İsmail Keskin Cc: user@spark.apache.org Subject: Re: Tableau beta connector The context is that you would create your RDDs and then persist them in Hive. Once in Hive, the data is accessible from the Tableau extract through the Spark thrift server. On Wed, Feb 4, 2015 at 23:36 Ashutosh Trivedi (MT2013030) ashutosh.triv...@iiitb.org wrote: Thanks Denny and Ismail. Denny, I went through your blog; it was a great help. I guess the Tableau beta connector also follows the same procedure you described in the blog. I am building Spark now. Basically what I don't get is where to put my data so that Tableau can extract it. So Ismail, it's just Spark SQL, no RDDs. I think I am getting it now. We use Spark for our big data processing and we want the processed data (RDDs) in Tableau. So we should put our data in the Hive metastore and Tableau will extract it from there using this connector? Correct me if I am wrong. I guess I have to look at how the thrift server works. -- From: Denny Lee denny.g@gmail.com Sent: Thursday, February 5, 2015 12:20 PM To: İsmail Keskin; Ashutosh Trivedi (MT2013030) Cc: user@spark.apache.org Subject: Re: Tableau beta connector Some quick context behind how Tableau interacts with Spark / Hive can also be found at https://www.concur.com/blog/en-us/connect-tableau-to-sparksql - it's for how to connect from Tableau to the thrift server before the official Tableau beta connector, but it should provide some of the additional context called out. HTH! On Wed Feb 04 2015 at 10:47:23 PM İsmail Keskin ismail.kes...@dilisim.com wrote: Tableau connects to the Spark Thrift Server via an ODBC driver. So, none of the RDD stuff applies; you just issue SQL queries from Tableau. The table metadata can come from the Hive Metastore if you place your hive-site.xml in the configuration directory of Spark. On Thu, Feb 5, 2015 at 8:11 AM, ashu ashutosh.triv...@iiitb.org wrote: Hi, I am trying out the Tableau beta connector to Spark SQL. I have a few basic questions: Will this connector be able to fetch the schemaRDDs into Tableau? Will all the schemaRDDs be exposed to Tableau? Basically I am not getting what Tableau will fetch at the data source: is it existing files in HDFS, RDDs, or something else? The question may be naive but I did not get an answer anywhere else. I would really appreciate it if someone who has already tried it can help me with this.
Thanks, Ashutosh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Tableau-beta-connector-tp21512.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: word2vec: how to save an mllib model and reload it?
As a Spark newbie, I've come across this thread. I'm playing with Word2Vec in our Hadoop cluster and here's my issue with classic Java serialization of the model: I don't have SSH access to the cluster master node. Here's my code for computing the model:

val input = sc.textFile("README.md").map(line => line.split(" ").toSeq)
val word2vec = new Word2Vec()
val model = word2vec.fit(input)
val oos = new ObjectOutputStream(new FileOutputStream(modelFile))
oos.writeObject(model)
oos.close()

I can do that locally and get the file as desired. But that is of little use for me if the file is stored on the master. I've alternatively serialized the vectors to HDFS using this code:

val vectors = model.getVectors
val output = sc.parallelize(vectors.toSeq)
output.saveAsObjectFile(modelFile)

Indeed, this results in a serialization on HDFS so I can access it as a user. However, I have not figured out how to create a new Word2VecModel object from those files. Any clues? Thanks! Carsten MLnick wrote: Currently I see the word2vec model is collected onto the master, so the model itself is not distributed. I guess the question is why do you need a distributed model? Is the vocab size so large that it's necessary? For model serving in general, unless the model is truly massive (i.e. cannot fit into memory on a modern high end box with 64 or 128GB RAM) then single instance is way faster and simpler (using a cluster of machines is more for load balancing / fault tolerance). What is your use case for model serving? — Sent from Mailbox On Fri, Nov 7, 2014 at 5:47 PM, Duy Huynh duy.huynh.uiv@... wrote: you're right, serialization works. what is your suggestion on saving a distributed model? so part of the model is in one cluster, and some other parts of the model are in other clusters. during runtime, these sub-models run independently in their own clusters (load, train, save). and at some point during run time these sub-models merge into the master model, which also loads, trains, and saves at the master level. much appreciated. On Fri, Nov 7, 2014 at 2:53 AM, Evan R. Sparks evan.sparks@... wrote: There's some work going on to support PMML - https://issues.apache.org/jira/browse/SPARK-1406 - but it's not yet been merged into master. What are you used to doing in other environments? In R I'm used to running save(), same with matlab. In python either pickling things or dumping to json seems pretty common. (even the scikit-learn docs recommend pickling - http://scikit-learn.org/stable/modules/model_persistence.html). These all seem basically equivalent to java serialization to me. Would some helper functions (in, say, mllib.util.modelpersistence or something) make sense to add? On Thu, Nov 6, 2014 at 11:36 PM, Duy Huynh duy.huynh.uiv@... wrote: that works. is there a better way in spark? this seems like the most common feature for any machine learning work - to be able to save your model after training it and load it later. On Fri, Nov 7, 2014 at 2:30 AM, Evan R. Sparks evan.sparks@... wrote: Plain old java serialization is one straightforward approach if you're in java/scala. On Thu, Nov 6, 2014 at 11:26 PM, ll duy.huynh.uiv@... wrote: what is the best way to save an mllib model that you just trained and reload it in the future? specifically, i'm using the mllib word2vec model... thanks.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/word2vec-how-to-save-an-mllib-model-and-reload-it-tp18329.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscribe@.apache For additional commands, e-mail: user-help@.apache -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/word2vec-how-to-save-an-mllib-model-and-reload-it-tp18329p21517.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
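For reference, a hedged read-back counterpart to the two approaches in the message above (modelFile is the same hypothetical path used there; sc is an existing SparkContext):

// plain Java serialization route: deserialize the whole model object
import java.io.{FileInputStream, ObjectInputStream}
import org.apache.spark.mllib.feature.Word2VecModel

val ois = new ObjectInputStream(new FileInputStream(modelFile))
val model = ois.readObject().asInstanceOf[Word2VecModel]
ois.close()

// HDFS object-file route: the 1.2-era Word2VecModel constructor is not public
// (as far as I can tell), so read the (word, vector) map back and use it
// directly for lookups instead of rebuilding the model object
val vectors: Map[String, Array[Float]] =
  sc.objectFile[(String, Array[Float])](modelFile).collect().toMap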
Re: Use Spark as multi-threading library and deprecate web UI
You can use Akka, which is the underlying multithreading library Spark uses. On Thu, Feb 5, 2015 at 9:56 PM, Shuai Zheng szheng.c...@gmail.com wrote: Nice. I just tried it and it works. Thanks very much! And I notice the following in the log:

15/02/05 11:19:09 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@NY02913D.global.local:8162]
15/02/05 11:19:10 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@NY02913D.global.local:8162/user/HeartbeatReceiver

As I understand it, local mode will have the driver and executors in the same Java process. So is there any way for me to also disable the above two listeners? Or are they not optional even in local mode? Regards, Shuai -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Thursday, February 05, 2015 10:53 AM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: Use Spark as multi-threading library and deprecate web UI Do you mean disable the web UI? spark.ui.enabled=false Sure, it's useful with master = local[*] too. On Thu, Feb 5, 2015 at 9:30 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, It might sound weird, but I think Spark is perfect to use as a multi-threading library in some cases. Local mode will naturally spawn multiple threads when required. Because it is more restrictive, there is less chance of a potential bug in the code (because it is more data-oriented, not thread-oriented). Of course, it cannot be used for all cases, but in most of my applications it is enough (90%). I want to hear other people's ideas about this. BTW: if I run Spark in local mode, how do I deprecate the web UI (which by default listens on 4040)? I don't want to start the UI every time if I use Spark as a local library. Regards, Shuai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Spark job ends abruptly during setup without error message
While a spark-submit job is setting up, the yarnAppState goes into Running mode, then I get a flurry of typical looking INFO-level messages such as INFO BlockManagerMasterActor: ... INFO YarnClientSchedulerBackend: Registered executor: ... Then, spark-submit quits without any error message and I'm back at the command line. What could be causing this? Arun
RE: Use Spark as multi-threading library and deprecate web UI
Nice. I just tried it and it works. Thanks very much! And I notice the following in the log:

15/02/05 11:19:09 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@NY02913D.global.local:8162]
15/02/05 11:19:10 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@NY02913D.global.local:8162/user/HeartbeatReceiver

As I understand it, local mode will have the driver and executors in the same Java process. So is there any way for me to also disable the above two listeners? Or are they not optional even in local mode? Regards, Shuai -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Thursday, February 05, 2015 10:53 AM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: Use Spark as multi-threading library and deprecate web UI Do you mean disable the web UI? spark.ui.enabled=false Sure, it's useful with master = local[*] too. On Thu, Feb 5, 2015 at 9:30 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, It might sound weird, but I think Spark is perfect to use as a multi-threading library in some cases. Local mode will naturally spawn multiple threads when required. Because it is more restrictive, there is less chance of a potential bug in the code (because it is more data-oriented, not thread-oriented). Of course, it cannot be used for all cases, but in most of my applications it is enough (90%). I want to hear other people's ideas about this. BTW: if I run Spark in local mode, how do I deprecate the web UI (which by default listens on 4040)? I don't want to start the UI every time if I use Spark as a local library. Regards, Shuai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: how to debug this kind of error, e.g. lost executor?
Does anyone have an idea where I can find the detailed log of that lost executor (why it was lost)? Thanks in advance! On 05 Feb 2015, at 16:14, Yifan LI iamyifa...@gmail.com wrote: Hi, I am running a graphx application with heavy memory/CPU overhead. I think the memory is sufficient, and I set the RDDs' StorageLevel to MEMORY_AND_DISK. But I found there were some tasks that failed due to the following errors:

java.io.FileNotFoundException: /data/spark/local/spark-local-20150205151711-9700/09/rdd_3_275 (No files or folders of this type)
ExecutorLostFailure (executor 11 lost)

So, finally that stage failed:

org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /data/spark/local/spark-local-20150205151711-587a/16/shuffle_11_219_0.index

Does anyone have pointers? Where can I get more details on this issue? Best, Yifan LI
Re: How to design a long-lived Spark application
If you want to design something like the Spark shell have a look at: http://zeppelin-project.org/ It's open source and may already do what you need. If not, its source code will be helpful in answering the questions you have about how to integrate with long running jobs. On Thu Feb 05 2015 at 11:42:56 AM Boromir Widas vcsub...@gmail.com wrote: You can check out https://github.com/spark-jobserver/spark-jobserver - this allows several users to upload their jars and run jobs with a REST interface. However, if all users are using the same functionality, you can write a simple spray server which will act as the driver and host the Spark context+RDDs, launched in client mode. On Thu, Feb 5, 2015 at 10:25 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I want to develop a server-side application: user submits a request → server runs a Spark application and returns (this might take a few seconds). So I want to host the server to keep a long-lived context; I don't know whether this is reasonable or not. Basically I try to have a global JavaSparkContext instance and keep it there, and initialize some RDDs. Then my Java application will use it to submit jobs. So now I have some questions: 1) If I don't close it, is there any timeout I need to configure on the Spark server? 2) In theory I want to design something similar to the Spark shell (which also hosts a default sc), just not shell-based. Any suggestion? I think my request is very common for application development; someone must have done it before? Regards, Shawn
Re: How to design a long-lived Spark application
Here's another lightweight example of running a SparkContext in a common Java servlet container: https://github.com/calrissian/spark-jetty-server On Thu, Feb 5, 2015 at 11:46 AM, Charles Feduke charles.fed...@gmail.com wrote: If you want to design something like the Spark shell have a look at: http://zeppelin-project.org/ It's open source and may already do what you need. If not, its source code will be helpful in answering the questions you have about how to integrate with long running jobs. On Thu Feb 05 2015 at 11:42:56 AM Boromir Widas vcsub...@gmail.com wrote: You can check out https://github.com/spark-jobserver/spark-jobserver - this allows several users to upload their jars and run jobs with a REST interface. However, if all users are using the same functionality, you can write a simple spray server which will act as the driver and host the Spark context+RDDs, launched in client mode. On Thu, Feb 5, 2015 at 10:25 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I want to develop a server-side application: user submits a request → server runs a Spark application and returns (this might take a few seconds). So I want to host the server to keep a long-lived context; I don't know whether this is reasonable or not. Basically I try to have a global JavaSparkContext instance and keep it there, and initialize some RDDs. Then my Java application will use it to submit jobs. So now I have some questions: 1) If I don't close it, is there any timeout I need to configure on the Spark server? 2) In theory I want to design something similar to the Spark shell (which also hosts a default sc), just not shell-based. Any suggestion? I think my request is very common for application development; someone must have done it before? Regards, Shawn
Re: how to debug this kind of error, e.g. lost executor?
Li, I cannot tell you the reason for this exception, but I have seen these kinds of errors when using the HASH-based shuffle manager (which was the default until v1.2). Try the SORT shuffle manager. Hopefully that will help. Thanks, Ankur Does anyone have an idea where I can find the detailed log of that lost executor (why it was lost)? Thanks in advance! On 05 Feb 2015, at 16:14, Yifan LI iamyifa...@gmail.com wrote: Hi, I am running a graphx application with heavy memory/CPU overhead. I think the memory is sufficient, and I set the RDDs' StorageLevel to MEMORY_AND_DISK. But I found there were some tasks that failed due to the following errors:

java.io.FileNotFoundException: /data/spark/local/spark-local-20150205151711-9700/09/rdd_3_275 (No files or folders of this type)
ExecutorLostFailure (executor 11 lost)

So, finally that stage failed:

org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /data/spark/local/spark-local-20150205151711-587a/16/shuffle_11_219_0.index

Does anyone have pointers? Where can I get more details on this issue? Best, Yifan LI
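For reference, Ankur's suggestion as a one-liner; the config key and value are per the 1.x docs:

import org.apache.spark.SparkConf
val conf = new SparkConf().set("spark.shuffle.manager", "sort")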
Re: Spark sql failed in yarn-cluster mode when connecting to non-default hive database
Hi Jenny, You may try to use --files $SPARK_HOME/conf/hive-site.xml --driver-class-path hive-site.xml when submitting your application. The problem is that when running in cluster mode, the driver is actually running in a random container directory on a random executor node. By using --files, you upload hive-site.xml to the container directory; by using --driver-class-path hive-site.xml, you add the file to the classpath (the path is relative to the container directory). When running in cluster mode, have you tried to check the tables inside the default database? If my guess is right, this should be an empty default database inside the default Derby metastore created by HiveContext when hive-site.xml is missing. Best, Cheng On 8/12/14 5:38 PM, Jenny Zhao wrote: Hi Yin, hive-site.xml was copied to spark/conf and is the same as the one under $HIVE_HOME/conf. Through the hive cli, I don't see any problem. But for Spark in yarn-cluster mode, I am not able to switch to a database other than the default one; in yarn-client mode, it works fine. Thanks! Jenny On Tue, Aug 12, 2014 at 12:53 PM, Yin Huai huaiyin@gmail.com wrote: Hi Jenny, Have you copied hive-site.xml to the spark/conf directory? If not, can you put it in conf/ and try again? Thanks, Yin On Mon, Aug 11, 2014 at 8:57 PM, Jenny Zhao linlin200...@gmail.com wrote: Thanks Yin! Here is my hive-site.xml, which I copied from $HIVE_HOME/conf; I didn't experience problems connecting to the metastore through hive, which uses DB2 as the metastore database.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
-->
<configuration>
  <property>
    <name>hive.hwi.listen.port</name>
    <value></value>
  </property>
  <property>
    <name>hive.querylog.location</name>
    <value>/var/ibm/biginsights/hive/query/${user.name}</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/biginsights/hive/warehouse</value>
  </property>
  <property>
    <name>hive.hwi.war.file</name>
    <value>lib/hive-hwi-0.12.0.war</value>
  </property>
  <property>
    <name>hive.metastore.metrics.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:db2://hdtest022.svl.ibm.com:50001/BIDB</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.ibm.db2.jcc.DB2Driver</value>
  </property>
  <property>
    <name>hive.stats.autogather</name>
    <value>false</value>
  </property>
  <property>
    <name>javax.jdo.mapping.Schema</name>
    <value>HIVE</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>catalog</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>V2pJNWMxbFlVbWhaZHowOQ==</value>
  </property>
  <property>
    <name>hive.metastore.password.encrypt</name>
    <value>true</value>
  </property>
  <property>
    <name>org.jpox.autoCreateSchema</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.server2.thrift.min.worker.threads</name>
    <value>5</value>
  </property>
  <property>
    <name>hive.server2.thrift.max.worker.threads</name>
    <value>100</value>
  </property>
  <property>
    <name>hive.server2.thrift.port</name>
    <value>1</value>
  </property>
  <property>
    <name>hive.server2.thrift.bind.host</name>
    <value>hdtest022.svl.ibm.com</value>
  </property>
  <property>
    <name>hive.server2.authentication</name>
    <value>CUSTOM</value>
Re: Can't access remote Hive table from spark
Please note that Spark 1.2.0 /only/ support Hive 0.13.1 /or/ 0.12.0, none of other versions are supported. Best, Cheng On 1/25/15 12:18 AM, guxiaobo1982 wrote: Hi, I built and started a single node standalone Spark 1.2.0 cluster along with a single node Hive 0.14.0 instance installed by Ambari 1.17.0. On the Spark and Hive node I can create and query tables inside Hive, and on remote machines I can submit the SparkPi example to the Spark master. But I failed to run the following example code : public class SparkTest { public static void main(String[] args) { String appName= This is a test application; String master=spark://lix1.bh.com:7077; SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); JavaSparkContext sc = new JavaSparkContext(conf); JavaHiveContext sqlCtx = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc); //sqlCtx.sql(CREATE TABLE IF NOT EXISTS src (key INT, value STRING)); //sqlCtx.sql(LOAD DATA LOCAL INPATH '/opt/spark/examples/src/main/resources/kv1.txt' INTO TABLE src); // Queries are expressed in HiveQL. ListRow rows = sqlCtx.sql(FROM src SELECT key, value).collect(); System.out.print(I got + rows.size() + rows \r\n); sc.close();} } Exception in thread main org.apache.hadoop.hive.ql.metadata.InvalidTableException: Table not found src at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:980) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950) at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) at org.apache.spark.sql.hive.HiveContext$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$super$lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$anonfun$lookupRelation$3.apply(Catalog.scala:141) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$anonfun$lookupRelation$3.apply(Catalog.scala:141) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) at org.apache.spark.sql.hive.HiveContext$anon$2.lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$anonfun$apply$5.applyOrElse(Analyzer.scala:143) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$anonfun$apply$5.applyOrElse(Analyzer.scala:138) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137) at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1$anonfun$apply$2.apply(RuleExecutor.scala:61) at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) at scala.collection.immutable.List.foldLeft(List.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1.apply(RuleExecutor.scala:59) at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1.apply(RuleExecutor.scala:51) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
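For what it's worth, besides the version mismatch Cheng points out, a common cause of "Table not found" from a remote client is that the driver cannot see the cluster's Hive metastore and silently falls back to a fresh local one. A minimal smoke test, assuming hive-site.xml from the cluster has been copied into Spark's conf directory on the client (app name here is illustrative):

    // Scala sketch; requires $SPARK_HOME/conf/hive-site.xml to point at the shared metastore
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext(new SparkConf().setAppName("HiveSmokeTest"))
    val hiveCtx = new HiveContext(sc)
    // If the shared metastore is visible, "src" should be listed here
    hiveCtx.sql("SHOW TABLES").collect().foreach(println)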
spark driver behind firewall
I submit a Spark job from a machine behind a firewall, and I can't open any incoming connections to that box. Does the driver absolutely need to accept incoming connections? Is there any workaround for that case? Thanks.
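In this generation of Spark the executors do connect back to the driver, so a fully firewalled client box is a problem. Two hedged options, one avoiding inbound connections entirely and one pinning the listening ports so specific holes can be opened (the property names are the documented ones; port values are illustrative):

    // Option 1: run the driver inside the cluster, e.g. spark-submit --master yarn-cluster,
    // so nothing needs to reach your workstation.

    // Option 2: fix the driver-side ports instead of letting them be chosen at random
    import org.apache.spark.SparkConf
    val conf = new SparkConf()
      .set("spark.driver.port", "50001")      // scheduler traffic back to the driver
      .set("spark.fileserver.port", "50002")  // jar/file serving
      .set("spark.broadcast.port", "50003")   // HTTP broadcast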
RE: My first experience with Spark
Hi Deb, From what I searched online, changing parallelism is one option. But the failed stage already had 200 tasks, which is quite large on a single 24-core box. I know querying that amount of data on one box is kind of overkill, but I do want to know how to configure it to use less memory, even if that means using more time. We plan to make Spark coexist with our Hadoop cluster, so being able to control its memory usage is important for us. Does Spark need that much memory? Thanks, Yong Date: Thu, 5 Feb 2015 15:36:48 -0800 Subject: Re: My first experience with Spark From: deborah.sie...@gmail.com To: java8...@hotmail.com CC: user@spark.apache.org Hi Yong, Have you tried increasing your level of parallelism? How many tasks are you getting in the failing stage? 2-3 tasks per CPU core is recommended, though maybe you need more for your shuffle operation? You can configure spark.default.parallelism, or pass in a level of parallelism as the second parameter to a suitable operation in your code. Deb On Thu, Feb 5, 2015 at 1:03 PM, java8964 java8...@hotmail.com wrote: I am evaluating Spark for our production usage. Our production cluster is Hadoop 2.2.0 without YARN, so I want to test Spark with standalone deployment running alongside Hadoop. What I have in mind is to test a very complex Hive query, which joins between 6 tables, with lots of nested structure and exploding, and currently takes 8 hours daily running in our production. All the data of this query are in AVRO + Snappy. I set up one box (24 cores + 64G memory), installed the same version of Hadoop as our production, and put 5% of the data on it (which is about 60G of snappy-compressed AVRO files). I ran the same query in Hive. It took 6 rounds of MR jobs and finished in around 30 hours on this one box. Now, I start to have fun with Spark. I checked out Spark 1.2.0, built it following the Spark build instructions, and installed it on this one box. Since the test data is all in AVRO format, I also built the latest development version of spark-avro, from https://github.com/databricks/spark-avro 1) First, I got some problems using the AVRO data in spark-avro. It turns out that the Spark 1.2.0 build process will merge mismatched versions of the AVRO core and AVRO mapred jars. I manually fixed it. See the issue here: https://github.com/databricks/spark-avro/issues/24 2) After that, I am impressed: the AVRO files just work from HDFS in Spark 1.2, and the complex query (about 200 lines) just starts to run in Spark 1.2 using org.apache.spark.sql.hive.HiveContext without any problem. This HiveContext just works in Spark SQL 1.2. Very nice. 3) I got several OOMs, which is reasonable. I finally changed the memory settings to: export SPARK_WORKER_MEMORY=8g export SPARK_DRIVER_MEMORY=2g export SPARK_EXECUTOR_MEMORY=8g as 4g just doesn't work for the test data volume. After I set them to 8G, the job won't fail due to OOM. 4) It looks like Spark generates 8 stages for the big query.
It finishes stages 1 and 2, then fails on stage 3 twice with the following error: FetchFailed(null, shuffleId=7, mapId=-1, reduceId=7, message= org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 7 at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386) at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382) at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at
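One detail worth noting: 200 tasks is exactly the default of spark.sql.shuffle.partitions, which suggests the failing stage is a SQL-side shuffle that was never tuned at all. A hedged sketch of Deb's suggestion (values illustrative):

    // RDD-level shuffles
    val conf = new SparkConf().set("spark.default.parallelism", "200")

    // Spark SQL joins/aggregations use a separate knob (default 200);
    // more partitions mean smaller, more memory-friendly shuffle blocks
    hiveContext.setConf("spark.sql.shuffle.partitions", "800")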
Get filename in Spark Streaming
Hi All, We have filenames with a timestamp, say ABC_1421893256000.txt, and the timestamp needs to be extracted from the file name for further processing. Is there a way to get the input file name picked up by a Spark Streaming job? Thanks in advance Subacini
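There is no obvious per-record filename hook on the DStream API in this version; one workaround is to read (path, content) pairs and parse the name yourself, which works directly for batch jobs via wholeTextFiles and can be adapted per streaming batch. A sketch, with a regex matching the ABC_<millis>.txt convention above (input path illustrative):

    // Pure filename parsing plus a batch-mode usage example
    val FileNamePattern = """.*ABC_(\d+)\.txt""".r

    def extractTimestamp(path: String): Option[Long] = path match {
      case FileNamePattern(ts) => Some(ts.toLong)
      case _ => None
    }

    // wholeTextFiles yields (fullPath, fileContent) pairs
    val withTs = sc.wholeTextFiles("hdfs:///input/dir")
      .map { case (path, content) => (extractTimestamp(path), content) }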
Re: Error KafkaStream
Hi Shao, When I changed to StringDecoder I got this compile error: [error] /sata_disk/workspace/spark-1.2.0/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala:78: not found: type StringDecoder [error] KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2) [error] ^ [error] /sata_disk/workspace/spark-1.2.0/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala:85: value split is not a member of Nothing [error] val words = unifiedStream.flatMap(_.split(" ")) [error] ^ [error] /sata_disk/workspace/spark-1.2.0/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala:86: value reduceByKeyAndWindow is not a member of org.apache.spark.streaming.dstream.DStream[(Nothing, Long)] [error] val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(20), Seconds(10), 2) [error] ^ [error] three errors found [error] (examples/compile:compile) Compilation failed On Feb 6, 2015, at 02:11, Shao, Saisai saisai.s...@intel.com wrote: Hi, I think you should change the `DefaultDecoder` of your type parameter into `StringDecoder`; it seems you want to decode the message into String. `DefaultDecoder` returns Array[Byte], not String, so the class cast will fail here. Thanks Jerry -Original Message- From: Eduardo Costa Alfaia [mailto:e.costaalf...@unibs.it] Sent: Friday, February 6, 2015 12:04 AM To: Sean Owen Cc: user@spark.apache.org Subject: Re: Error KafkaStream I don’t think so Sean. On Feb 5, 2015, at 16:57, Sean Owen so...@cloudera.com wrote: Is SPARK-4905 / https://github.com/apache/spark/pull/4371/files the same issue? On Thu, Feb 5, 2015 at 7:03 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, I’m getting this error in KafkaWordCount; TaskSetManager: Lost task 0.0 in stage 4095.0 (TID 1281, 10.20.10.234): java.lang.ClassCastException: [B cannot be cast to java.lang.String at org.apache.spark.examples.streaming.KafkaWordCount$$anonfun$4$$anonfun$apply$1.apply(KafkaWordCount.scala:7 Any idea what it could be? Below is the piece of code: val kafkaStream = { val kafkaParams = Map[String, String]( "zookeeper.connect" -> "achab3:2181", "group.id" -> "mygroup", "zookeeper.connect.timeout.ms" -> "1", "kafka.fetch.message.max.bytes" -> "400", "auto.offset.reset" -> "largest") val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap //val lines = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2) val KafkaDStreams = (1 to numStreams).map { _ => KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2) } val unifiedStream = ssc.union(KafkaDStreams) unifiedStream.repartition(sparkProcessingParallelism) } Thanks Guys Informativa sulla Privacy: http://www.unibs.it/node/8155 -- Informativa sulla Privacy: http://www.unibs.it/node/8155 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Informativa sulla Privacy: http://www.unibs.it/node/8155
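"not found: type StringDecoder" says the symbol isn't in scope, not that the API is wrong: StringDecoder lives in the Kafka jar itself. A hedged sketch of the fix, assuming the Kafka artifact is on the compile classpath:

    import kafka.serializer.StringDecoder  // comes from Kafka, not from Spark

    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER).map(_._2)

With the import in place the stream becomes a DStream[String], so the later split/reduceByKeyAndWindow errors (which stem from the inferred Nothing type) should disappear as well.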
RE: Error KafkaStream
Did you include the Kafka jars? This StringDecoder is under kafka/serializer. You can refer to the unit test KafkaStreamSuite in Spark to see how to use this API. Thanks Jerry From: Eduardo Costa Alfaia [mailto:e.costaalf...@unibs.it] Sent: Friday, February 6, 2015 9:44 AM To: Shao, Saisai Cc: Sean Owen; user@spark.apache.org Subject: Re: Error KafkaStream Hi Shao, When I changed to StringDecoder I got this compile error: [error] /sata_disk/workspace/spark-1.2.0/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala:78: not found: type StringDecoder [error] KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2) [error] ^ [error] /sata_disk/workspace/spark-1.2.0/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala:85: value split is not a member of Nothing [error] val words = unifiedStream.flatMap(_.split(" ")) [error] ^ [error] /sata_disk/workspace/spark-1.2.0/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala:86: value reduceByKeyAndWindow is not a member of org.apache.spark.streaming.dstream.DStream[(Nothing, Long)] [error] val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(20), Seconds(10), 2) [error] ^ [error] three errors found [error] (examples/compile:compile) Compilation failed On Feb 6, 2015, at 02:11, Shao, Saisai saisai.s...@intel.com wrote: Hi, I think you should change the `DefaultDecoder` of your type parameter into `StringDecoder`; it seems you want to decode the message into String. `DefaultDecoder` returns Array[Byte], not String, so the class cast will fail here. Thanks Jerry -Original Message- From: Eduardo Costa Alfaia [mailto:e.costaalf...@unibs.it] Sent: Friday, February 6, 2015 12:04 AM To: Sean Owen Cc: user@spark.apache.org Subject: Re: Error KafkaStream I don’t think so Sean. On Feb 5, 2015, at 16:57, Sean Owen so...@cloudera.com wrote: Is SPARK-4905 / https://github.com/apache/spark/pull/4371/files the same issue? On Thu, Feb 5, 2015 at 7:03 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, I’m getting this error in KafkaWordCount; TaskSetManager: Lost task 0.0 in stage 4095.0 (TID 1281, 10.20.10.234): java.lang.ClassCastException: [B cannot be cast to java.lang.String at org.apache.spark.examples.streaming.KafkaWordCount$$anonfun$4$$anonfun$apply$1.apply(KafkaWordCount.scala:7 Any idea what it could be?
Below is the piece of code: val kafkaStream = { val kafkaParams = Map[String, String]( "zookeeper.connect" -> "achab3:2181", "group.id" -> "mygroup", "zookeeper.connect.timeout.ms" -> "1", "kafka.fetch.message.max.bytes" -> "400", "auto.offset.reset" -> "largest") val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap //val lines = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2) val KafkaDStreams = (1 to numStreams).map { _ => KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2) } val unifiedStream = ssc.union(KafkaDStreams) unifiedStream.repartition(sparkProcessingParallelism) } Thanks Guys Informativa sulla Privacy: http://www.unibs.it/node/8155 -- Informativa sulla Privacy: http://www.unibs.it/node/8155 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org Informativa sulla Privacy: http://www.unibs.it/node/8155
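If the example is being built outside Spark's own build, the usual missing piece is the Kafka integration artifact, which transitively pulls in kafka.serializer._. A hedged sbt sketch (versions chosen to match Spark 1.2.0 on Scala 2.10; adjust to your build):

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming"       % "1.2.0" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka" % "1.2.0"  // brings in the Kafka jars
    )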
Spark stalls or hangs: is this a clue? remote fetches seem to never return?
Greetings! Again, thanks to all who have given suggestions. I am still trying to diagnose a problem where I have processes that run for one or several hours but intermittently stall or hang. By "stall" I mean that there is no CPU usage on the workers or the driver, nor network activity, nor do I see disk activity. It just hangs. Using the Application Master to find which workers still had active tasks, I then went to that machine and looked in the user logs. In one of the user logs' stderr files, it ends with "Started 50 remote fetches". Should there be a message saying that the fetch was completed? Any suggestions as to how I might diagnose why the fetch was not completed? Thanks! -Mike Here is the last part of the log:
15/02/06 01:33:46 INFO storage.MemoryStore: ensureFreeSpace(5368) called with curMem=875861, maxMem=2315649024
15/02/06 01:33:46 INFO storage.MemoryStore: Block broadcast_10 stored as values in memory (estimated size 5.2 KB, free 2.2 GB)
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 5, fetching them
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-171-0-208.ec2.internal:44124/user/MapOutputTracker#-878402310]
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 5, fetching them
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Got the output locations
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Started 50 remote fetches in 47 ms
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Started 50 remote fetches in 48 ms
It's been like that for half an hour. Thanks! -Mike
Re: StreamingContext getOrCreate with queueStream
I don't think your screenshots came through in the email. Nonetheless, queueStream will not work with getOrCreate. It's mainly for testing (by generating your own RDDs) and not really useful for production usage (where you really need checkpoint-based recovery). TD On Thu, Feb 5, 2015 at 4:12 PM, pnpritchard nicholas.pritch...@falkonry.com wrote: I am trying to use the StreamingContext getOrCreate method in my app. I started by running the example ( RecoverableNetworkWordCount https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala ), which worked as expected. However, when I modified that example to use /queueStream/ rather than /socketTextStream/ for its input, then things broke down. I first ran it with an empty checkpoint directory, then restarted the app and got an NPE (copied below). Is this a known limitation of using queueStream? Am I assuming something by using it? Thanks in advance for any advice! FYI, I changed line 73 in the example to be: -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StreamingContext-getOrCreate-with-queueStream-tp21528.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
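For reference, a minimal getOrCreate skeleton with a recoverable source (socketTextStream, the input the Recoverable example uses; checkpoint path, host and port are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(new SparkConf().setAppName("Recoverable"), Seconds(10))
      ssc.checkpoint("hdfs:///tmp/checkpoints")           // must survive restarts
      val lines = ssc.socketTextStream("localhost", 9999) // the input used by RecoverableNetworkWordCount
      lines.count().print()
      ssc
    }

    val ssc = StreamingContext.getOrCreate("hdfs:///tmp/checkpoints", createContext _)
    ssc.start()
    ssc.awaitTermination()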
Re: How to broadcast a variable read from a file in yarn-cluster mode?
Hi Jon, You'll need to put the file on HDFS (or whatever distributed filesystem you're running on) and load it from there. -Sandy On Thu, Feb 5, 2015 at 3:18 PM, YaoPau jonrgr...@gmail.com wrote: I have a file badFullIPs.csv of bad IP addresses used for filtering. In yarn-client mode, I simply read it off the edge node, transform it, and then broadcast it: val badIPs = fromFile(edgeDir + "badfullIPs.csv") val badIPsLines = badIPs.getLines val badIpSet = badIPsLines.toSet val badIPsBC = sc.broadcast(badIpSet) badIPs.close How can I accomplish this in yarn-cluster mode? Jon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
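A sketch of Sandy's suggestion, assuming the file has been uploaded to HDFS first (paths illustrative):

    // one-time upload from the edge node: hadoop fs -put badfullIPs.csv /user/jon/
    val badIpSet = sc.textFile("hdfs:///user/jon/badfullIPs.csv")
      .map(_.trim)
      .collect()
      .toSet
    val badIPsBC = sc.broadcast(badIpSet)

Another option in yarn-cluster mode is shipping the file with spark-submit --files badfullIPs.csv, which localizes it into the application's working directory.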
Re: Parquet compression codecs not applied
Hi Ayoub, The doc page isn't wrong, but it's indeed confusing. |spark.sql.parquet.compression.codec| is used when you're writing a Parquet file with something like |data.saveAsParquetFile(...)|. However, you are using Hive DDL in the example code. All Hive DDLs and commands like |SET| are directly delegated to Hive, which unfortunately ignores Spark configurations. And yes, the documentation should be updated. Best, Cheng On 1/10/15 5:49 AM, Ayoub Benali wrote: it worked, thanks. this doc page https://spark.apache.org/docs/1.2.0/sql-programming-guide.html recommends using spark.sql.parquet.compression.codec to set the compression codec, and I thought this setting would be forwarded to the hive context given that HiveContext extends SQLContext, but it was not. I am wondering if this behavior is normal; if not, I could open an issue with a potential fix so that spark.sql.parquet.compression.codec would be translated to parquet.compression in the hive context? Or the documentation should be updated to mention that the compression codec is set differently with HiveContext. Ayoub. 2015-01-09 17:51 GMT+01:00 Michael Armbrust mich...@databricks.com: This is a little confusing, but that code path is actually going through hive. So the spark sql configuration does not help. Perhaps, try: set parquet.compression=GZIP; On Fri, Jan 9, 2015 at 2:41 AM, Ayoub benali.ayoub.i...@gmail.com wrote: Hello, I tried to save a table created via the hive context as a parquet file, but whatever compression codec (uncompressed, snappy, gzip or lzo) I set via setConf like: setConf("spark.sql.parquet.compression.codec", "gzip") the size of the generated files is always the same, so it seems like the spark context ignores the compression codec that I set. Here is a code sample applied via the spark shell: import org.apache.spark.sql.hive.HiveContext val hiveContext = new HiveContext(sc) hiveContext.sql("SET hive.exec.dynamic.partition = true") hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict") hiveContext.setConf("spark.sql.parquet.binaryAsString", "true") // required to make data compatible with impala hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip") hiveContext.sql("create external table if not exists foo (bar STRING, ts INT) Partitioned by (year INT, month INT, day INT) STORED AS PARQUET Location 'hdfs://path/data/foo'") hiveContext.sql("insert into table foo partition(year, month, day) select *, year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month, day(from_unixtime(ts)) as day from raw_foo") I tried that with spark 1.2 and a 1.3 snapshot against hive 0.13, and I also tried it with Impala on the same cluster, which applied the compression codecs correctly. Does anyone know what could be the problem? Thanks, Ayoub. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-compression-codecs-not-applied-tp21058.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
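To summarize the two write paths (settings as discussed above; the foo_parquet output path is illustrative):

    // Path 1: Spark SQL itself writes the Parquet file -> the Spark setting applies
    hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip")
    data.saveAsParquetFile("hdfs://path/data/foo_parquet")

    // Path 2: Hive executes the DDL/INSERT -> set the Hive/Parquet property instead
    hiveContext.sql("SET parquet.compression=GZIP")
    hiveContext.sql("insert into table foo partition(year, month, day) select ... from raw_foo")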
RE: Error KafkaStream
Hi, I think you should change the `DefaultDecoder` of your type parameter into `StringDecoder`; it seems you want to decode the message into String. `DefaultDecoder` returns Array[Byte], not String, so the class cast will fail here. Thanks Jerry -Original Message- From: Eduardo Costa Alfaia [mailto:e.costaalf...@unibs.it] Sent: Friday, February 6, 2015 12:04 AM To: Sean Owen Cc: user@spark.apache.org Subject: Re: Error KafkaStream I don’t think so Sean. On Feb 5, 2015, at 16:57, Sean Owen so...@cloudera.com wrote: Is SPARK-4905 / https://github.com/apache/spark/pull/4371/files the same issue? On Thu, Feb 5, 2015 at 7:03 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, I’m getting this error in KafkaWordCount; TaskSetManager: Lost task 0.0 in stage 4095.0 (TID 1281, 10.20.10.234): java.lang.ClassCastException: [B cannot be cast to java.lang.String at org.apache.spark.examples.streaming.KafkaWordCount$$anonfun$4$$anonfun$apply$1.apply(KafkaWordCount.scala:7 Any idea what it could be? Below is the piece of code: val kafkaStream = { val kafkaParams = Map[String, String]( "zookeeper.connect" -> "achab3:2181", "group.id" -> "mygroup", "zookeeper.connect.timeout.ms" -> "1", "kafka.fetch.message.max.bytes" -> "400", "auto.offset.reset" -> "largest") val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap //val lines = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2) val KafkaDStreams = (1 to numStreams).map { _ => KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2) } val unifiedStream = ssc.union(KafkaDStreams) unifiedStream.repartition(sparkProcessingParallelism) } Thanks Guys Informativa sulla Privacy: http://www.unibs.it/node/8155 -- Informativa sulla Privacy: http://www.unibs.it/node/8155 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
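Putting Jerry's fix directly into the snippet above, a hedged version of the consumer setup (kafkaParams and topicpMap as defined in the thread; only the decoder types and the import change):

    import kafka.serializer.StringDecoder

    val KafkaDStreams = (1 to numStreams).map { _ =>
      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicpMap,
        storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
    }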
Requested array size exceeds VM limit Error
Hi, I have a 170GB tab-delimited data set which I am converting into the RDD[LabeledPoint] format. I am then taking a 60% sample of this data set to be used for training a GBT model. I got the "Size exceeds Integer.MAX_VALUE" error, which I fixed by repartitioning the data set to 1000 partitions. Now, the GBT code caches the data set, if it's not already cached, with this operation: input.persist(StorageLevel.MEMORY_AND_DISK) (https://github.com/apache/spark/blob/branch-1.2/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala ). To pre-empt this caching so I can better control it, I am caching the RDD (after repartition) with this command: trainingData.persist(StorageLevel.MEMORY_AND_DISK_SER_2) But now I get the following error on one executor, and the application fails after a retry. I am not sure how to fix this. Could someone help with this? One possible reason could be that I submit my job with --driver-memory 11G --executor-memory 11G but I am allotted only 5.7GB. I am not sure if this could actually have an effect. My runtime environment: 120 executors with 5.7 GB each; the driver has 5.3 GB. My Spark config: set("spark.default.parallelism", "300").set("spark.akka.frameSize", "256").set("spark.akka.timeout", "1000").set("spark.core.connection.ack.wait.timeout", "200").set("spark.akka.threads", "10").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.mb", "256") java.lang.OutOfMemoryError: Requested array size exceeds VM limit at java.util.Arrays.copyOf(Arrays.java:2271) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) at com.esotericsoftware.kryo.io.Output.require(Output.java:135) at com.esotericsoftware.kryo.io.Output.writeLong(Output.java:477) at com.esotericsoftware.kryo.io.Output.writeDouble(Output.java:596) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:212) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:200) at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1175) at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1184) at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:103) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:789) at
org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:167) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) Thank You! Vinay
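The stack trace shows the failure inside dataSerialize while putting a whole partition into the block store as one byte array, so the immediate lever is smaller (or less skewed) partitions rather than a bigger serializer buffer. A hedged sketch (values illustrative):

    // more partitions => each serialized cache block stays well under the 2 GB array limit
    val trainingData = raw.repartition(4000)
      .persist(StorageLevel.MEMORY_AND_DISK_SER)  // note: the _2 replication doubles memory pressure

For a 170 GB input, 1000 partitions can still leave very large blocks if the data is skewed; a few thousand partitions is a safer starting point.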
Re: How many stages in my application?
Thanks Akhil and Mark. I can of course count events (assuming I can deduce the shuffle boundaries), but like I said the program isn't simple, and I'd have to do this manually every time I change the code. So I'd rather find a way of doing this automatically if possible. On 4 February 2015 at 19:41, Mark Hamstra m...@clearstorydata.com wrote: But there isn't a 1-1 mapping from operations to stages, since multiple operations will be pipelined into a single stage if no shuffle is required. To determine the number of stages in a job you really need to be looking for shuffle boundaries. On Wed, Feb 4, 2015 at 11:27 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can easily understand the flow by looking at the number of operations in your program (like map, groupBy, join etc.): first of all you list out the number of operations happening in your application, and then from the webui you will be able to see how many operations have happened so far. Thanks Best Regards On Wed, Feb 4, 2015 at 4:33 PM, Joe Wass jw...@crossref.org wrote: I'm sitting here looking at my application crunching gigabytes of data on a cluster, and I have no idea if it's an hour away from completion or a minute. The web UI shows progress through each stage, but not how many stages are remaining. How can I work out how many stages my program will take automatically? My application has a slightly interesting DAG (re-use of functions that contain Spark transformations, persistent RDDs). Not that complex, but not 'step 1, step 2, step 3'. I'm guessing that if the driver program runs sequentially sending messages to Spark, then Spark has no knowledge of the structure of the driver program. Therefore it's necessary to execute it on a small test dataset and see how many stages result? When I set spark.eventLog.enabled = true and run on (very small) test data I don't get any stage messages in my STDOUT or in the log file. This is on a `local` instance. Did I miss something obvious? Thanks! Joe
Re: How many stages in my application?
RDD#toDebugString will help. On Thu, Feb 5, 2015 at 1:15 AM, Joe Wass jw...@crossref.org wrote: Thanks Akhil and Mark. I can of course count events (assuming I can deduce the shuffle boundaries), but like I said the program isn't simple and I'd have to do this manually every time I change the code. So I rather find a way of doing this automatically if possible. On 4 February 2015 at 19:41, Mark Hamstra m...@clearstorydata.com wrote: But there isn't a 1-1 mapping from operations to stages since multiple operations will be pipelined into a single stage if no shuffle is required. To determine the number of stages in a job you really need to be looking for shuffle boundaries. On Wed, Feb 4, 2015 at 11:27 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can easily understand the flow by looking at the number of operations in your program (like map, groupBy, join etc.), first of all you list out the number of operations happening in your application and then from the webui you will be able to see how many operations have happened so far. Thanks Best Regards On Wed, Feb 4, 2015 at 4:33 PM, Joe Wass jw...@crossref.org wrote: I'm sitting here looking at my application crunching gigabytes of data on a cluster and I have no idea if it's an hour away from completion or a minute. The web UI shows progress through each stage, but not how many stages remaining. How can I work out how many stages my program will take automatically? My application has a slightly interesting DAG (re-use of functions that contain Spark transformations, persistent RDDs). Not that complex, but not 'step 1, step 2, step 3'. I'm guessing that if the driver program runs sequentially sending messages to Spark, then Spark has no knowledge of the structure of the driver program. Therefore it's necessary to execute it on a small test dataset and see how many stages result? When I set spark.eventLog.enabled = true and run on (very small) test data I don't get any stage messages in my STDOUT or in the log file. This is on a `local` instance. Did I miss something obvious? Thanks! Joe
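A quick hedged example of reading stage boundaries off toDebugString (the RDD pipeline here is illustrative):

    val result = sc.textFile("data.txt")
      .map(line => (line.length, 1))
      .reduceByKey(_ + _)   // shuffle => stage boundary

    println(result.toDebugString)
    // The printed lineage shows a ShuffledRDD where the shuffle happens;
    // counting the shuffle boundaries (plus one) gives the stage count for that job.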
Re: how to specify hive connection options for HiveContext
Hi, Are you trying to run a Spark job from inside Eclipse, and want the job to access Hive configuration options so it can access Hive tables? Thanks Arush On Tue, Feb 3, 2015 at 7:24 AM, guxiaobo1982 guxiaobo1...@qq.com wrote: Hi, I know two options, one for spark-submit, the other one for spark-shell, but how do I set them for programs running inside Eclipse? Regards, -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
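If that is the case, two things usually work when launching from an IDE (both hedged; exact values depend on the cluster): put hive-site.xml on the application's classpath, or set the relevant options programmatically:

    // Option 1: add the directory containing hive-site.xml to the run configuration's classpath.

    // Option 2: set Hive options on the context itself (metastore host illustrative)
    import org.apache.spark.sql.hive.HiveContext
    val hiveCtx = new HiveContext(sc)
    hiveCtx.setConf("hive.metastore.uris", "thrift://metastore-host:9083")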
Spark SQL - Not able to create schema RDD for nested Directory for specific directory names
Hi, I get strange behavior. When I am creating a schema RDD for a nested directory, sometimes it works and sometimes it does not. My question is whether nested directories are supported or not. My code is as below. val fileLocation = "hdfs://localhost:9000/apps/hive/warehouse/hl7" val parquetRDD = sqlContex.parquetFile(fileLocation) My HDFS directories are as below.
/apps/hive/warehouse/hl7/_SUCCESS
-rw-r--r-- 1 hdfs supergroup 5809 2015-02-05 10:44 /apps/hive/warehouse/hl7/_common_metadata
-rw-r--r-- 1 hdfs supergroup 15127 2015-02-05 10:44 /apps/hive/warehouse/hl7/_metadata
-rw-r--r-- 1 hdfs supergroup 174044 2015-02-03 10:51 /apps/hive/warehouse/hl7/part-r-1.parquet
-rw-r--r-- 1 hdfs supergroup 190220 2015-02-03 10:51 /apps/hive/warehouse/hl7/part-r-2.parquet
drwxr-xr-x - hdfs supergroup 0 2015-02-05 15:35 /apps/hive/warehouse/hl7/111
I get the error: Exception in thread "main" java.io.FileNotFoundException: Path is not a file: /apps/hive/warehouse/hl7/111 111 is a directory containing more parquet files. After renaming the 111 directory to people4a it's working without issue, and I am able to fetch data from the nested directory. I tried different directory names but it failed for all except people4a. Am I missing anything? Regards, Nishant -- Regards, Nishant
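I can't verify Spark 1.2's exact directory-listing rules here, but one way to sidestep nested-directory surprises is to keep each dataset in its own flat directory (e.g. move hl7/111 alongside hl7 rather than inside it) and union the pieces explicitly. A sketch, assuming both directories share a schema (the hl7_111 location is illustrative):

    val top    = sqlContext.parquetFile("hdfs://localhost:9000/apps/hive/warehouse/hl7")
    val nested = sqlContext.parquetFile("hdfs://localhost:9000/apps/hive/warehouse/hl7_111") // moved out
    val all    = top.unionAll(nested)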
Re: Errors in the workers machines
1. For what reasons is Spark using the above ports? What internal component is triggering them? - Akka (guessing from the error log) is used to schedule tasks and to notify executors; the ports used are random by default. 2. How can I get rid of these errors? - Probably the ports are not open on your server. You can set certain ports and open them using spark.driver.port and spark.executor.port, or you can open all ports between the masters and slaves. For a cluster on EC2, the ec2 script takes care of the required ports. 3. Why does the application still finish with success? - Do you have more workers in the cluster which are able to connect? 4. Why is it trying more ports? - Not sure; it's picking the ports randomly. On Thu, Feb 5, 2015 at 2:30 PM, Spico Florin spicoflo...@gmail.com wrote: Hello! I received the following errors in the workerLog.log files: ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@stream4:33660] - [akka.tcp://sparkExecutor@stream4:47929]: Error [Association failed with [akka.tcp://sparkExecutor@stream4:47929]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@stream4:47929] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: stream4/x.x.x.x:47929 ] (For security reasons I have masked the IP with x.x.x.x). The same errors occur for different ports (42395, 39761). Even though I have these errors, the application still finishes with success. I have the following questions: 1. For what reasons is Spark using the above ports? What internal component is triggering them? 2. How can I get rid of these errors? 3. Why does the application still finish with success? 4. Why is it trying more ports? I look forward to your answers. Regards. Florin -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
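A short sketch of pinning the two ports Arush names, so a firewall rule can cover them (port values illustrative; by default both are chosen at random):

    val conf = new SparkConf()
      .set("spark.driver.port",   "50001")
      .set("spark.executor.port", "50002")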
Pyspark Hbase scan.
Hi, I am trying to do an HBase scan and read it into a Spark RDD using PySpark. I have successfully written data to HBase from PySpark, and have been able to read a full table from HBase using the Python example code. Unfortunately I am unable to find any example code for doing an HBase scan and reading it into a Spark RDD from PySpark. I have found a Scala example: http://stackoverflow.com/questions/25189527/how-to-process-a-range-of-hbase-rows-using-spark But I can't find anything on how to do this from Python. Can anybody shed some light on how (and if) this can be done? Regards Rene Castberg
Re: Spark Job running on localhost on yarn cluster
The problem got resolved after removing all the configuration files from all the slave nodes. Earlier we were running in standalone mode, and that led to duplicated configuration on all the slaves. Once that was done it ran as expected in cluster mode, although performance is not up to the standalone mode: compared to standalone, Spark on YARN runs very slowly. I am running it as $SPARK_HOME/bin/spark-submit --class EDDApp --master yarn-cluster --num-executors 10 --executor-memory 14g target/scala-2.10/edd-application_2.10-1.0.jar hdfs://hm41:9000/user/hduser/newtrans.csv hdfs://hm41:9000/user/hduser/trans-out We have a cluster of 5 nodes, each with 16GB RAM and 8 cores. We have configured the minimum container size as 3GB and the maximum as 14GB in yarn-site.xml. When submitting the job to yarn-cluster we supply --num-executors 10 and --executor-memory 14g. According to my understanding, our job should be allocated 4 containers of 14GB, but the Spark UI shows only 3 containers of 7.2GB each. We are unable to pin down the container count and the resources allocated to each. This causes detrimental performance compared to standalone mode. Regards, Kundan On Thu, Feb 5, 2015 at 12:49 PM, Felix C felixcheun...@hotmail.com wrote: Is YARN_CONF_DIR set? --- Original Message --- From: Aniket Bhatnagar aniket.bhatna...@gmail.com Sent: February 4, 2015 6:16 AM To: kundan kumar iitr.kun...@gmail.com, spark users user@spark.apache.org Subject: Re: Spark Job running on localhost on yarn cluster Have you set the master in SparkConf/SparkContext in your code? The driver logs show in which mode the Spark job is running; double-check whether the logs mention local or yarn-cluster. Also, what's the error that you are getting? On Wed, Feb 4, 2015, 6:13 PM kundan kumar iitr.kun...@gmail.com wrote: Hi, I am trying to execute my code on a yarn cluster. The command which I am using is $SPARK_HOME/bin/spark-submit --class EDDApp target/scala-2.10/edd-application_2.10-1.0.jar --master yarn-cluster --num-executors 3 --driver-memory 6g --executor-memory 7g outpuPath But I can see that this program is running only on the localhost. It's able to read the file from hdfs. I have tried this in standalone mode and it works fine. Please suggest where it is going wrong. Regards, Kundan
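For what it's worth, the 7.2GB figure is probably not YARN shortchanging the container: the executors page of the UI shows only the storage memory pool, which by default is spark.storage.memoryFraction (0.6) times the safety fraction (0.9) of the usable heap. A hedged back-of-envelope check:

    // Storage pool shown in the UI ≈ memoryFraction * safetyFraction * usable heap
    val usableHeapGb = 14.0 * 0.93               // Runtime.maxMemory sits a bit below -Xmx; rough factor
    val shownGb      = 0.6 * 0.9 * usableHeapGb  // ≈ 7.0-7.3 GB, close to the observed 7.2 GB

The executor-count shortfall is likely a separate issue: YARN adds a memory overhead on top of --executor-memory, so a 14g request can exceed what a 16GB node (or a 14GB max container) will grant.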
Re: Pyspark Hbase scan.
Hi, In fact, this pull request, https://github.com/apache/spark/pull/3920, is about doing an HBase scan; however, it is not merged yet. You can also take a look at the example code at http://spark-packages.org/package/20 which uses Scala and Python to read data from HBase. Hope this is helpful. Cheers Gen On Thu, Feb 5, 2015 at 11:11 AM, Castberg, René Christian rene.castb...@dnvgl.com wrote: Hi, I am trying to do an HBase scan and read it into a Spark RDD using PySpark. I have successfully written data to HBase from PySpark, and have been able to read a full table from HBase using the Python example code. Unfortunately I am unable to find any example code for doing an HBase scan and reading it into a Spark RDD from PySpark. I have found a Scala example: http://stackoverflow.com/questions/25189527/how-to-process-a-range-of-hbase-rows-using-spark But I can't find anything on how to do this from Python. Can anybody shed some light on how (and if) this can be done? Regards Rene Castberg
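Until that PR lands, the scan-range route from the Stack Overflow link looks roughly like this in Scala (table name and row-key bounds illustrative; the TableInputFormat constants come from the HBase MapReduce artifact):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat

    val hconf = HBaseConfiguration.create()
    hconf.set(TableInputFormat.INPUT_TABLE, "my_table")
    hconf.set(TableInputFormat.SCAN_ROW_START, "row-000")  // scan a range instead of the full table
    hconf.set(TableInputFormat.SCAN_ROW_STOP,  "row-999")

    val hbaseRdd = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])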
Re: How many stages in my application?
And the Job page of the web UI will give you an idea of stages completed out of the total number of stages for the job. That same information is also available as JSON. Statically determining how many stages a job logically comprises is one thing, but dynamically determining how many stages remain to be run to complete a job is a surprisingly tricky problem -- take a look at the discussion that went into Josh's Job page PR to get an idea of the issues and subtleties involved: https://github.com/apache/spark/pull/3009 On Thu, Feb 5, 2015 at 1:27 AM, Mark Hamstra m...@clearstorydata.com wrote: RDD#toDebugString will help. On Thu, Feb 5, 2015 at 1:15 AM, Joe Wass jw...@crossref.org wrote: Thanks Akhil and Mark. I can of course count events (assuming I can deduce the shuffle boundaries), but like I said the program isn't simple and I'd have to do this manually every time I change the code. So I rather find a way of doing this automatically if possible. On 4 February 2015 at 19:41, Mark Hamstra m...@clearstorydata.com wrote: But there isn't a 1-1 mapping from operations to stages since multiple operations will be pipelined into a single stage if no shuffle is required. To determine the number of stages in a job you really need to be looking for shuffle boundaries. On Wed, Feb 4, 2015 at 11:27 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can easily understand the flow by looking at the number of operations in your program (like map, groupBy, join etc.), first of all you list out the number of operations happening in your application and then from the webui you will be able to see how many operations have happened so far. Thanks Best Regards On Wed, Feb 4, 2015 at 4:33 PM, Joe Wass jw...@crossref.org wrote: I'm sitting here looking at my application crunching gigabytes of data on a cluster and I have no idea if it's an hour away from completion or a minute. The web UI shows progress through each stage, but not how many stages remaining. How can I work out how many stages my program will take automatically? My application has a slightly interesting DAG (re-use of functions that contain Spark transformations, persistent RDDs). Not that complex, but not 'step 1, step 2, step 3'. I'm guessing that if the driver program runs sequentially sending messages to Spark, then Spark has no knowledge of the structure of the driver program. Therefore it's necessary to execute it on a small test dataset and see how many stages result? When I set spark.eventLog.enabled = true and run on (very small) test data I don't get any stage messages in my STDOUT or in the log file. This is on a `local` instance. Did I miss something obvious? Thanks! Joe
Re: Spark stalls or hangs: is this a clue? remote fetches seem to never return?
My apologies for following up my own post, but I thought this might be of interest. I terminated the java process corresponding to the executor which had opened the stderr file mentioned below (kill pid). Then my spark job completed without error (it was actually almost finished). Now I am completely confused :-). Thanks! -Mike From: Michael Albert m_albert...@yahoo.com.INVALID To: user@spark.apache.org Sent: Thursday, February 5, 2015 9:04 PM Subject: Spark stalls or hangs: is this a clue? remote fetches seem to never return? Greetings! Again, thanks to all who have given suggestions. I am still trying to diagnose a problem where I have processes that run for one or several hours but intermittently stall or hang. By "stall" I mean that there is no CPU usage on the workers or the driver, nor network activity, nor do I see disk activity. It just hangs. Using the Application Master to find which workers still had active tasks, I then went to that machine and looked in the user logs. In one of the user logs' stderr files, it ends with "Started 50 remote fetches". Should there be a message saying that the fetch was completed? Any suggestions as to how I might diagnose why the fetch was not completed? Thanks! -Mike Here is the last part of the log:
15/02/06 01:33:46 INFO storage.MemoryStore: ensureFreeSpace(5368) called with curMem=875861, maxMem=2315649024
15/02/06 01:33:46 INFO storage.MemoryStore: Block broadcast_10 stored as values in memory (estimated size 5.2 KB, free 2.2 GB)
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 5, fetching them
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-171-0-208.ec2.internal:44124/user/MapOutputTracker#-878402310]
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 5, fetching them
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Got the output locations
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Started 50 remote fetches in 47 ms
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Started 50 remote fetches in 48 ms
It's been like that for half an hour. Thanks! -Mike
spark on ec2
Hi, I'm trying to change a setting as described here: http://spark.apache.org/docs/1.2.0/ec2-scripts.html export SPARK_WORKER_CORES=6 Then I ran ~/spark-ec2/copy-dir /root/spark/conf to distribute it to the slaves, but without any effect. Do I have to restart the workers? How do I do that with spark-ec2? Thanks.
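Most likely yes: standalone workers read SPARK_WORKER_CORES only at startup, so after copy-dir the cluster needs a restart. A hedged sketch of the usual sequence on the master node (script locations as typically laid out by spark-ec2; verify the paths on your AMI):

    /root/spark/sbin/stop-all.sh
    /root/spark/sbin/start-all.sh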
Re: Problems with GC and time to execute with different number of executors.
Any idea why if I use more containers I get a lot of stopped because GC? 2015-02-05 8:59 GMT+01:00 Guillermo Ortiz konstt2...@gmail.com: I'm not caching the data. with each iteration I mean,, each 128mb that a executor has to process. The code is pretty simple. final Conversor c = new Conversor(null, null, null, longFields,typeFields); SparkConf conf = new SparkConf().setAppName(Simple Application); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDDbyte[] rdd = sc.binaryRecords(path, c.calculaLongBlock()); JavaRDDString rddString = rdd.map(new Functionbyte[], String() { @Override public String call(byte[] arg0) throws Exception { String result = c.parse(arg0).toString(); return result; } }); rddString.saveAsTextFile(url + /output/ + System.currentTimeMillis()+ /); The parse function just takes an array of bytes and applies some transformations like,,, [0..3] an integer, [4...20] an String, [21..27] another String and so on. It's just a test code, I'd like to understand what it's happeing. 2015-02-04 18:57 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com: Hi Guillermo, What exactly do you mean by each iteration? Are you caching data in memory? -Sandy On Wed, Feb 4, 2015 at 5:02 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I execute a job in Spark where I'm processing a file of 80Gb in HDFS. I have 5 slaves: (32cores /256Gb / 7physical disks) x 5 I have been trying many different configurations with YARN. yarn.nodemanager.resource.memory-mb 196Gb yarn.nodemanager.resource.cpu-vcores 24 I have tried to execute the job with different number of executors a memory (1-4g) With 20 executors takes 25s each iteration (128mb) and it never has a really long time waiting because GC. When I execute around 60 executors the process time it's about 45s and some tasks take until one minute because GC. I have no idea why it's calling GC when I execute more executors simultaneously. The another question it's why it takes more time to execute each block. My theory about the this it's because there're only 7 physical disks and it's not the same 5 processes writing than 20. The code is pretty simple, it's just a map function which parse a line and write the output in HDFS. There're a lot of substrings inside of the function what it could cause GC. Any theory about? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to design a long live spark application
Hi, You can also check out the Spark Kernel project: https://github.com/ibm-et/spark-kernel It can plug into the upcoming IPython 3.0 notebook (providing a Scala/Spark language interface) and provides an API to submit code snippets (like the Spark Shell) and get results directly back, rather than having to write out your results elsewhere. A client library ( https://github.com/ibm-et/spark-kernel/wiki/Guide-for-the-Spark-Kernel-Client) is available in Scala so you can create applications that interactively communicate with Apache Spark. You can find a getting started section here: https://github.com/ibm-et/spark-kernel/wiki/Getting-Started-with-the-Spark-Kernel If you have any more questions about the project, feel free to email me! Signed, Chip Senkbeil On Thu Feb 05 2015 at 10:58:01 AM Corey Nolet cjno...@gmail.com wrote: Here's another lightweight example of running a SparkContext in a common java servlet container: https://github.com/calrissian/spark-jetty-server On Thu, Feb 5, 2015 at 11:46 AM, Charles Feduke charles.fed...@gmail.com wrote: If you want to design something like the Spark shell, have a look at: http://zeppelin-project.org/ It's open source and may already do what you need. If not, its source code will be helpful in answering the questions you have about how to integrate with long-running jobs. On Thu Feb 05 2015 at 11:42:56 AM Boromir Widas vcsub...@gmail.com wrote: You can check out https://github.com/spark-jobserver/spark-jobserver - this allows several users to upload their jars and run jobs with a REST interface. However, if all users are using the same functionality, you can write a simple spray server which will act as the driver and host the Spark context + RDDs, launched in client mode. On Thu, Feb 5, 2015 at 10:25 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I want to develop a server-side application: the user submits a request -> the server runs a Spark application and returns (this might take a few seconds). So I want the server to keep a long-lived context; I don't know whether this is reasonable or not. Basically I try to have a global JavaSparkContext instance and keep it there, and initialize some RDDs. Then my java application will use it to submit jobs. So now I have some questions: 1. If I don't close it, will there be any timeout I need to configure on the Spark server? 2. In theory I want to design something similar to the Spark shell (which also hosts a default sc), just not shell based. Any suggestions? I think my request is very common for application development; someone must have done it before? Regards, Shawn
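A minimal sketch of the "global context" idea (framework-free; paths and names illustrative), assuming a single JVM hosts both the request handler and the Spark driver:

    import org.apache.spark.{SparkConf, SparkContext}

    object SparkHolder {
      // one driver-side context for the life of the server process
      lazy val sc = new SparkContext(new SparkConf().setAppName("LongLivedServer"))
      lazy val lookupRdd = sc.textFile("hdfs:///data/reference").cache()
    }

    // called per incoming request; concurrent jobs on a shared context are fine
    def handleRequest(key: String): Long =
      SparkHolder.lookupRdd.filter(_.contains(key)).count()

As far as I know there is no idle timeout on the context itself, though a resource manager (e.g. a YARN queue policy) may impose its own limits.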
get null potiner exception newAPIHadoopRDD.map()
I modified the code based on CassandraCQLTest to get the area-code count per time zone. I got an error when creating the new mapped RDD. Any help is appreciated. Thanks. ... val arecodeRdd = sc.newAPIHadoopRDD(job.getConfiguration(), classOf[CqlPagingInputFormat], classOf[java.util.Map[String,ByteBuffer]], classOf[java.util.Map[String,ByteBuffer]]) println("Count: " + arecodeRdd.count) //got right count // arecodeRdd.saveAsTextFile("/tmp/arecodeRddrdd.txt"); val areaCodeSelectedRDD = arecodeRdd.map { case (key, value) => { * (ByteBufferUtil.string(value.get("area_code")), ByteBufferUtil.string(value.get("time_zone"))) * //failed } } println("areaCodeRDD: " + areaCodeSelectedRDD.count) ... Here is the stack trace: 15/02/05 13:38:15 ERROR executor.Executor: Exception in task 109.0 in stage 1.0 (TID 366) java.lang.NullPointerException at org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:167) at org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:124) at org.apache.spark.examples.CassandraAreaCodeLocation$$anonfun$1.apply(CassandraAreaCodeLocation.scala:68) at org.apache.spark.examples.CassandraAreaCodeLocation$$anonfun$1.apply(CassandraAreaCodeLocation.scala:66) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1311) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:910) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:910) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/02/05 13:38:15 INFO scheduler.TaskSetManager: Starting task 110.0 in stage 1.0 (TID 367, localhost, ANY, 1334 bytes) 15/02/05 13:38:15 INFO executor.Executor: Running task 110.0 in stage 1.0 (TID 367) 15/02/05 13:38:15 INFO rdd.NewHadoopRDD: Input split: ColumnFamilySplit((-8484684946848467066, '-8334833978340269788] @[127.0.0.1]) 15/02/05 13:38:15 WARN scheduler.TaskSetManager: Lost task 109.0 in stage 1.0 (TID 366, localhost): java.lang.NullPointerException at org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:167) at org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:124) at org.apache.spark.examples.CassandraAreaCodeLocation$$anonfun$1.apply(CassandraAreaCodeLocation.scala:68) at org.apache.spark.examples.CassandraAreaCodeLocation$$anonfun$1.apply(CassandraAreaCodeLocation.scala:66) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1311) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:910) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:910) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/get-null-potiner-exception-newAPIHadoopRDD-map-tp21520.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
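A hedged null-safe variant of that map, in case some rows are missing one of the columns (column names as above):

    val areaCodeSelectedRDD = arecodeRdd.flatMap { case (_, value) =>
      for {
        ac <- Option(value.get("area_code"))  // drop rows where either column is absent
        tz <- Option(value.get("time_zone"))
      } yield (ByteBufferUtil.string(ac), ByteBufferUtil.string(tz))
    }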
Shuffle Dependency Casting error
Hi, I am working on a text mining project and I want to use the NaiveBayes classifier of MLlib to classify some stream items. So I have two Spark contexts, one of which is a streaming context. Everything looks fine if I comment out the train and predict methods; it then works, although it obviously doesn't do what I want. The exception (and its trace) I am getting is below. Any ideas? Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: org.apache.spark.SparkContext$$anonfun$runJob$4 cannot be cast to org.apache.spark.ShuffleDependency at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:60) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-Dependency-Casting-error-tp21518.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
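One pattern worth checking, since two live contexts in one JVM is a known source of odd failures in this era of Spark: build the StreamingContext from the existing SparkContext rather than creating a second, independent context. A hedged sketch (featurize is a hypothetical featurization function; trainingData is assumed to be an RDD[LabeledPoint] on the same context; batch interval illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.mllib.classification.NaiveBayes

    val sc  = new SparkContext(new SparkConf().setAppName("TextMining"))
    val ssc = new StreamingContext(sc, Seconds(5))  // reuse sc; don't create a second context

    val model = NaiveBayes.train(trainingData)
    stream.foreachRDD { rdd =>
      val predictions = rdd.map(item => model.predict(featurize(item)))  // featurize: hypothetical
      predictions.take(10).foreach(println)
    }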
Re: Shuffle Dependency Casting error
Hi, could you share the code snippet? Thanks, Vishnu

On Thu, Feb 5, 2015 at 11:22 PM, aanilpala aanilp...@gmail.com wrote: [quotes the original message above in full]
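One thing worth checking once the code is shared (purely an assumption on my part, not something confirmed in this thread): Spark 1.x supports only one SparkContext per JVM, and creating a StreamingContext independently of an existing SparkContext can leave the two contexts interfering with each other's tasks. A minimal sketch of sharing a single context between MLlib and streaming; the app name is illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// A single SparkContext for the whole application; MLlib training and
// prediction run against sc directly.
val conf = new SparkConf().setAppName("TextMiningApp")
val sc = new SparkContext(conf)

// Build the streaming context on top of the same SparkContext rather than
// constructing a second, independent context in the same JVM.
val ssc = new StreamingContext(sc, Seconds(10))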
Re: get null pointer exception newAPIHadoopRDD.map()
Is it possible that value.get("area_code") or value.get("time_zone") returned null?

On Thu, Feb 5, 2015 at 10:58 AM, oxpeople vincent.y@bankofamerica.com wrote: I modified the code based on CassandraCQLTest to get the area-code count by time zone. I got an error on creating the new mapped RDD. Any help is appreciated. Thanks.

...
val arecodeRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
  classOf[CqlPagingInputFormat],
  classOf[java.util.Map[String, ByteBuffer]],
  classOf[java.util.Map[String, ByteBuffer]])
println("Count: " + arecodeRdd.count)  // prints the right count
// arecodeRdd.saveAsTextFile("/tmp/arecodeRddrdd.txt")
val areaCodeSelectedRDD = arecodeRdd.map { case (key, value) =>
  (ByteBufferUtil.string(value.get("area_code")),
   ByteBufferUtil.string(value.get("time_zone")))  // fails here
}
println("areaCodeRDD: " + areaCodeSelectedRDD.count)
...

Here is the stack trace:

15/02/05 13:38:15 ERROR executor.Executor: Exception in task 109.0 in stage 1.0 (TID 366)
java.lang.NullPointerException
    at org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:167)
    at org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:124)
    at org.apache.spark.examples.CassandraAreaCodeLocation$$anonfun$1.apply(CassandraAreaCodeLocation.scala:68)
    at org.apache.spark.examples.CassandraAreaCodeLocation$$anonfun$1.apply(CassandraAreaCodeLocation.scala:66)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1311)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:910)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:910)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/02/05 13:38:15 INFO scheduler.TaskSetManager: Starting task 110.0 in stage 1.0 (TID 367, localhost, ANY, 1334 bytes)
15/02/05 13:38:15 INFO executor.Executor: Running task 110.0 in stage 1.0 (TID 367)
15/02/05 13:38:15 INFO rdd.NewHadoopRDD: Input split: ColumnFamilySplit((-8484684946848467066, '-8334833978340269788] @[127.0.0.1])
15/02/05 13:38:15 WARN scheduler.TaskSetManager: Lost task 109.0 in stage 1.0 (TID 366, localhost): java.lang.NullPointerException (same stack trace as above)
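If those lookups can indeed return null for rows that lack the column, a minimal null-safe variant of the map, assuming the variable names from the code above, would drop such rows instead of killing the task:

import org.apache.cassandra.utils.ByteBufferUtil

// value is a java.util.Map[String, ByteBuffer]; get returns null for a
// missing column, and Option(null) collapses to None, so incomplete rows
// are filtered out by flatMap rather than throwing.
val areaCodeSelectedRDD = arecodeRdd.flatMap { case (_, value) =>
  for {
    areaCode <- Option(value.get("area_code"))
    timeZone <- Option(value.get("time_zone"))
  } yield (ByteBufferUtil.string(areaCode), ByteBufferUtil.string(timeZone))
}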
Re: Spark Job running on localhost on yarn cluster
Kundan, so I think your configuration here is incorrect; we need to adjust the memory and the number of executors. For your case you have a cluster of 5 nodes, each with 16 GB RAM and 8 cores.

The number of executors should be the total number of nodes in your cluster, in your case 5. As for executor cores, it should be the total cores on the machine minus 1 for the AM, so for you --executor-cores=7.

On to memory. When configuring memory you need to account for the memory overhead that Spark adds; the default is 7% of executor memory. If YARN has a max of 14 GB per NodeManager and you set executor-memory to 14 GB, Spark is actually requesting 1.07 * 14 GB = 14.98 GB. You should double-check your configuration, but if all your YARN containers have a max of 14 GB, then no executors should be launching at all, since Spark can't get the resources it's asking for. Maybe you have 3 NodeManagers configured with more memory? For your setup the memory calculation is: executorMemoryGB * 1.07 = 14 GB, so executorMemoryGB = 14 GB / 1.07 ~ 13 GB. Your command args should be something like:

--master yarn-cluster --num-executors 5 --executor-cores 7 --executor-memory 13g

As for the UI, where did you see 7.2 GB? Can you send a screenshot? Hope this helps, Kostas

On Thursday, February 5, 2015, kundan kumar iitr.kun...@gmail.com wrote: The problem got resolved after removing all the configuration files from the slave nodes. Earlier we were running in standalone mode, which had led to duplicated configuration on all the slaves. Once that was cleaned up, it ran as expected in cluster mode, although performance is not up to standalone mode; compared to standalone, Spark on YARN runs very slowly. I am running it as:

$SPARK_HOME/bin/spark-submit --class EDDApp --master yarn-cluster --num-executors 10 --executor-memory 14g target/scala-2.10/edd-application_2.10-1.0.jar hdfs://hm41:9000/user/hduser/newtrans.csv hdfs://hm41:9000/user/hduser/trans-out

We have a cluster of 5 nodes, each with 16 GB RAM and 8 cores. We have configured the minimum container size as 3 GB and the maximum as 14 GB in yarn-site.xml. When submitting the job to yarn-cluster we supply number of executors = 10 and executor memory = 14 GB. By my understanding our job should be allocated 4 containers of 14 GB, but the Spark UI shows only 3 containers of 7.2 GB each. We are unable to control the number of containers or the resources allocated to them, which hurts performance compared to standalone mode. Regards, Kundan

On Thu, Feb 5, 2015 at 12:49 PM, Felix C felixcheun...@hotmail.com wrote: Is YARN_CONF_DIR set?

--- Original Message --- From: Aniket Bhatnagar aniket.bhatna...@gmail.com Sent: February 4, 2015 6:16 AM To: kundan kumar iitr.kun...@gmail.com, spark users user@spark.apache.org Subject: Re: Spark Job running on localhost on yarn cluster

Have you set the master in SparkConf/SparkContext in your code? The driver logs show in which mode the Spark job is running; double-check whether they mention local or yarn-cluster. Also, what is the error you are getting?

On Wed, Feb 4, 2015, 6:13 PM kundan kumar iitr.kun...@gmail.com wrote: Hi, I am trying to execute my code on a YARN cluster. The command I am using is:

$SPARK_HOME/bin/spark-submit --class EDDApp target/scala-2.10/edd-application_2.10-1.0.jar --master yarn-cluster --num-executors 3 --driver-memory 6g --executor-memory 7g outpuPath

But I can see that this program is running only on localhost. It is able to read the file from HDFS.
I have tried this in standalone mode and it works fine. Please suggest where it is going wrong. Regards, Kundan
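To make the overhead arithmetic in Kostas's reply concrete, here is a small sketch; the 7% figure is the default overhead fraction Spark 1.x applies on YARN (tunable via spark.yarn.executor.memoryOverhead):

// Largest --executor-memory that still fits in a YARN container, given the
// container limit and the memory-overhead fraction Spark adds on top.
val containerLimitGB = 14.0   // max container size configured in yarn-site.xml
val overheadFraction = 0.07   // default: 7% of executor memory
val executorMemoryGB = containerLimitGB / (1 + overheadFraction)
println(f"--executor-memory ${executorMemoryGB}%.0fg")  // prints 13g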
Re: LeaseExpiredException while writing schemardd to hdfs
Why don't you just map the RDD's rows to lines and then call saveAsTextFile()?

On 3.2.2015. 11:15, Hafiz Mujadid wrote: I want to write a whole SchemaRDD to a single file in HDFS, but I am facing the following exception:

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /test/data/data1.csv (inode 402042): File does not exist. Holder DFSClient_NONMAPREDUCE_-564238432_57 does not have any open files

Here is my code:

rdd.foreachPartition { iterator =>
  val output = new Path(outputpath)
  val fs = FileSystem.get(new Configuration())
  val writer = new BufferedWriter(new OutputStreamWriter(fs.create(output)))
  val line = new StringBuilder
  iterator.foreach { row =>
    row.foreach { column =>
      line.append(column.toString + splitter)
    }
    writer.write(line.toString.dropRight(1))
    writer.newLine()
    line.clear
  }
  writer.close()
}

I think the problem is that I am creating a writer in every partition, and the writers run in parallel, so the problem appears when they all try to write to the same file. When I avoid this approach I get a Task not serializable exception instead. Any suggestion for handling this?
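A minimal sketch of that suggestion, reusing rdd, splitter, and outputpath from the code above (in Spark 1.x a Row behaves like a sequence of column values, so mkString should render one delimited line per row):

// Let Spark do the HDFS writes: one delimited line per row, one part-file
// per partition, and no hand-rolled writers competing for a single lease.
rdd.map(row => row.mkString(splitter)).saveAsTextFile(outputpath)

// If a single output file is a hard requirement, funnel the data through
// one partition first (fine for small results, a bottleneck for large ones):
rdd.coalesce(1).map(row => row.mkString(splitter)).saveAsTextFile(outputpath)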
MLlib - Show an element in RDD[(Int, Iterable[Array[Double]])]
Hi, I'm learning Spark and testing the Spark MLlib library with the K-means algorithm. So, I created a file height-weight.txt like this:

65.0 220.0
73.0 160.0
59.0 110.0
61.0 120.0
...

And the code (executed in spark-shell):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("/opt/testAppSpark/data/height-weight.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
val numCluster = 3
val numIterations = 30
val cluster = KMeans.train(parsedData, numCluster, numIterations)
val groups = data.map(_.split(' ').map(_.toDouble))
  .groupBy(point => cluster.predict(Vectors.dense(point)))
groups.collect

When I type groups.collect, I get output like:

res29: Array[(Int, Iterable[Array[Double]])] = Array((0,CompactBuffer([D@12c6123d, [D@9d76c6c, [D@1e0f2b80, [D@75f0efea, [D@1d172824, [D@5b4c6267, [D@73d08704)), (2,CompactBuffer([D@7f505302, [D@7279e99a, [D@21d7b82d, [D@597ca3b6, [D@5e02fa0)), (1,CompactBuffer([D@4156b463, [D@235cf118, [D@2ad870cb, [D@67d53566, [D@5ea4f0cb, [D@1ebccff8, [D@7df9b28b, [D@1439044a)))

Typing groups at the command line I see:

res1: org.apache.spark.rdd.RDD[(Int, Iterable[Array[Double]])] = ShuffledRDD[28] at groupBy at console:24

How can I see the results? Thanks.
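Those [D@... strings are the JVM's default toString for Array[Double]; the grouped points are all there and only need explicit formatting. A small sketch using the variables above:

// Collect the groups to the driver and print each cluster's points as
// readable (height, weight) pairs instead of array hash codes.
groups.collect().foreach { case (clusterId, points) =>
  val formatted = points.map(_.mkString("(", ", ", ")")).mkString(" ")
  println(s"cluster $clusterId: $formatted")
}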