RE: Unsupported language features in query
Currently SparkSQL doesn't support specifying a row format/SerDe in CTAS. The workaround is to create the table first.

-----Original Message-----
From: centerqi hu [mailto:cente...@gmail.com]
Sent: Tuesday, September 02, 2014 3:35 PM
To: user@spark.apache.org
Subject: Unsupported language features in query

hql("CREATE TABLE tmp_adclick_gm_all ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' AS SELECT SUM(uv) AS uv, round(SUM(cost),2) AS total, round(SUM(cost)/SUM(uv),2) FROM tmp_adclick_sellplat")

14/09/02 15:32:28 INFO ParseDriver: Parse Completed
java.lang.RuntimeException: Unsupported language features in query: CREATE TABLE tmp_adclick_gm_all ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY 'abc' AS SELECT SUM(uv) AS uv, round(SUM(cost),2) AS total, round(SUM(cost)/SUM(uv),2) FROM tmp_adclick_sellplat
TOK_CREATETABLE TOK_TABNAME tmp_adclick_gm_all TOK_LIKETABLE TOK_TABLEROWFORMAT TOK_SERDEPROPS TOK_TABLEROWFORMATFIELD ',' TOK_TABLEROWFORMATLINES 'abc' TOK_QUERY TOK_FROM TOK_TABREF TOK_TABNAME tmp_adclick_sellplat TOK_INSERT TOK_DESTINATION TOK_DIR TOK_TMP_FILE TOK_SELECT TOK_SELEXPR TOK_FUNCTION SUM TOK_TABLE_OR_COL uv uv TOK_SELEXPR TOK_FUNCTION round TOK_FUNCTION SUM TOK_TABLE_OR_COL cost 2 total TOK_SELEXPR TOK_FUNCTION round / TOK_FUNCTION SUM TOK_TABLE_OR_COL cost TOK_FUNCTION SUM TOK_TABLE_OR_COL uv 2
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:255)
at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:75)
at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:78)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:22)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC.<init>(<console>:37)
at $iwC.<init>(<console>:39)

-- cente...@gmail.com|齐忠
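For reference, a minimal sketch of that workaround against the same hql API (the column names and types uv BIGINT, total DOUBLE, avg_cost DOUBLE are my guesses, not from the thread):

// Declare the row format in a plain CREATE TABLE first...
hql("""CREATE TABLE tmp_adclick_gm_all (uv BIGINT, total DOUBLE, avg_cost DOUBLE)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'""")
// ...then populate it with a separate INSERT ... SELECT, avoiding the CTAS path entirely.
hql("""INSERT OVERWRITE TABLE tmp_adclick_gm_all
  SELECT SUM(uv), round(SUM(cost),2), round(SUM(cost)/SUM(uv),2)
  FROM tmp_adclick_sellplat""")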
Re: Unsupported language features in query
Thanks, Cheng Hao. Is there a way to obtain a list of the Hive statements Spark supports?

Thanks

2014-09-02 15:39 GMT+08:00 Cheng, Hao <hao.ch...@intel.com>:
Currently SparkSQL doesn't support specifying a row format/SerDe in CTAS. The workaround is to create the table first. [...]

-- cente...@gmail.com|齐忠
RE: Unsupported language features in query
I am afraid not, but you can report missing functionality in JIRA (https://issues.apache.org/jira/browse/SPARK) when you run into it in SparkSQL. SparkSQL aims to support all (or at least most) of Hive's functionality for the HQL dialect.

-----Original Message-----
From: centerqi hu [mailto:cente...@gmail.com]
Sent: Tuesday, September 02, 2014 3:46 PM
To: Cheng, Hao
Cc: user@spark.apache.org
Subject: Re: Unsupported language features in query

Thanks, Cheng Hao. Is there a way to obtain a list of the Hive statements Spark supports? [...]

-- cente...@gmail.com|齐忠
New features (Discretization) for v1.x in xiangrui.pdf
Is there any news about discretization in Spark? Is there anything on Git? I haven't found it yet.
save schemardd to hive
I want to save a SchemaRDD to Hive:

val usermeta = hql("SELECT userid, idlist FROM usermeta WHERE day='2014-08-01' LIMIT 1000")
case class SomeClass(name: String, idlist: String)
val schemardd = usermeta.map(t => SomeClass(t(0).toString, t(1).toString))

How do I save schemardd to Hive?

Thanks

-- cente...@gmail.com|齐忠
Re: New features (Discretization) for v1.x in xiangrui.pdf
I guess I found it: https://github.com/LIDIAgroup/SparkFeatureSelection
Re: save schemardd to hive
I got it:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._

val usermeta = hql("SELECT userid, idlist FROM meta WHERE day='2014-08-01' LIMIT 1000")
case class SomeClass(name: String, idlist: String)
val scmm = usermeta.map(t => SomeClass(t(0).toString, t(1).toString + "id"))
val good = createSchemaRDD(scmm)
good.saveAsTable("meta_test")

2014-09-02 17:50 GMT+08:00 centerqi hu <cente...@gmail.com>:
I want to save a SchemaRDD to Hive [...]

-- cente...@gmail.com|齐忠
Re: save schemardd to hive
You can use saveAsTable, or do a SparkSQL INSERT statement as well, in case you need other Hive query features, like partitioning.

On 9/2/14, 6:54 AM, centerqi hu <cente...@gmail.com> wrote:
I got it [...]
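A rough sketch of the INSERT route for a partitioned table (my own illustration, not from the thread; it assumes meta_test was created with a `day` partition column, and reuses the `scmm` RDD and HiveContext imports from the previous message):

scmm.registerAsTable("scmm_staging")  // hypothetical staging table name
hql("""INSERT OVERWRITE TABLE meta_test PARTITION (day='2014-08-01')
  SELECT name, idlist FROM scmm_staging""")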
Re: save schemardd to hive
Thank you very much. So it can also be done this way; it seems I know too little about RDDs.

On September 2, 2014, at 21:22, Silvio Fiorito <silvio.fior...@granturing.com> wrote:

Once you've registered an RDD as a table, you can use it in SparkSQL statements:

myrdd.registerAsTable("my_table")
hql("FROM my_table INSERT INTO TABLE my_other_table")

On 9/2/14, 9:18 AM, centerqi <cente...@gmail.com> wrote:
So a SchemaRDD can execute SQL statements?

On September 2, 2014, at 20:42, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
You can use saveAsTable, or do a SparkSQL INSERT statement as well [...]
Re: [Streaming] Cannot get executors to stay alive
Trying to bump this -- I'm basically asking if anyone has noticed the executor leaking memory. I have a large key space but no churn in the RDD, so I don't understand why memory consumption grows over time. Any experiences with streaming are welcome -- I'm hoping I'm doing something wrong.
Spark Java Configuration.
Team,

I am new to Apache Spark and don't have much knowledge of Hadoop or big data. I need clarification on the below. How does Spark configuration work? From a tutorial I got the following:

SparkConf conf = new SparkConf().setAppName("Simple application").setMaster("local[4]");
JavaSparkContext java_SC = new JavaSparkContext(conf);

From this, I understood that we are providing the config to Spark through a Java program. Let us assume I have written this in a separate Java method. My questions are: what happens if I keep calling this? If it keeps creating new Spark objects on each call, how are we going to handle the JVM memory, since under each object I am trying to run 4 concurrent threads? Is there any option to find an existing one in the JVM, so that instead of creating a new Spark object I can reuse it?

Please help me on this.
Re: Where to save intermediate results?
I don't have any personal experience with Spark Streaming. Whether you store your data in HDFS or a database or something else probably depends on the nature of your use case.

On Fri, Aug 29, 2014 at 10:38 AM, huylv <huy.le...@insight-centre.org> wrote:

Hi Daniel,

Your suggestion is definitely an interesting approach. In fact, I already have another system to deal with the stream analytical processing part. So basically, the Spark job that aggregates data just accumulatively computes aggregations over the historical data together with each new batch, which has already been partly summarized by the stream processor. Answering queries involves combining pre-calculated historical data with on-stream aggregations. This sounds much like what Spark Streaming is intended to do, so I'll take a deeper look into Spark Streaming and consider porting the stream processing part to it.

Regarding saving pre-calculated data to external storage (disk, database...), I'm looking at Cassandra for now. But I don't know how it fits into my context, or how its performance compares to saving files in HDFS. Also, is there any way to keep the pre-calculated data both on disk and in memory, so that when the batch job terminates, the historical data is still available in memory for combining with the stream processor, while still being able to survive system failures or upgrades? Not to mention that the pre-calculated data might get too big; in that case, keeping only the newest data in memory would be better. Tachyon looks like a nice option, but again, I don't have experience with it and it's still an experimental feature of Spark.

Regards,
Huy

--
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io
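On the "both on disk and in memory" question, a minimal sketch (mine, not from the thread; `precalculated` is a hypothetical RDD) of what the core API already offers:

import org.apache.spark.storage.StorageLevel

// Cached blocks spill to local disk under memory pressure, but do not survive
// a driver restart...
precalculated.persist(StorageLevel.MEMORY_AND_DISK)
// ...so a durable copy for recovery across restarts/upgrades still needs an
// explicit write, e.g. to HDFS.
precalculated.saveAsObjectFile("hdfs:///path/to/precalculated-snapshot")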
Using Spark's ActorSystem for performing analytics using Akka
Sorry about the noob question, but I was just wondering: if we use Spark's ActorSystem (SparkEnv.actorSystem), would it distribute actors across worker nodes, or would the actors only run in the driver JVM?
Re: pyspark yarn got exception
Hi Oleg,

If you are running Spark on a YARN cluster, you should set --master to yarn. By default this runs in client mode, which redirects all output of your application to your console. Your run is failing because it is trying to connect to a standalone master that you probably did not start. I am somewhat puzzled as to how you ran into an OOM from this configuration, however. Does this problem still occur if you set the correct master?

-Andrew

2014-09-02 2:42 GMT-07:00 Oleg Ruchovets <oruchov...@gmail.com>:

Hi,

I've installed PySpark on an HDP (Hortonworks) cluster. Executing the pi example:

Command:
spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563]# ./bin/spark-submit --master spark://10.193.1.71:7077 examples/src/main/python/pi.py 1000

Exception:
14/09/02 17:34:02 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/09/02 17:34:02 INFO SecurityManager: Changing view acls to: root
14/09/02 17:34:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/09/02 17:34:02 INFO Slf4jLogger: Slf4jLogger started
14/09/02 17:34:02 INFO Remoting: Starting remoting
14/09/02 17:34:03 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@hdop-m.agt:41059]
14/09/02 17:34:03 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@hdop-m.agt:41059]
14/09/02 17:34:03 INFO SparkEnv: Registering MapOutputTracker
14/09/02 17:34:03 INFO SparkEnv: Registering BlockManagerMaster
14/09/02 17:34:03 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140902173403-cda8
14/09/02 17:34:03 INFO MemoryStore: MemoryStore started with capacity 294.9 MB.
14/09/02 17:34:03 INFO ConnectionManager: Bound socket to port 34931 with id = ConnectionManagerId(HDOP-M.AGT,34931)
14/09/02 17:34:03 INFO BlockManagerMaster: Trying to register BlockManager
14/09/02 17:34:03 INFO BlockManagerInfo: Registering block manager HDOP-M.AGT:34931 with 294.9 MB RAM
14/09/02 17:34:03 INFO BlockManagerMaster: Registered BlockManager
14/09/02 17:34:03 INFO HttpServer: Starting HTTP Server
14/09/02 17:34:03 INFO HttpBroadcast: Broadcast server started at http://10.193.1.71:54341
14/09/02 17:34:03 INFO HttpFileServer: HTTP File server directory is /tmp/spark-77c7a7dc-181e-4069-a014-8103a6a6330a
14/09/02 17:34:03 INFO HttpServer: Starting HTTP Server
14/09/02 17:34:04 INFO SparkUI: Started SparkUI at http://HDOP-M.AGT:4040
14/09/02 17:34:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/02 17:34:04 INFO Utils: Copying /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/examples/src/main/python/pi.py to /tmp/spark-f2e0cc0f-59cb-4f6c-9d48-f16205a40c7e/pi.py
14/09/02 17:34:04 INFO SparkContext: Added file file:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/examples/src/main/python/pi.py at http://10.193.1.71:52938/files/pi.py with timestamp 1409650444941
14/09/02 17:34:05 INFO AppClient$ClientActor: Connecting to master spark://10.193.1.71:7077...
14/09/02 17:34:05 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.193.1.71:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@10.193.1.71:7077]
14/09/02 17:34:25 INFO AppClient$ClientActor: Connecting to master spark://10.193.1.71:7077...
14/09/02 17:34:25 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.193.1.71:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@10.193.1.71:7077]
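For reference, a corrected invocation along the lines Andrew suggests might look like the following (illustrative only; it assumes HADOOP_CONF_DIR/YARN_CONF_DIR already point at the cluster's Hadoop configuration):

./bin/spark-submit --master yarn examples/src/main/python/pi.py 1000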
Re: Spark-shell return results when the job is executing?
Spark-shell, or any other Spark application, does not return the full results of a job until it has finished executing. You could add a hook for it to write partial results to a file, but you may want to do so sparingly to incur fewer I/Os. If you have a large file and the result contains many lines, it is unlikely to fully fit in memory anyway, so it's probably not a bad idea to just write your results to a file in batches while the application is still running.

-Andrew

2014-09-01 22:16 GMT-07:00 Hao Wang <wh.s...@gmail.com>:

Hi all,

I am wondering, if I use spark-shell to scan a large file for lines containing "error", whether the shell returns results while the job is executing, or only after the job has completely finished.

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com
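A minimal sketch of that write-as-you-go approach (my own illustration; the paths and the "error" pattern are hypothetical):

// Executors write matching lines straight to HDFS as they scan their partitions,
// so nothing has to be collected to the driver in one piece.
val errors = sc.textFile("hdfs:///logs/big.log").filter(_.contains("error"))
errors.saveAsTextFile("hdfs:///out/error-lines")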
Re: Spark Java Configuration.
JavaSparkContext java_SC = new JavaSparkContext(conf); is the Spark context. An application has a single Spark context -- you won't be able to keep calling this; you'll see an error if you try to create a second such object from the same application.

Additionally, depending on your configuration, if you create a few different apps that each create a Spark context, you'll see them all connected to the master in the UI. But they'll have to share executors on the worker machines you have available. You'll often see messages like "No resources available" if you try to run more than one app concurrently and the first app you start is resource-greedy.

Hope this helps.

On Tue, Sep 2, 2014 at 10:02 AM, pcsenthil <pcsent...@gmail.com> wrote:
Team, I am new to Apache Spark and don't have much knowledge of Hadoop or big data. [...]
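To make the "create once, reuse everywhere" idea concrete, here is a sketch (mine, not from the thread) in Scala; in Java the same pattern is a lazily initialized static field guarding a single JavaSparkContext:

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextHolder {
  // Built at most once, on first access; every later caller gets the same instance.
  lazy val sc: SparkContext = {
    val conf = new SparkConf().setAppName("Simple application").setMaster("local[4]")
    new SparkContext(conf)
  }
}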
Spark on YARN question
I'm working on setting up Spark on YARN using the HDP technical preview: http://hortonworks.com/kb/spark-1-0-1-technical-preview-hdp-2-1-3/

I have installed the Spark JARs on all the slave nodes and configured YARN to find them. It seems like everything is working. Unless I'm misunderstanding, it seems like there isn't any configuration required on the YARN slave nodes at all, apart from telling YARN where to find the Spark JAR files. Do the YARN processes even pick up local Spark configuration files on the slave nodes, or is that all just pulled in on the client and passed along to YARN?

Greg
Re: Spark on YARN question
I've put my Spark JAR into HDFS and set the SPARK_JAR variable to point to its HDFS location. I'm not using any specialized configuration files (like spark-env.sh), but rather setting things either by environment variable per node, by passing application arguments to the job, or by making a Zookeeper connection from my job to seed properties. From there, I can construct a SparkConf as necessary.

mn

On Sep 2, 2014, at 9:06 AM, Greg Hill <greg.h...@rackspace.com> wrote:
I'm working on setting up Spark on YARN using the HDP technical preview [...]
Re: Spark on YARN question
Hi Greg,

You should not need to even manually install Spark on each of the worker nodes or put it into HDFS yourself. Spark on YARN will ship all necessary jars (i.e. the assembly + additional jars) to each of the containers for you. You can specify additional jars that your application depends on through the --jars argument if you are using spark-submit / spark-shell / pyspark.

As for environment variables, you can specify SPARK_YARN_USER_ENV on the driver node (where your application is submitted) to specify environment variables to be observed by your executors. If you are using the spark-submit / spark-shell / pyspark scripts, then you can set Spark properties in the conf/spark-defaults.conf properties file, and these will be propagated to the executors. In other words, configurations on the slave nodes don't do anything. For example:

$ vim conf/spark-defaults.conf // set a few properties
$ export SPARK_YARN_USER_ENV=YARN_LOCAL_DIR=/mnt,/mnt2
$ bin/spark-shell --master yarn --jars /local/path/to/my/jar1,/another/jar2

Best,
-Andrew
Re: Spark on YARN question
Thanks. That sounds like how I was thinking it worked. I did have to install the JARs on the slave nodes for yarn-cluster mode to work, FWIW. It's probably just whichever node ends up spawning the ApplicationMaster that needs them, but they weren't passed along from spark-submit.

Greg

From: Andrew Or <and...@databricks.com>
Date: Tuesday, September 2, 2014 11:05 AM
To: Matt Narrell <matt.narr...@gmail.com>
Cc: Greg <greg.h...@rackspace.com>, user@spark.apache.org
Subject: Re: Spark on YARN question

Hi Greg, You should not need to even manually install Spark on each of the worker nodes or put it into HDFS yourself. [...]
Spark on Mesos: Pyspark python libraries
Hi all,

I am getting started with Spark and Mesos. I already have Spark running on a Mesos cluster and I am able to start the Scala Spark and PySpark shells, yay! I still have questions about how to distribute 3rd party Python libraries, since I want to use things like NLTK and MLlib on PySpark, which requires NumPy. I am using Salt for configuration management, so it is really easy for me to create an Anaconda virtual environment and install all the libraries there on each Mesos slave.

My main question is whether that's the recommended way of handling 3rd party libraries. If the answer is yes, how do I tell PySpark to use that virtual environment (and not the default Python) on the Spark workers? I noticed there are addFile/addPyFile functions on the SparkContext, but I don't want to distribute the libraries every single time if I can just do it once by writing some Salt states. I am especially worried about NumPy and its requirements.

Hopefully this makes some sense. Thanks,
Daniel Rodriguez
Serialized 3rd party libs
Hello,

I'm using Spark Streaming to aggregate data from a Kafka topic in sliding windows. Usually we want to persist this aggregated data to a MongoDB cluster, or republish it to a different Kafka topic. When I include these 3rd party drivers, I usually get a NotSerializableException due to the parallelization of the job. To sidestep this, I've used static class variables, which seem to help, e.g., I can run my jobs.

Is this the proper way to provide 3rd party libs to Spark jobs? Does having these drivers declared as static prohibit me from parallelizing my job? Is this even a proper way to design jobs? An alternative (I assume) would be to aggregate my data into HDFS and have another process (perhaps non-Spark?) consume it and republish/persist.

Thanks,
Matt
Re: saveAsTextFile makes no progress without caching RDD
As an update: I'm still getting the same issue. I ended up doing a coalesce instead of a cache to get around the memory issue, but saveAsTextFile still won't proceed without the coalesce or cache first.
Re: Possible to make one executor be able to work on multiple tasks simultaneously?
+user@

An executor is specific to an application, but an application can be executing many jobs at once. So, as I understand it, many jobs' tasks can be executing at once on an executor. You may not use your full 80-way parallelism if, for example, your data set doesn't have 80 partitions. I also believe Spark will not necessarily spread the load over executors, instead preferring to respect data and rack locality if possible. Those are two reasons you might see only 4 executors active. If you mean only 4 executors exist at all, is it possible the other 4 can't provide the memory you're asking for?

On Tue, Sep 2, 2014 at 5:56 PM, Victor Tso-Guillen <v...@paxata.com> wrote:

Actually, one more question, since in preliminary runs I wasn't sure I understood what was going on. Are the cores allocated to an executor able to execute tasks for different jobs simultaneously, or just for one job at a time? I have 10 workers with 8 cores each, and it appeared that one job got four executors at once, then four more later on. The system wasn't anywhere near saturation of 80 cores, so I would've expected all 8 cores to be running simultaneously. If there's value to these questions, please reply back to the list.

On Tue, Sep 2, 2014 at 6:58 AM, Victor Tso-Guillen <v...@paxata.com> wrote:

Thank you for the help, guys. So as I expected, I didn't fully understand the options. I had SPARK_WORKER_CORES set to 1 because I did not realize that this setting determines whether an executor can operate on multiple tasks simultaneously. I just thought it was a hint to Spark that that executor could be expected to use that many threads, but I had not understood that it affected the scheduler that way. Thanks!

On Sun, Aug 31, 2014 at 9:28 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

Hey Victor,

As Sean said, executors actually execute multiple tasks at a time. The only reasons they wouldn't are either (1) if you launched an executor with just 1 core (you can configure how many cores the executors will use when you set up your Worker, or it will look at your system by default), or (2) if your tasks are acquiring some kind of global lock, so only one can run at a time.

To test this, do the following:
- Launch your standalone cluster (you can do it on just one machine by adding just "localhost" in the slaves file)
- Go to http://<host>:4040 and look at the worker list. Do you see workers with more than 1 core? If not, you need to launch the workers by hand or set SPARK_WORKER_CORES in conf/spark-env.sh.
- Run your application. Make sure it has enough pending tasks for your cores in the driver web UI (http://<host>:4040), and if so, jstack one of the CoarseGrainedExecutor processes on a worker to see what the threads are doing. (Look for threads that contain TaskRunner.run in them.)

You can also try a simple CPU-bound job that launches lots of tasks, like this, to see that all cores are being used:

sc.parallelize(1 to 1000, 1000).map(_ => (1 to 20).product).count()

Each task here takes 1-2 seconds to execute and there are 1000 of them, so it should fill up your cluster.

Matei

On August 31, 2014 at 9:18:02 PM, Victor Tso-Guillen (v...@paxata.com) wrote:

I'm pretty sure my terminology matches that doc, except the doc makes no explicit mention of machines. In standalone mode, you can spawn multiple workers on a single machine and each will babysit one executor (per application). In my observation as well, each executor can be assigned many tasks but operates on one at a time. If there's a way to have it execute multiple tasks simultaneously in a single VM, can you please show me how? Maybe I'm missing the requisite configuration options, no matter how common or trivial...

On Sunday, August 31, 2014, Sean Owen wrote:

The confusion may be your use of 'worker', which isn't matching what 'worker' means in Spark. Have a look at https://spark.apache.org/docs/latest/cluster-overview.html Of course one VM can run many tasks at once; that's already how Spark works.

On Sun, Aug 31, 2014 at 4:52 AM, Victor Tso-Guillen wrote:

I might not be making myself clear, so sorry about that. I understand that a machine can have as many Spark workers as you'd like, for example one per core. A worker may be assigned to a pool for one or more applications, but for a single application let's just say a single worker will have at most a single executor. An executor can be assigned multiple tasks in its queue, but will work on one task at a time only. In local mode, you can specify the number of executors you want and they will all reside in the same VM. Those executors will each be able to operate on a single task at a time, though they may also have an arbitrary number of tasks in their queue. From the standpoint of a VM, however, a VM can therefore operate on
Regarding function unpersist on rdd
Hello,

Can someone enlighten me on whether calling unpersist on an RDD is expensive? What is the best way to uncache a cached RDD?

Thanks,
Edwin
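No answer appears in this digest; for reference, a one-line sketch against the RDD API (`cachedRdd` is a hypothetical cached RDD, and the blocking flag is my reading of the API, so treat it as an assumption):

// unpersist drops the RDD's cached blocks; with blocking = false the removal
// happens asynchronously, so the call itself returns quickly.
cachedRdd.unpersist(blocking = false)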
Publishing a transformed DStream to Kafka
Hello all,

After applying several transformations to a DStream, I'd like to publish all the elements in all the resulting RDDs to Kafka. What would be the best way to do that? Just using DStream.foreach and then RDD.foreach? Is there any other built-in utility for this use case?

Thanks a lot,
Max

--
Massimiliano Tomassi
e-mail: max.toma...@gmail.com
Re: Publishing a transformed DStream to Kafka
I'd be interested in finding the answer too. Right now, I do:

val kafkaOutMsgs = kafkInMessages.map(x => myFunc(x._2, someParam))
kafkaOutMsgs.foreachRDD((rdd, time) => {
  rdd.foreach(rec => {
    writer.output(rec)
  })
})
// where writer.output is a method that takes a string, and writer is an instance of a producer class.

On Tue, Sep 2, 2014 at 10:12 AM, Massimiliano Tomassi <max.toma...@gmail.com> wrote:
Hello all, after applying several transformations to a DStream, I'd like to publish all the elements in all the resulting RDDs to Kafka. [...]
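A per-partition variant (my own sketch, not from the thread) avoids capturing a driver-side producer in the closure and opens one producer per partition instead:

kafkaOutMsgs.foreachRDD { rdd =>
  rdd.foreachPartition { recs =>
    // Construct the producer wrapper on the executor, inside the task, so it is
    // never serialized; makeWriter() is a hypothetical factory for the same
    // producer class mentioned above.
    val writer = makeWriter()
    recs.foreach(rec => writer.output(rec))
    // close/flush the underlying producer here if the wrapper exposes that
  }
}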
pyspark and cassandra
Hi all,

Is it possible to use Cassandra as input data for PySpark? I found an example for Java (http://java.dzone.com/articles/sparkcassandra-stack-perform?page=0,0) and I am looking for something similar for Python.

Thanks,
Oleg.
mllib performance on cluster
Hi,

I evaluated the runtime performance of some of the MLlib classification algorithms on a local machine and on a cluster with 10 nodes. I used standalone mode and Spark 1.0.1 in both cases. Here are the results for the total runtime:

                      Local     Cluster
Logistic regression   138 sec   336 sec
SVM                   138 sec   336 sec
Decision tree          50 sec   132 sec

My dataset is quite small and my programs are very similar to the MLlib examples included in the Spark distribution. Why is the runtime on the cluster significantly higher (almost 3 times) than on the local machine, even though the former uses more memory and more nodes? Is it because of the communication overhead on the cluster? I would like to know if there is something I need to do to optimize the performance on the cluster, or if others have been getting similar results.

thanks
Re: Serialized 3rd party libs
Sean,

Thanks for pointing this out. I'd have to experiment with the mapPartitions method, but you're right, this seems to address the issue directly. I'm also connecting to Zookeeper to retrieve SparkConf parameters. I run into the same issue with my Zookeeper driver; however, this is before any Spark contexts are created, and I assume before the job is partitioned.

mn

On Sep 2, 2014, at 11:00 AM, Sean Owen <so...@cloudera.com> wrote:

The problem is not using the drivers per se, but writing your functions in a way that tries to serialize them. You can't serialize them, and indeed don't want to. Instead, your code needs to reopen connections and so forth when the function is instantiated on the remote worker. Static variables are a crude way to do that, probably too crude in general.

No, there's certainly no reason you can't access these things in Spark. Since it answers exactly this point, I don't mind promoting today's blog post: http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/ ... which repeats Tobias's good formulation of how to deal with things like drivers in an efficient way that doesn't trip over serialization.

On Tue, Sep 2, 2014 at 5:45 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
Hello, I'm using Spark Streaming to aggregate data from a Kafka topic in sliding windows. [...]
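The pattern Sean describes looks roughly like this (a sketch under assumptions, not a definitive implementation; SomeDbClient is a hypothetical stand-in for whichever non-serializable driver is in use):

rdd.foreachPartition { records =>
  // The client is constructed on the executor, inside the task, so it is
  // never part of the serialized closure.
  val client = new SomeDbClient()
  records.foreach(rec => client.write(rec))
  client.close()
}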
Re: mllib performance on cluster
How many iterations are you running? Can you provide the exact details about the size of the dataset (how many data points, how many features)? Is this sparse or dense, and in the sparse case, how many non-zeroes? How many partitions does your data RDD have?

For very small datasets, the scheduling overheads of shipping tasks across the cluster and delays due to stragglers can dominate the time actually spent doing your parallel computation. If you have too few partitions, you won't be taking advantage of cluster parallelism, and if you have too many, you're introducing even more of the aforementioned overheads.

On Tue, Sep 2, 2014 at 11:24 AM, SK <skrishna...@gmail.com> wrote:
Hi, I evaluated the runtime performance of some of the MLlib classification algorithms on a local machine and a cluster with 10 nodes. [...]
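For what it's worth, a generic illustration (not from the thread) of pinning the partition count explicitly so it can be tuned against cluster size; the path and the count of 64 are placeholder values:

// Ask for an explicit number of input partitions up front, or rebalance afterwards;
// either way controls how many tasks each stage runs.
val raw = sc.textFile("hdfs:///data/train.csv", 64)
val rebalanced = raw.repartition(64)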
Re: Spark on Mesos: Pyspark python libraries
PYSPARK_PYTHON may work for you; it's used to specify which Python interpreter should be used in both the driver and the workers. For example, if Anaconda was installed as /anaconda on all the machines, then you can specify PYSPARK_PYTHON=/anaconda/bin/python to use the Anaconda virtual environment in PySpark:

PYSPARK_PYTHON=/anaconda/bin/python spark-submit <your-app>.py

Or if you want to use it by default, you can put this environment variable somewhere persistent:

export PYSPARK_PYTHON=/anaconda/bin/python

On Tue, Sep 2, 2014 at 9:31 AM, Daniel Rodriguez <df.rodriguez...@gmail.com> wrote:
Hi all, I am getting started with Spark and Mesos. [...]
Re: pyspark and cassandra
In Spark 1.1, it is possible to read from Cassandra using Hadoop jobs. See examples/src/main/python/cassandra_inputformat.py for an example. You may need to write your own key/value converters.

On Tue, Sep 2, 2014 at 11:10 AM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
Hi all, Is it possible to use Cassandra as input data for PySpark? [...]
MLLib decision tree: Weights
Hi everyone,

We are looking to apply a weight to each training example; this weight should be used when computing the penalty of a misclassified example. For instance, without weighting, each example is penalized 1 point when evaluating the model of a classifier, such as a decision tree. We would like to customize this penalty for each training example, such that we could apply a penalty of W for a misclassified example, where W is a weight associated with the given training example.

Is this something that is supported directly in MLlib? I would appreciate it if someone could point me in the right direction.
Re: spark-ec2 [Errno 110] Connection time out
Make sure your key pair is configured to access whatever region you're deploying to. It defaults to us-east-1, but you can provide a custom one with the --region parameter.

On Sat, Aug 30, 2014 at 12:53 AM, David Matheson <david.j.mathe...@gmail.com> wrote:

I'm following the latest documentation on configuring a cluster on EC2 (http://spark.apache.org/docs/latest/ec2-scripts.html). Running

./spark-ec2 -k Blah -i .ssh/Blah.pem -s 2 launch spark-ec2-test

gets a generic timeout error that's coming from:

File "./spark_ec2.py", line 717, in real_main
  conn = ec2.connect_to_region(opts.region)

Any suggestions on how to debug the cause of the timeout? Note: I replaced the name of my keypair with Blah.

Thanks,
David
Re: mllib performance on cluster
Num iterations: for LR and SVM, I am using the default value of 100. For all the other parameters I am also using the default values; I am pretty much reusing the code from BinaryClassification.scala. For decision tree, I don't see any parameter for number of iterations in the example code, so I did not specify any. I am running each algorithm on my dataset 100 times and taking the average runtime.

My dataset is very dense (hardly any zeros). The labels are 1 and 0. I did not explicitly specify the number of partitions; I did not see any code for this in the MLlib examples for BinaryClassification and DecisionTree.

Hardware:
Local: Intel Core i7 with 12 cores and 7.8 GB, of which I am allocating 4 GB for executor memory. According to the application detail stats in the Spark UI, the total memory consumed is around 1.5 GB.
Cluster: 10 nodes with a total of 320 cores, with 16 GB per node. According to the application detail stats in the Spark UI, the total memory consumed is around 95.5 GB.
Creating an RDD in another RDD causes deadlock
My code seemed to deadlock when I tried to do this:

object MoreRdd extends Serializable {
  def apply(i: Int) = {
    val rdd2 = sc.parallelize(0 to 10)
    rdd2.map(j => i*10 + j).collect
  }
}

val rdd1 = sc.parallelize(0 to 10)
val y = rdd1.map(i => MoreRdd(i)).collect
y.toString()

It never reached the last line. The code seemed to deadlock somewhere, since my CPU load was quite low. Is there a restriction against creating an RDD while another one is still active? Is it because one worker can only handle one task? How do I work around this?
Re: Creating an RDD in another RDD causes deadlock
Yes, you can't use RDDs inside RDDs. But of course you can do this:

val nums = (0 to 10)
val y = nums.map(i => MoreRdd(i)).collect

On Tue, Sep 2, 2014 at 10:14 PM, cjwang <c...@cjwang.us> wrote:
My code seemed to deadlock when I tried to do this [...]
Re: Creating an RDD in another RDD causes deadlock
I didn't know this restriction. Thank you.
I am looking for a Java sample of a Partitioner
Assume, say, JavaWordCount. I call the equivalent of a mapper:

JavaPairRDD<String, Integer> ones = words.mapToPair(...

Now, right here I want to guarantee that each word starting with a particular letter is processed in a specific partition. (Don't tell me this is a dumb idea -- I know that, but in Hadoop code a custom partitioner is often important, and I don't want to explain the real case.) I have no idea how one would implement mapToPartition, but I want to emulate Hadoop with a custom partition and key sort order:

JavaPairRDD<String, Integer> counts = ones.reduceByKey(...
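No reply appears in this digest, but for reference, a sketch of a custom Spark Partitioner. The question asks for Java; this is written in Scala to match the rest of the digest's examples (a Java version subclasses org.apache.spark.Partitioner the same way), and `ones` here stands for the equivalent Scala pair RDD:

import org.apache.spark.Partitioner

class FirstLetterPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = {
    // Every word sharing a first letter lands in the same partition.
    val first = key.toString.headOption.getOrElse(' ')
    math.abs(first.toInt) % parts
  }
}

// reduceByKey accepts the partitioner directly, much like Hadoop's partition + reduce:
val counts = ones.reduceByKey(new FirstLetterPartitioner(26), _ + _)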
Spark Streaming - how to implement multiple calculation using the same data set
Hi,

I am planning to consume an incoming DStream and calculate different measures from the same stream. I was able to calculate the individual measures separately, and now I have to merge them, but Spark Streaming doesn't support outer join yet:

handlingtimePerWorker: List(workerId, handlingTime)
fileProcessedCountPerWorker: (workerId, filesProcessedCount)

Is there a design pattern that allows using each RDD in the DStream to calculate the measures per worker and save the attributes in the same object (Worker)?
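No reply appears in this digest; as a sketch (my own, assuming both streams are pair DStreams keyed by workerId and that every worker appears in both, so the unsupported outer join is not needed), the plain inner join lands both measures in one record per worker:

// join pairs up the two per-worker measures within each batch interval.
val merged = handlingtimePerWorker.join(fileProcessedCountPerWorker)
merged.foreachRDD { rdd =>
  rdd.foreach { case (workerId, (handlingTime, filesProcessed)) =>
    // build or update the Worker object here
  }
}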
Re: mllib performance on cluster
Those are interesting numbers. You haven't mentioned the dataset size in your thread. This is a classic question of scalability versus performance, assuming your baseline numbers are correct and you tuned everything on your cluster correctly. Putting on my outside cap, there are multiple possible reasons; we need to look at all these parameters:

1. This could be an algorithm cost when we move to a cluster.
2. This could be a scalability cost.
3. The cluster is not tuned well.
4. Indeed, there is a problem/performance regression in the framework.

On Tue, Sep 2, 2014 at 1:12 PM, SK <skrishna...@gmail.com> wrote:
Num iterations: for LR and SVM, I am using the default value of 100. [...]
Re: Spark on YARN question
Hello friends:

I have a follow-up to Andrew's well-articulated answer below (thank you for that).

(1) I've seen both of these invocations in various places:

(a) '--master yarn'
(b) '--master yarn-client'

the latter of which doesn't appear in 'pyspark|spark-submit|spark-shell --help' output. Is case (a) meant for cluster-mode apps (where the driver is out on a YARN ApplicationMaster), and case (b) for client-mode apps needing client interaction locally? Also (related), is case (b) simply shorthand for the following invocation syntax? '--master yarn --deploy-mode client'

(2) Seeking clarification on the first sentence below...

Note: To avoid a copy of the assembly JAR every time I launch a job, I place it (the latest version) at a specific (but otherwise arbitrary) location on HDFS, and then set SPARK_JAR, like so (where you can thankfully use wild-cards):

export SPARK_JAR=hdfs://namenode:8020/path/to/spark-assembly-*.jar

But my question here is, when specifying additional JARs like this '--jars /path/to/jar1,/path/to/jar2,...' to pyspark|spark-submit|spark-shell commands, are those JARs expected to *already* be at those path locations on both the _submitter_ server, as well as on the YARN _worker_ servers? In other words, the '--jars' option won't cause the command to look for them locally at those path locations and then ship them to the same path-locations remotely? They need to be there already, both locally and remotely. Correct?

Thank you. :)

didata

On 09/02/2014 12:05 PM, Andrew Or wrote:
Hi Greg, You should not need to even manually install Spark on each of the worker nodes or put it into HDFS yourself. [...]
Re: flattening a list in spark sql
Check out LATERAL VIEW explode: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView On Tue, Sep 2, 2014 at 1:26 PM, gtinside gtins...@gmail.com wrote: Hi , I am using jsonRDD in spark sql and having trouble iterating through array inside the json object. Please refer to the schema below : -- Preferences: struct (nullable = true) ||-- destinations: array (nullable = true) |-- user: string (nullable = true) Sample Data: -- Preferences: struct (nullable = true) ||-- destinations: (Paris,NYC,LA,EWR) |-- user: test1 -- Preferences: struct (nullable = true) ||-- destinations: (Paris,SFO) |-- user: test2 My requirement is to run query for displaying number of user per destination as follows : Number of users:10, Destination:Paris Number of users:20, Destination:NYC Number of users:30, Destination:SFO To achieve the above mentioned result, I need to flatten out the destinations array, but I am not sure how to do it. Can you please help ? Gaurav -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/flattening-a-list-in-spark-sql-tp13300.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark on YARN question
Hi Didata, (1) Correct. The default deploy mode is `client`, so both masters `yarn` and `yarn-client` run Spark in client mode. If you explicitly specify master as `yarn-cluster`, Spark will run in cluster mode. If you implicitly specify one deploy mode through the master (e.g. yarn-client) but set deploy mode to the opposite (e.g. cluster), Spark will complain and throw an exception. :) (2) The jars passed through the `--jars` option only need to be visible to the spark-submit program. Depending on the deploy mode, they will be propagated to the containers (i.e. the executors, and the driver in cluster mode) differently so you don't need to manually copy them yourself, either through rsync'ing or uploading to HDFS. Another thing is that SPARK_JAR is technically deprecated (you should get a warning for using it). Instead, you can set spark.yarn.jar in your conf/spark-defaults.conf on the submitter node. Let me know if you have more questions, -Andrew 2014-09-02 15:12 GMT-07:00 Dimension Data, LLC. subscripti...@didata.us: Hello friends: I have a follow-up to Andrew's well articulated answer below (thank you for that). (1) I've seen both of these invocations in various places: (a) '--master yarn' (b) '--master yarn-client' the latter of which doesn't appear in '*pyspark**|**spark-submit|spark-shell --help*' output. Is case (a) meant for cluster-mode apps (where the driver is out on a YARN ApplicationMaster, and case (b) for client-mode apps needing client interaction locally? Also (related), is case (b) simply shorthand for the following invocation syntax? '--master yarn --deploy-mode client' (2) Seeking clarification on the first sentence below... *Note: To avoid a copy of the Assembly JAR every time I launch a job, I place it (the lat**est* *version) at a specific (but otherwise arbitrary) location on HDFS, and then set SPARK_JAR, like so (**where you can thankfully use wild-cards**)**:* * export SPARK_JAR=hdfs://namenode:8020/**path/to* */spark-assembly-*.jar* But my question here is, when specifying additional JARS like this '--jars /path/to/jar1,/path/to/jar2,...' to *pyspark|spark-submit|spark-shell* commands, are those JARS expected to *already* be at those path locations on both the _submitter_ server, as well as on YARN _worker_ servers? In other words, the '--jars' option won't cause the command to look for them locally at those path locations, and then ship place them to the same path-locations remotely? They need to be there already, both locally and remotely. Correct? Thank you. :) didata On 09/02/2014 12:05 PM, Andrew Or wrote: Hi Greg, You should not need to even manually install Spark on each of the worker nodes or put it into HDFS yourself. Spark on Yarn will ship all necessary jars (i.e. the assembly + additional jars) to each of the containers for you. You can specify additional jars that your application depends on through the --jars argument if you are using spark-submit / spark-shell / pyspark. As for environment variables, you can specify SPARK_YARN_USER_ENV on the driver node (where your application is submitted) to specify environment variables to be observed by your executors. If you are using the spark-submit / spark-shell / pyspark scripts, then you can set Spark properties in the conf/spark-defaults.conf properties file, and these will be propagated to the executors. In other words, configurations on the slave nodes don't do anything. 
For example, $ vim conf/spark-defaults.conf // set a few properties $ export SPARK_YARN_USER_ENV=YARN_LOCAL_DIR=/mnt,/mnt2 $ bin/spark-shell --master yarn --jars /local/path/to/my/jar1,/another/jar2 Best, -Andrew
Re: mllib performance on cluster
The dataset is quite small : 5.6 KB. It has 200 rows and 3 features, and 1 column of labels. From this dataset, I split 80% for training set and 20% for test set. The features are integer counts and labels are binary (1/0). thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mllib-performance-on-cluster-tp13290p13311.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: [Streaming] Akka-based receiver with messages defined in uploaded jar
It works with spark.executor.extraClassPath – no exceptions in this case and I’m getting expected results. But to me it limits/complicates usage Akka based receivers a lot. Do you think it should be considered as a bug? Even if it’s not, can it be fixed/worked around by some classloading magic at either Spark or application code? From: Tathagata Das [mailto:tathagata.das1...@gmail.com] Sent: Friday, August 29, 2014 7:21 PM To: Anton Brazhnyk Cc: user@spark.apache.org Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded jar Can you try adding the JAR to the class path of the executors directly, by setting the config spark.executor.extraClassPath in the SparkConf. See Configuration page - http://spark.apache.org/docs/latest/configuration.html#runtime-environment I think what you guessed is correct. The Akka actor system is not aware of the classes that are dynamically added when the custom jar is added with setJar. TD On Fri, Aug 29, 2014 at 6:44 PM, Anton Brazhnyk anton.brazh...@genesys.commailto:anton.brazh...@genesys.com wrote: Just checked it with 1.0.2 Still same exception. From: Anton Brazhnyk [mailto:anton.brazh...@genesys.commailto:anton.brazh...@genesys.com] Sent: Wednesday, August 27, 2014 6:46 PM To: Tathagata Das Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: RE: [Streaming] Akka-based receiver with messages defined in uploaded jar Sorry for the delay with answer – was on vacation. As I said I was using modified version of launcher from the example. Modification is just about setting spark master URL in the code to not use run-example script. The launcher itself was in the attached zip (attaching it once more) as ActorWordCount object. From: Tathagata Das [mailto:tathagata.das1...@gmail.com] Sent: Tuesday, August 05, 2014 11:32 PM To: Anton Brazhnyk Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded jar How are you launching/submitting the program? Using spark-submit? Or some other script (can you provide that)? TD On Tue, Aug 5, 2014 at 6:52 PM, Anton Brazhnyk anton.brazh...@genesys.commailto:anton.brazh...@genesys.com wrote: Went through it once again to leave the only modification in question. Still same exception. I hope sources as zip file (instead of github) still can be tolerated. 
:) Here is the stacktrace generated with this sources: 14/08/05 18:45:54 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1407289554800 14/08/05 18:45:54 ERROR Remoting: org.apache.spark.examples.streaming.CustomMessage java.lang.ClassNotFoundException: org.apache.spark.examples.streaming.CustomMessage at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623) at akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136) at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) at scala.util.Try$.apply(Try.scala:161) at akka.serialization.Serialization.deserialize(Serialization.scala:98) at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23) at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55) at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55) at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73) at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at
Re: mllib performance on cluster
Hmm... something is fishy here. That's a *really* small dataset for a spark job, so almost all your time will be spent in these overheads, but still you should be able to train a logistic regression model with the default options and 100 iterations in 1s on a single machine. Are you caching your dataset before training the classifier on it? It's possible that you're rereading it from disk (or across the internet, maybe) on every iteration? From spark-shell: import org.apache.spark.mllib.util.LogisticRegressionDataGenerator val dat = LogisticRegressionDataGenerator.generateLogisticRDD(sc, 200, 3, 1e-4, 4, 0.2).cache() println(dat.count()) //should give 200 import org.apache.spark.mllib.classification.LogisticRegressionWithSGD val start = System.currentTimeMillis; val model = LogisticRegressionWithSGD.train(dat, 100); val delta = System.currentTimeMillis - start; println(delta) //On my laptop, 863ms. On Tue, Sep 2, 2014 at 3:51 PM, SK skrishna...@gmail.com wrote: The dataset is quite small : 5.6 KB. It has 200 rows and 3 features, and 1 column of labels. From this dataset, I split 80% for training set and 20% for test set. The features are integer counts and labels are binary (1/0). thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mllib-performance-on-cluster-tp13290p13311.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
I am looking for a Java sample of a Partitioner
Assume say JavaWord count I call the equivalent of a Mapper JavaPairRDDString, Integer ones = words.mapToPair(,,, Now right here I want to guarantee that each word starting with a particular letter is processed in a specific partition - (Don't tell me this is a dumb idea - I know that but in a Hadoop code a custom partitioner is often important and I don't want to explain the real case) I have no idea how ones would implement such partitioning here or what code would look like assuming mapToPartition was used JavaPairRDDString, Integer counts = ones.reduceByKey(...
Re: [Streaming] Akka-based receiver with messages defined in uploaded jar
I am not sure if there is a quick fix for this as the actor is started in the same actorSystem as the Spark's actor system. And since that actor system is started as soon as the executor is launched, even before the application code is launched, there isnt much classloader magic that can be done. However, I think a solution could be creating a new actorSystem for this purpose. It will get created when the receiver is started, and stopped when the receiver exits. That should not be a big change, though handling the corner cases of shutting and starting actor systems (as receiver's get relaunched) configuring that actor system (as it is different from the Spark's actor system) needs to be handled carefully. Its best to consider it as a bug. Could you make a JIRA for it? And may be fix it as well ;) ? TD On Tue, Sep 2, 2014 at 3:54 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: It works with spark.executor.extraClassPath – no exceptions in this case and I’m getting expected results. But to me it limits/complicates usage Akka based receivers a lot. Do you think it should be considered as a bug? Even if it’s not, can it be fixed/worked around by some classloading magic at either Spark or application code? *From:* Tathagata Das [mailto:tathagata.das1...@gmail.com] *Sent:* Friday, August 29, 2014 7:21 PM *To:* Anton Brazhnyk *Cc:* user@spark.apache.org *Subject:* Re: [Streaming] Akka-based receiver with messages defined in uploaded jar Can you try adding the JAR to the class path of the executors directly, by setting the config spark.executor.extraClassPath in the SparkConf. See Configuration page - http://spark.apache.org/docs/latest/configuration.html#runtime-environment I think what you guessed is correct. The Akka actor system is not aware of the classes that are dynamically added when the custom jar is added with setJar. TD On Fri, Aug 29, 2014 at 6:44 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: Just checked it with 1.0.2 Still same exception. *From:* Anton Brazhnyk [mailto:anton.brazh...@genesys.com] *Sent:* Wednesday, August 27, 2014 6:46 PM *To:* Tathagata Das *Cc:* user@spark.apache.org *Subject:* RE: [Streaming] Akka-based receiver with messages defined in uploaded jar Sorry for the delay with answer – was on vacation. As I said I was using modified version of launcher from the example. Modification is just about setting spark master URL in the code to not use run-example script. The launcher itself was in the attached zip (attaching it once more) as ActorWordCount object. *From:* Tathagata Das [mailto:tathagata.das1...@gmail.com tathagata.das1...@gmail.com] *Sent:* Tuesday, August 05, 2014 11:32 PM *To:* Anton Brazhnyk *Cc:* user@spark.apache.org *Subject:* Re: [Streaming] Akka-based receiver with messages defined in uploaded jar How are you launching/submitting the program? Using spark-submit? Or some other script (can you provide that)? TD On Tue, Aug 5, 2014 at 6:52 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: Went through it once again to leave the only modification in question. Still same exception. I hope sources as zip file (instead of github) still can be tolerated. 
:) Here is the stacktrace generated with this sources: 14/08/05 18:45:54 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1407289554800 14/08/05 18:45:54 ERROR Remoting: org.apache.spark.examples.streaming.CustomMessage java.lang.ClassNotFoundException: org.apache.spark.examples.streaming.CustomMessage at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623) at akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at
Re: [PySpark] large # of partitions causes OOM
On 08/29/2014 06:05 PM, Nick Chammas wrote: Here’s a repro for PySpark: |a = sc.parallelize([Nick,John,Bob]) a = a.repartition(24000) a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1) | When I try this on an EC2 cluster with 1.1.0-rc2 and Python 2.7, this is what I get: |a = sc.parallelize([Nick,John,Bob]) a = a.repartition(24000) a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1) 14/08/29 21:53:40 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, ip-10-138-29-167.ec2.internal,46252,0)with no recent heart beats:175143ms exceeds45000ms 14/08/29 21:53:50 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(10, ip-10-138-18-106.ec2.internal,33711,0)with no recent heart beats:175359ms exceeds45000ms 14/08/29 21:54:02 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(19, ip-10-139-36-207.ec2.internal,52208,0)with no recent heart beats:173061ms exceeds45000ms 14/08/29 21:54:13 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(5, ip-10-73-142-70.ec2.internal,56162,0)with no recent heart beats:176816ms exceeds45000ms 14/08/29 21:54:22 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(7, ip-10-236-145-200.ec2.internal,40959,0)with no recent heart beats:182241ms exceeds45000ms 14/08/29 21:54:40 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(4, ip-10-139-1-195.ec2.internal,49221,0)with no recent heart beats:178406ms exceeds45000ms 14/08/29 21:54:41 ERROR Utils: Uncaught exceptionin thread Result resolver thread-3 java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) atorg.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162) atorg.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79) atorg.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514) atorg.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355) atorg.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68) atorg.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47) atorg.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47) atorg.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311) atorg.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Exceptionin threadResult resolver thread-3 14/08/29 21:56:26 ERROR SendingConnection: Exceptionwhile reading SendingConnection to ConnectionManagerId(ip-10-73-142-223.ec2.internal,54014) java.nio.channels.ClosedChannelException at 
sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295) atorg.apache.spark.network.SendingConnection.read(Connection.scala:390) atorg.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
Re: Possible to make one executor be able to work on multiple tasks simultaneously?
I'm pretty sure the issue was an interaction with another subsystem. Thanks for your patience with me! On Tue, Sep 2, 2014 at 10:05 AM, Sean Owen so...@cloudera.com wrote: +user@ An executor is specific to an application, but an application can be executing many jobs at once. So as I understand many jobs' tasks can be executing at once on an executor. You may not use your full 80-way parallelism if, for example, your data set doesn't have 80 partitions. I also believe Spark will not necessarily spread the load over executors, instead preferring to respect data and rack locality if possible. Those are two reasons you might see only 4 executors active. If you mean only 4 executors exist at all, is it possible the other 4 can't provide the memory you're asking for? On Tue, Sep 2, 2014 at 5:56 PM, Victor Tso-Guillen v...@paxata.com wrote: Actually one more question, since in preliminary runs I wasn't sure if I understood what's going on. Are the cores allocated to an executor able to execute tasks for different jobs simultaneously, or just for one job at a time? I have 10 workers with 8 cores each, and it appeared that one job got four executors at once, then four more later on. The system wasn't anywhere near saturation of 80 cores so I would've expected all 8 cores to be running simultaneously. If there's value to these questions, please reply back to the list. On Tue, Sep 2, 2014 at 6:58 AM, Victor Tso-Guillen v...@paxata.com wrote: Thank you for the help, guys. So as I expected, I didn't fully understand the options. I had SPARK_WORKER_CORES set to 1 because I did not realize that by setting to 1 it would mean an executor could operate on multiple tasks simultaneously. I just thought it was a hint to Spark that that executor could be expected to use that many threads, but otherwise I had not understood that it affected the scheduler that way. Thanks! On Sun, Aug 31, 2014 at 9:28 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Victor, As Sean said, executors actually execute multiple tasks at a time. The only reasons they wouldn't are either (1) if you launched an executor with just 1 core (you can configure how many cores the executors will use when you set up your Worker, or it will look at your system by default) or (2) if your tasks are acquiring some kind of global lock, so only one can run at a time. To test this, do the following: - Launch your standalone cluster (you can do it on just one machine by adding just localhost in the slaves file) - Go to http://:4040 and look at the worker list. Do you see workers with more than 1 core? If not, you need to launch the workers by hand or set SPARK_WORKER_CORES in conf/spark-env.sh. - Run your application. Make sure it has enough pending tasks for your cores in the driver web UI (http://:4040), and if so, jstack one of the CoarseGrainedExecutor processes on a worker to see what the threads are doing. (Look for threads that contain TaskRunner.run in them) You can also try a simple CPU-bound job that launches lots of tasks like this to see that all cores are being used: sc.parallelize(1 to 1000, 1000).map(_ = (1 to 20).product).count() Each task here takes 1-2 seconds to execute and there are 1000 of them so it should fill up your cluster. Matei On August 31, 2014 at 9:18:02 PM, Victor Tso-Guillen (v...@paxata.com(mailto:v...@paxata.com)) wrote: I'm pretty sure my terminology matches that doc except the doc makes no explicit mention of machines. 
In standalone mode, you can spawn multiple workers on a single machine and each will babysit one executor (per application). In my observation as well each executor can be assigned many tasks but operates on one at a time. If there's a way to have it execute in multiple tasks simultaneously in a single VM can you please show me how? Maybe I'm missing the requisite configuration options, no matter how common or trivial... On Sunday, August 31, 2014, Sean Owen wrote: The confusion may be your use of 'worker', which isn't matching what 'worker' means in Spark. Have a look at https://spark.apache.org/docs/latest/cluster-overview.html Of course one VM can run many tasks at once; that's already how Spark works. On Sun, Aug 31, 2014 at 4:52 AM, Victor Tso-Guillen wrote: I might not be making myself clear, so sorry about that. I understand that a machine can have as many spark workers as you'd like, for example one per core. A worker may be assigned to a pool for one or more applications, but for a single application let's just say a single worker will have at most a single executor. An executor can be assigned multiple tasks in its queue, but will work on one task at a time only. In local mode, you can specify the number of
Re: zip equal-length but unequally-partition
On 09/01/2014 11:39 PM, Kevin Jung wrote: http://www.adamcrume.com/blog/archive/2014/02/19/fixing-sparks-rdd-zip http://www.adamcrume.com/blog/archive/2014/02/19/fixing-sparks-rdd-zip Please check this url . I got same problem in v1.0.1 In some cases, RDD losts several elements after zip so that a total count of ZippedRDD is less than source RDD. will 1.1 version of Spark fix it? will you file a jira for the issue? if you do it can be verified and tracked as fixed / not fixed in future releases. best, matt - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark-shell return results when the job is executing?
what do you think about using a streamrdd in this case? assuming streaming is available for pyspark, and you can collect based on # events best, matt On 09/02/2014 10:38 AM, Andrew Or wrote: Spark-shell, or any other Spark application, returns the full results of the job until it has finished executing. You could add a hook for it to write partial results to a file, but you may want to do so sparingly to incur fewer I/Os. If you have a large file and the result contains many lines, it is unlikely to fully fit in memory anyway, so it's probably not a bad idea to just write your results to a file in batches while the application is still running. -Andrew 2014-09-01 22:16 GMT-07:00 Hao Wang wh.s...@gmail.com mailto:wh.s...@gmail.com: Hi, all I am wondering if I use Spark-shell to scan a large file to obtain lines containing error, whether the shell returns results while the job is executing, or the job has been totally finished. Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address:800 Dongchuan Road, Minhang District, Shanghai, 200240 Email:wh.s...@gmail.com mailto:wh.s...@gmail.com - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: flattening a list in spark sql
Thanks . I am not using hive context . I am loading data from Cassandra and then converting it into json and then querying it through SQL context . Can I use use hive context to query on a jsonRDD ? Gaurav -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/flattening-a-list-in-spark-sql-tp13300p13320.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming - how to implement multiple calculation using the same data set
Hi, On Wed, Sep 3, 2014 at 6:54 AM, salemi alireza.sal...@udo.edu wrote: I was able to calculate the individual measures separately and know I have to merge them and spark streaming doesn't support outer join yet. Can't you assign some dummy key (e.g., index) before your processing and then join on that key using a function from http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions ? Tobias
What is the appropriate privileges needed for writting files into checkpoint directory?
I tried to run KafkaWordCount in a Spark standalone cluster. In this application, the checkpoint directory was set as follows : val sparkConf = new SparkConf().setAppName(KafkaWordCount) val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint(checkpoint) After submitting my application into the cluster, I could see the correct counting results on the console, but the running application kept complaining the following: 14/09/03 10:01:22 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException java.io.FileNotFoundException: /usr/games/SparkStreaming/checkpoint/a03505c8-0183-4bc0-b674-bf0e16767564/rdd-96/.part-0-attempt-171 (Permission denied) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.init(FileOutputStream.java:194) at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.init(RawLocalFileSystem.java:206) at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.init(RawLocalFileSystem.java:202) at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:265) at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252) at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.init(ChecksumFileSystem.java:384) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:443) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:849) at org.apache.spark.rdd.CheckpointRDD$.writeToFile(CheckpointRDD.scala:103) at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96) at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:396) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) at java.lang.Thread.run(Thread.java:662) On the node where I submitted the applicaition, the checkpoint directory( /usr/games/SparkStreaming/checkpoint) was created and some files was created there, but there existed no such directory on other nodes of the Spark cluster. I guess that was because processes on other nodes of the cluster didn't have appropriate privileges to create the checkpoint directory. So I created that directory on each node manually and changed its mode to 777, which means any user can write to that directory. But the SparkStreaming application still kept throwing that exception. So what is the real reason? Thanks.
Re: Spark Streaming - how to implement multiple calculation using the same data set
Tobias, That was what I was planing to do and technical lead is the opinion that we should some how process a message only once and calculate all the measures for the worker. I was wondering if there is a solution out there for that? Thanks, Ali Hi, On Wed, Sep 3, 2014 at 6:54 AM, salemi alireza.sal...@udo.edu wrote: I was able to calculate the individual measures separately and know I have to merge them and spark streaming doesn't support outer join yet. Can't you assign some dummy key (e.g., index) before your processing and then join on that key using a function from http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions ? Tobias - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: flattening a list in spark sql
Yes you can. HiveContext's functionality is a strict superset of SQLContext. On Tue, Sep 2, 2014 at 6:35 PM, gtinside gtins...@gmail.com wrote: Thanks . I am not using hive context . I am loading data from Cassandra and then converting it into json and then querying it through SQL context . Can I use use hive context to query on a jsonRDD ? Gaurav -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/flattening-a-list-in-spark-sql-tp13300p13320.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: pyspark yarn got exception
Hi Andrew. what should I do to set master on yarn, can you please pointing me on command or documentation how to do it? I am doing the following: executed start-all.sh [root@HDOP-B sbin]# ./start-all.sh starting org.apache.spark.deploy.master.Master, logging to /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-HDOP-B.AGT.out localhost: Warning: Permanently added 'localhost' (RSA) to the list of known hosts. localhost: starting org.apache.spark.deploy.worker.Worker, logging to /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/sbin/../logs/spark-root-org.apache.spark.deploy.worker.Worker-1-HDOP-B.AGT.out after execute the command: ./bin/spark-submit --master spark://HDOP-B.AGT:7077 examples/src/main/python/pi.py 1000 the result is the following: /usr/jdk64/jdk1.7.0_45/bin/java ::/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/conf:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m 14/09/03 12:10:06 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/09/03 12:10:06 INFO SecurityManager: Changing view acls to: root 14/09/03 12:10:06 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root) 14/09/03 12:10:07 INFO Slf4jLogger: Slf4jLogger started 14/09/03 12:10:07 INFO Remoting: Starting remoting 14/09/03 12:10:07 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@hdop-b.agt:38944] 14/09/03 12:10:07 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@hdop-b.agt:38944] 14/09/03 12:10:07 INFO SparkEnv: Registering MapOutputTracker 14/09/03 12:10:07 INFO SparkEnv: Registering BlockManagerMaster 14/09/03 12:10:08 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140903121008-cf09 14/09/03 12:10:08 INFO MemoryStore: MemoryStore started with capacity 294.9 MB. 14/09/03 12:10:08 INFO ConnectionManager: Bound socket to port 45041 with id = ConnectionManagerId(HDOP-B.AGT,45041) 14/09/03 12:10:08 INFO BlockManagerMaster: Trying to register BlockManager 14/09/03 12:10:08 INFO BlockManagerInfo: Registering block manager HDOP-B.AGT:45041 with 294.9 MB RAM 14/09/03 12:10:08 INFO BlockManagerMaster: Registered BlockManager 14/09/03 12:10:08 INFO HttpServer: Starting HTTP Server 14/09/03 12:10:08 INFO HttpBroadcast: Broadcast server started at http://10.193.1.76:59336 14/09/03 12:10:08 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7bf5c3c3-1c02-41e8-9fb0-983e175dd45c 14/09/03 12:10:08 INFO HttpServer: Starting HTTP Server 14/09/03 12:10:08 INFO SparkUI: Started SparkUI at http://HDOP-B.AGT:4040 14/09/03 12:10:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/09/03 12:10:09 INFO Utils: Copying /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/examples/src/main/python/pi.py to /tmp/spark-4e252376-70cb-4171-bf2c-d804524e816c/pi.py 14/09/03 12:10:09 INFO SparkContext: Added file file:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/examples/src/main/python/pi.py at http://10.193.1.76:45893/files/pi.py with timestamp 1409717409277 14/09/03 12:10:09 INFO AppClient$ClientActor: Connecting to master spark://HDOP-B.AGT:7077... 
14/09/03 12:10:09 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140903121009- 14/09/03 12:10:09 INFO AppClient$ClientActor: Executor added: app-20140903121009-/0 on worker-20140903120712-HDOP-B.AGT-51161 (HDOP-B.AGT:51161) with 8 cores 14/09/03 12:10:09 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140903121009-/0 on hostPort HDOP-B.AGT:51161 with 8 cores, 512.0 MB RAM 14/09/03 12:10:09 INFO AppClient$ClientActor: Executor updated: app-20140903121009-/0 is now RUNNING 14/09/03 12:10:12 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@hdop-b.agt:38143/user/Executor#1295757828] with ID 0 14/09/03 12:10:12 INFO BlockManagerInfo: Registering block manager HDOP-B.AGT:38670 with 294.9 MB RAM Traceback (most recent call last): File /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/examples/src/main/python/pi.py, line 38, in module count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add) File /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/python/pyspark/context.py, line 271, in parallelize jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices) File /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py, line 537, in __call__ File /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling
Number of elements in ArrayBuffer
Hi, I have the following ArrayBuffer: *ArrayBuffer(5,3,1,4)* Now, I want to get the number of elements in this ArrayBuffer and also the first element of the ArrayBuffer. I used .length and .size but they are returning 1 instead of 4. I also used .head and .last for getting the first and the last element but they also return the entire ArrayBuffer (ArrayBuffer(5,3,1,4)) What I understand from this is that, the entire ArrayBuffer is stored as one element. How should I go about doing the required things? Thank You
Re: pyspark yarn got exception
Hi , I change my command to : ./bin/spark-submit --master spark://HDOP-B.AGT:7077 --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 examples/src/main/python/pi.py 1000 and it fixed the problem. I still have couple of questions: PROCESS_LOCAL is not Yarn execution , right? how should I configure the running on yarn? Should I exeture start-all script on all machine or only one? Where is the UI / LOGS of spark execution? 152152SUCCESSPROCESS_LOCALHDOP-B.AGT2014/09/03 12:35:140.2 s00SUCCESS PROCESS_LOCALHDOP-B.AGT2014/09/03 12:35:090.9 s39 ms22SUCCESSPROCESS_LOCAL HDOP-B.AGT2014/09/03 12:35:090.9 s39 ms33SUCCESSPROCESS_LOCALHDOP-B.AGT2014/09/03 12:35:090.9 s39 ms1 ms44SUCCESSPROCESS_LOCALHDOP-B.AGT2014/09/03 12:35:090.8 s39 ms2 ms55SUCCESSPROCESS_LOCALHDOP-B.AGT2014/09/03 12:35:090.8 s39 ms1 ms6 6SUCCESSPROCESS_LOCALHDOP-B.AGT2014/09/03 12:35:090.8 s1 ms77SUCCESS PROCESS_LOCALHDOP-B.AGT2014/09/03 12:35:090.9 s88SUCCESSPROCESS_LOCAL HDOP-B.AGT2014/09/03 12:35:100.3 s99SUCCESSPROCESS_LOCALHDOP-B.AGT2014/09/03 12:35:100.4 s1010SUCCESSPROCESS_LOCALHDOP-B.AGT2014/09/03 12:35:100.3 s1 ms SUCCESSPROCESS_LOCALHDOP-B.AGT2014/09/03 12:35:100.3 s On Wed, Sep 3, 2014 at 12:19 PM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi Andrew. what should I do to set master on yarn, can you please pointing me on command or documentation how to do it? I am doing the following: executed start-all.sh [root@HDOP-B sbin]# ./start-all.sh starting org.apache.spark.deploy.master.Master, logging to /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-HDOP-B.AGT.out localhost: Warning: Permanently added 'localhost' (RSA) to the list of known hosts. localhost: starting org.apache.spark.deploy.worker.Worker, logging to /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/sbin/../logs/spark-root-org.apache.spark.deploy.worker.Worker-1-HDOP-B.AGT.out after execute the command: ./bin/spark-submit --master spark://HDOP-B.AGT:7077 examples/src/main/python/pi.py 1000 the result is the following: /usr/jdk64/jdk1.7.0_45/bin/java ::/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/conf:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m 14/09/03 12:10:06 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/09/03 12:10:06 INFO SecurityManager: Changing view acls to: root 14/09/03 12:10:06 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root) 14/09/03 12:10:07 INFO Slf4jLogger: Slf4jLogger started 14/09/03 12:10:07 INFO Remoting: Starting remoting 14/09/03 12:10:07 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@hdop-b.agt:38944] 14/09/03 12:10:07 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@hdop-b.agt:38944] 14/09/03 12:10:07 INFO SparkEnv: Registering MapOutputTracker 14/09/03 12:10:07 INFO SparkEnv: Registering BlockManagerMaster 14/09/03 12:10:08 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140903121008-cf09 14/09/03 12:10:08 INFO MemoryStore: MemoryStore started with capacity 294.9 MB. 
14/09/03 12:10:08 INFO ConnectionManager: Bound socket to port 45041 with id = ConnectionManagerId(HDOP-B.AGT,45041) 14/09/03 12:10:08 INFO BlockManagerMaster: Trying to register BlockManager 14/09/03 12:10:08 INFO BlockManagerInfo: Registering block manager HDOP-B.AGT:45041 with 294.9 MB RAM 14/09/03 12:10:08 INFO BlockManagerMaster: Registered BlockManager 14/09/03 12:10:08 INFO HttpServer: Starting HTTP Server 14/09/03 12:10:08 INFO HttpBroadcast: Broadcast server started at http://10.193.1.76:59336 14/09/03 12:10:08 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7bf5c3c3-1c02-41e8-9fb0-983e175dd45c 14/09/03 12:10:08 INFO HttpServer: Starting HTTP Server 14/09/03 12:10:08 INFO SparkUI: Started SparkUI at http://HDOP-B.AGT:4040 14/09/03 12:10:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/09/03 12:10:09 INFO Utils: Copying /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/examples/src/main/python/pi.py to /tmp/spark-4e252376-70cb-4171-bf2c-d804524e816c/pi.py 14/09/03 12:10:09 INFO SparkContext: Added file file:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/examples/src/main/python/pi.py at http://10.193.1.76:45893/files/pi.py with timestamp 1409717409277 14/09/03 12:10:09 INFO AppClient$ClientActor: Connecting to master spark://HDOP-B.AGT:7077... 14/09/03 12:10:09 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140903121009- 14/09/03 12:10:09 INFO AppClient$ClientActor: Executor added: app-20140903121009-/0 on
Re: Number of elements in ArrayBuffer
Hi Deep, Please find below results of ArrayBuffer in scala REPL scala import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer scala val a = ArrayBuffer(5,3,1,4) a: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(5, 3, 1, 4) scala a.head res2: Int = 5 scala a.tail res3: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(3, 1, 4) scala a.length res4: Int = 4 Regards, Rajesh On Wed, Sep 3, 2014 at 10:13 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have the following ArrayBuffer: *ArrayBuffer(5,3,1,4)* Now, I want to get the number of elements in this ArrayBuffer and also the first element of the ArrayBuffer. I used .length and .size but they are returning 1 instead of 4. I also used .head and .last for getting the first and the last element but they also return the entire ArrayBuffer (ArrayBuffer(5,3,1,4)) What I understand from this is that, the entire ArrayBuffer is stored as one element. How should I go about doing the required things? Thank You
Re: zip equal-length but unequally-partition
I just created it. Here's ticket. https://issues.apache.org/jira/browse/SPARK-3364 Thanks, Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/zip-equal-length-but-unequally-partition-tp13246p13330.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: pyspark yarn got exception
Hi, I changed to master to point on yarn and got such exceptions: [root@HDOP-B spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563]# ./bin/spark-submit --master yarn://HDOP-M.AGT:8032 --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 examples/src/main/python/pi.py 1000 /usr/jdk64/jdk1.7.0_45/bin/java ::/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/conf:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar:/etc/hadoop/conf -XX:MaxPermSize=128m -Djava.library.path= -Xms4g -Xmx4g 14/09/03 13:33:29 INFO spark.SecurityManager: Changing view acls to: root 14/09/03 13:33:29 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root) 14/09/03 13:33:30 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/09/03 13:33:30 INFO Remoting: Starting remoting 14/09/03 13:33:30 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@hdop-b.agt:49765] 14/09/03 13:33:30 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@hdop-b.agt:49765] 14/09/03 13:33:31 INFO spark.SparkEnv: Registering MapOutputTracker 14/09/03 13:33:31 INFO spark.SparkEnv: Registering BlockManagerMaster 14/09/03 13:33:31 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-2014090311-205a 14/09/03 13:33:31 INFO storage.MemoryStore: MemoryStore started with capacity 2.3 GB. 14/09/03 13:33:31 INFO network.ConnectionManager: Bound socket to port 54486 with id = ConnectionManagerId(HDOP-B.AGT,54486) 14/09/03 13:33:31 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/09/03 13:33:31 INFO storage.BlockManagerInfo: Registering block manager HDOP-B.AGT:54486 with 2.3 GB RAM 14/09/03 13:33:31 INFO storage.BlockManagerMaster: Registered BlockManager 14/09/03 13:33:31 INFO spark.HttpServer: Starting HTTP Server 14/09/03 13:33:31 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/09/03 13:33:31 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:60199 14/09/03 13:33:31 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.193.1.76:60199 14/09/03 13:33:31 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-67574874-3b14-4c8d-b075-580061d140e0 14/09/03 13:33:31 INFO spark.HttpServer: Starting HTTP Server 14/09/03 13:33:31 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/09/03 13:33:31 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:45848 14/09/03 13:33:31 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/09/03 13:33:31 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/09/03 13:33:31 INFO ui.SparkUI: Started SparkUI at http://HDOP-B.AGT:4040 14/09/03 13:33:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable --args is deprecated. Use --arg instead. 14/09/03 13:33:32 INFO client.RMProxy: Connecting to ResourceManager at HDOP-N1.AGT/10.193.1.72:8050 14/09/03 13:33:33 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 6 14/09/03 13:33:33 INFO yarn.Client: Queue info ... 
queueName: default, queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0, queueApplicationCount = 0, queueChildQueueCount = 0 14/09/03 13:33:33 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 13824 14/09/03 13:33:33 INFO yarn.Client: Preparing Local resources 14/09/03 13:33:33 INFO yarn.Client: Uploading file:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar to hdfs://HDOP-B.AGT:8020/user/root/.sparkStaging/application_1409559972905_0032/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar 14/09/03 13:33:35 INFO yarn.Client: Uploading file:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/examples/src/main/python/pi.py to hdfs://HDOP-B.AGT:8020/user/root/.sparkStaging/application_1409559972905_0032/pi.py 14/09/03 13:33:35 INFO yarn.Client: Setting up the launch environment 14/09/03 13:33:35 INFO yarn.Client: Setting up container launch context 14/09/03 13:33:35 INFO yarn.Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx4096m, -Djava.io.tmpdir=$PWD/tmp, -Dspark.tachyonStore.folderName=\spark-52943421-004b-4ae7-990f-d8591a830ef8\, -Dspark.executor.memory=\2g\, -Dspark.executor.instances=\3\, -Dspark.yarn.dist.files=\file:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/examples/src/main/python/pi.py\, -Dspark.yarn.secondary.jars=\\, -Dspark.submit.pyFiles=\\, -Dspark.driver.host=\HDOP-B.AGT\, -Dspark.app.name=\PythonPi\, -Dspark.fileserver.uri=\http://10.193.1.76:45848\;, -Dspark.master=\yarn-client\, -Dspark.driver.port=\49765\, -Dspark.executor.cores=\1\, -Dspark.httpBroadcast.uri=\ http://10.193.1.76:60199\;, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.deploy.yarn.ExecutorLauncher, --class, notused,
Re: pyspark yarn got exception
Hi Oleg. To run on YARN, simply set master to yarn. The YARN configuration, located in a yarn-site.xml, determines where to look for the YARN ResourceManager. PROCESS_LOCAL is orthogonal to the choice of cluster resource manager. A task is considered PROCESS_LOCAL when the executor it's running in happens to have the data it's processing cached. If you're looking to get familiar with the kind of confusing web of terminology, this blog post might be helpful: http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/ -Sandy On Tue, Sep 2, 2014 at 9:51 PM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , I change my command to : ./bin/spark-submit --master spark://HDOP-B.AGT:7077 --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 examples/src/main/python/pi.py 1000 and it fixed the problem. I still have couple of questions: PROCESS_LOCAL is not Yarn execution , right? how should I configure the running on yarn? Should I exeture start-all script on all machine or only one? Where is the UI / LOGS of spark execution? 152 152 SUCCESS PROCESS_LOCAL HDOP-B.AGT 2014/09/03 12:35:14 0.2 s 0 0 SUCCESSPROCESS_LOCAL HDOP-B.AGT 2014/09/03 12:35:09 0.9 s 39 ms 2 2 SUCCESS PROCESS_LOCAL HDOP-B.AGT 2014/09/03 12:35:09 0.9 s 39 ms 3 3 SUCCESSPROCESS_LOCAL HDOP-B.AGT 2014/09/03 12:35:09 0.9 s 39 ms1 ms 4 4 SUCCESS PROCESS_LOCAL HDOP-B.AGT 2014/09/03 12:35:09 0.8 s 39 ms 2 ms 5 5 SUCCESSPROCESS_LOCAL HDOP-B.AGT 2014/09/03 12:35:09 0.8 s 39 ms1 ms 6 6 SUCCESS PROCESS_LOCAL HDOP-B.AGT 2014/09/03 12:35:09 0.8 s 1 ms 7 7 SUCCESSPROCESS_LOCAL HDOP-B.AGT 2014/09/03 12:35:09 0.9 s 8 8 SUCCESS PROCESS_LOCAL HDOP-B.AGT 2014/09/03 12:35:10 0.3 s 9 9 SUCCESS PROCESS_LOCAL HDOP-B.AGT 2014/09/03 12:35:10 0.4 s 10 10 SUCCESS PROCESS_LOCAL HDOP-B.AGT 2014/09/03 12:35:10 0.3 s 1 ms 11 11 SUCCESS PROCESS_LOCAL HDOP-B.AGT 2014/09/03 12:35:10 0.3 s On Wed, Sep 3, 2014 at 12:19 PM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi Andrew. what should I do to set master on yarn, can you please pointing me on command or documentation how to do it? I am doing the following: executed start-all.sh [root@HDOP-B sbin]# ./start-all.sh starting org.apache.spark.deploy.master.Master, logging to /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-HDOP-B.AGT.out localhost: Warning: Permanently added 'localhost' (RSA) to the list of known hosts. 
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/sbin/../logs/spark-root-org.apache.spark.deploy.worker.Worker-1-HDOP-B.AGT.out after execute the command: ./bin/spark-submit --master spark://HDOP-B.AGT:7077 examples/src/main/python/pi.py 1000 the result is the following: /usr/jdk64/jdk1.7.0_45/bin/java ::/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/conf:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m 14/09/03 12:10:06 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/09/03 12:10:06 INFO SecurityManager: Changing view acls to: root 14/09/03 12:10:06 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root) 14/09/03 12:10:07 INFO Slf4jLogger: Slf4jLogger started 14/09/03 12:10:07 INFO Remoting: Starting remoting 14/09/03 12:10:07 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@hdop-b.agt:38944] 14/09/03 12:10:07 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@hdop-b.agt:38944] 14/09/03 12:10:07 INFO SparkEnv: Registering MapOutputTracker 14/09/03 12:10:07 INFO SparkEnv: Registering BlockManagerMaster 14/09/03 12:10:08 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140903121008-cf09 14/09/03 12:10:08 INFO MemoryStore: MemoryStore started with capacity 294.9 MB. 14/09/03 12:10:08 INFO ConnectionManager: Bound socket to port 45041 with id = ConnectionManagerId(HDOP-B.AGT,45041) 14/09/03 12:10:08 INFO BlockManagerMaster: Trying to register BlockManager 14/09/03 12:10:08 INFO BlockManagerInfo: Registering block manager HDOP-B.AGT:45041 with 294.9 MB RAM 14/09/03 12:10:08 INFO BlockManagerMaster: Registered BlockManager 14/09/03 12:10:08 INFO HttpServer: Starting HTTP Server 14/09/03 12:10:08 INFO HttpBroadcast: Broadcast server started at http://10.193.1.76:59336 14/09/03 12:10:08 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7bf5c3c3-1c02-41e8-9fb0-983e175dd45c 14/09/03 12:10:08 INFO HttpServer: Starting HTTP Server 14/09/03 12:10:08 INFO SparkUI: Started SparkUI at http://HDOP-B.AGT:4040 14/09/03 12:10:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your
Re: Number of elements in ArrayBuffer
I have a problem here. When I run the commands that Rajesh has suggested in Scala REPL, they work fine. But, I want to work in a Spark code, where I need to find the number of elements in an ArrayBuffer. In Spark code, these things are not working. How should I do that? On Wed, Sep 3, 2014 at 10:25 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Deep, Please find below results of ArrayBuffer in scala REPL scala import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer scala val a = ArrayBuffer(5,3,1,4) a: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(5, 3, 1, 4) scala a.head res2: Int = 5 scala a.tail res3: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(3, 1, 4) scala a.length res4: Int = 4 Regards, Rajesh On Wed, Sep 3, 2014 at 10:13 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have the following ArrayBuffer: *ArrayBuffer(5,3,1,4)* Now, I want to get the number of elements in this ArrayBuffer and also the first element of the ArrayBuffer. I used .length and .size but they are returning 1 instead of 4. I also used .head and .last for getting the first and the last element but they also return the entire ArrayBuffer (ArrayBuffer(5,3,1,4)) What I understand from this is that, the entire ArrayBuffer is stored as one element. How should I go about doing the required things? Thank You