Re: Shuffle read/write issue in spark 1.2
We tried changing the compression codec from snappy to lz4. It did improve the performance but we are still wondering why the default options didn’t work as claimed. From: Raghavendra Pandey raghavendra.pan...@gmail.com Date: Friday, 6 February 2015 1:23 pm To: Praveen Garg praveen.g...@guavus.com Cc: user@spark.apache.org Subject: Re: Shuffle read/write issue in spark 1.2 Even I observed the same issue. On Fri, Feb 6, 2015 at 12:19 AM, Praveen Garg praveen.g...@guavus.com wrote: Hi, While moving from spark 1.1 to spark 1.2, we are facing an issue where shuffle read/write has increased significantly. We also tried running the job by rolling back to the spark 1.1 configuration, where we set spark.shuffle.manager to hash and spark.shuffle.blockTransferService to nio. It did improve the performance a bit, but it was still much worse than spark 1.1. The scenario seems similar to the bug raised some time back: https://issues.apache.org/jira/browse/SPARK-5081. Has anyone come across a similar issue? Please tell us if any configuration change can help. Regards, Praveen
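For reference, the codec switch under discussion is a one-line configuration change; a minimal sketch (the app name is illustrative, and the same properties can equally go into spark-defaults.conf or onto the spark-submit command line):

```
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-compression-test")            // hypothetical app name
  .set("spark.io.compression.codec", "lz4")          // Spark 1.2's default is snappy
  .set("spark.shuffle.manager", "hash")              // optional: revert to the pre-1.2 shuffle manager
  .set("spark.shuffle.blockTransferService", "nio")  // optional: revert to the pre-1.2 transfer service
val sc = new SparkContext(conf)
```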
generate a random matrix with uniform distribution
Hi, I would like to know how I can generate a random matrix where each element comes from a uniform distribution on [-1, 1]. In particular, I would like the matrix to be a distributed row matrix with dimension n x p. Is this possible with mllib? Should I use another library? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/generate-a-random-matrix-with-uniform-distribution-tp21538.html
Re: matrix of random variables with spark.
Forgot to add the more recent training material: https://databricks-training.s3.amazonaws.com/index.html
Re: matrix of random variables with spark.
Hi Luca, You can tackle this using RowMatrix (spark-shell example):

```
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.random._
import org.apache.spark.rdd.RDD

// sc is the spark context, numPartitions is the number of partitions you want the RDD to be in
val data: RDD[Vector] = RandomRDDs.normalVectorRDD(sc, n, k, numPartitions, seed)
val matrix = new RowMatrix(data, n, k)
```

You can find more tutorials here: https://spark-summit.org/2013/exercises/index.html Best, Burak On Fri, Feb 6, 2015 at 10:03 AM, Luca Puggini lucapug...@gmail.com wrote: Hi all, this is my first email with this mailing list and I hope that I am not doing anything wrong. I am currently trying to define a distributed matrix with n rows and k columns where each element is randomly sampled from a uniform distribution. How can I do that? It would also be nice if you could suggest a good guide that I can use to start working with Spark. (The quick start tutorial is not enough for me.) Thanks a lot!
Re: Get filename in Spark Streaming
Thank you Emre, this helps, I am able to get the filename. But I am not sure how to fit this into the DStream of RDDs. val inputStream = ssc.textFileStream("/hdfs path/") inputStream is a DStream of RDDs, and in foreachRDD I am doing my processing: inputStream.foreachRDD(rdd => { // how to get filename here?? }) Can you please help. On Thu, Feb 5, 2015 at 11:15 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, Did you check the following? http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/ http://apache-spark-user-list.1001560.n3.nabble.com/access-hdfs-file-name-in-map-td6551.html -- Emre Sevinç On Fri, Feb 6, 2015 at 2:16 AM, Subacini B subac...@gmail.com wrote: Hi All, We have filenames with a timestamp, say ABC_1421893256000.txt, and the timestamp needs to be extracted from the file name for further processing. Is there a way to get the input file name picked up by the spark streaming job? Thanks in advance Subacini -- Emre Sevinc
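For the non-streaming case, the first link above boils down to reading through the Hadoop file API so that the input split, and therefore the file path, is visible; a rough sketch of that approach (the timestamp parsing is illustrative only, and a streaming job would need the analogous fileStream-based setup):

```
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD

// Read via hadoopFile so the underlying HadoopRDD exposes its input splits,
// then tag every line with the file it came from.
val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("/hdfs path/")
val linesWithFile = raw.asInstanceOf[HadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit((split, iter) => {
    val fileName = split.asInstanceOf[FileSplit].getPath.getName  // e.g. ABC_1421893256000.txt
    val ts = fileName.split("[_.]")(1).toLong                     // pull the timestamp out of the name
    iter.map { case (_, line) => (fileName, ts, line.toString) }
  })
```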
Re: How to broadcast a variable read from a file in yarn-cluster mode?
OK I tried that, but how do I convert an RDD to a Set that I can then broadcast and cache? val badIPs = sc.textFile("hdfs:///user/jon/" + "badfullIPs.csv") val badIPsLines = badIPs.getLines val badIpSet = badIPsLines.toSet val badIPsBC = sc.broadcast(badIpSet) produces the error "value getLines is not a member of org.apache.spark.rdd.RDD[String]". Leaving it as an RDD and then constantly joining I think will be too slow for a streaming job. On Thu, Feb 5, 2015 at 8:06 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Jon, You'll need to put the file on HDFS (or whatever distributed filesystem you're running on) and load it from there. -Sandy On Thu, Feb 5, 2015 at 3:18 PM, YaoPau jonrgr...@gmail.com wrote: I have a file badFullIPs.csv of bad IP addresses used for filtering. In yarn-client mode, I simply read it off the edge node, transform it, and then broadcast it: val badIPs = fromFile(edgeDir + "badfullIPs.csv") val badIPsLines = badIPs.getLines val badIpSet = badIPsLines.toSet val badIPsBC = sc.broadcast(badIpSet) badIPs.close How can I accomplish this in yarn-cluster mode? Jon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html
MLLib: feature standardization
Hi, I have a dataset in csv format and I am trying to standardize the features before using k-means clustering. The data does not have any labels but has the following format: s1, f12,f13,... s2, f21,f22,... where s is a string id, and f is a floating point feature value. To perform feature standardization, I need to compute the mean and variance/std deviation of the feature values in each element of the RDD (i.e. each row). However, the summary statistics library in Spark MLlib provides only a colStats() method that gives column-wise mean and variance. I tried to compute the mean and variance per row using the code below, but got a compilation error that there is no mean() or variance() method for a tuple or Vector object. Is there a Spark library to compute the row-wise mean and variance for an RDD, where each row (i.e. element) of the RDD is a Vector or tuple of N feature values? thanks My code for standardization is as follows: //read the data val data = sc.textFile(file_name).map(_.split(",")) // extract the features. For this example I am using only 2 features, but the data has more features val features = data.map(d => Vectors.dense(d(1).toDouble, d(2).toDouble)) val std_features = features.map(f => { val fmean = f.mean() // Error: NO MEAN() for a Vector or Tuple object val fstd = scala.math.sqrt(f.variance()) // Error: NO variance() for a Vector or Tuple object for (i <- 0 to f.length) // standardize the features { var fs = 0.0 if (fstd > 0.0) fs = (f(i) - fmean)/fstd fs } } ) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-feature-standardization-tp21539.html
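One way around the missing row-wise mean()/variance() is to compute them per row directly inside the map; this is only a sketch (it assumes the features RDD built above and uses plain population variance):

```
import org.apache.spark.mllib.linalg.{Vector, Vectors}

val std_features = features.map { v =>
  val a = v.toArray
  val mean = a.sum / a.length
  val std = math.sqrt(a.map(x => (x - mean) * (x - mean)).sum / a.length)
  // standardize each value of the row; leave it at 0.0 if the row has no spread
  Vectors.dense(a.map(x => if (std > 0.0) (x - mean) / std else 0.0))
}
```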
Re: Where can I find logs set inside RDD processing functions?
The yarn log aggregation is enabled, and the logs which I get through yarn logs -applicationId your_application_id are no different from what I get through the logs in the Yarn Application tracking URL. They still don't have the above logs.
Re: How to broadcast a variable read from a file in yarn-cluster mode?
You can call collect() to pull in the contents of an RDD into the driver: val badIPsLines = badIPs.collect()
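Putting the two suggestions together, a minimal sketch of the yarn-cluster version (the file path matches the snippet above; extractIp is a placeholder for whatever parsing the job already does):

```
// read the list from HDFS, pull it to the driver, then broadcast the Set
val badIpSet = sc.textFile("hdfs:///user/jon/badfullIPs.csv").collect().toSet
val badIPsBC = sc.broadcast(badIpSet)

// workers can then test membership cheaply instead of joining, e.g.
// lines.filter(line => !badIPsBC.value.contains(extractIp(line)))
```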
Re: Where can I find logs set inside RDD processing functions?
To add to what Petar said, when YARN log aggregation is enabled, consider specifying yarn.nodemanager.remote-app-log-dir, which is where aggregated logs are saved. Cheers
RE: Connecting Cassandra by unknown host
Thanks for the information. I have no issue connecting to my local Cassandra server; however, I still have an issue connecting to my company dev server. What needs to be done to resolve this issue? Thanks so much. -Vincent From: Ankur Srivastava [mailto:ankur.srivast...@gmail.com] Sent: Thursday, January 29, 2015 8:02 PM To: Sun, Vincent Y Cc: user@spark.apache.org Subject: Re: Connecting Cassandra by unknown host Hi, I am no expert but have a small application working with Spark and Cassandra. I faced these issues when we were deploying our cluster on EC2 instances with some machines on a public network and some on a private one. This seems to be a similar issue, as you are trying to connect to 10.34.224.249, which is a private IP, but the address you get in the error message is a public IP, 30.247.7.8. If you want to connect to the public IP, ensure that your network settings allow you to connect using the spark cluster's public IP on port 9042. Hope this helps!! Thanks Ankur On Thu, Jan 29, 2015 at 1:33 PM, oxpeople vincent.y@bankofamerica.com wrote: I have the code to set up the Cassandra connection: SparkConf conf = new SparkConf(true); conf.setAppName("Java cassandra RD"); conf.set("spark.cassandra.connection.host", "10.34.224.249"); but the log shows it trying to connect to a different host. 15/01/29 16:16:42 INFO NettyBlockTransferService: Server created on 62002 15/01/29 16:16:42 INFO BlockManagerMaster: Trying to register BlockManager 15/01/29 16:16:42 INFO BlockManagerMasterActor: Registering block manager F6C3BE5F7042A.corp.com:62002 with 975.5 MB RAM, BlockManagerId(<driver>, F6C3BE5F7042A.corp.com, 62002) 15/01/29 16:16:42 INFO BlockManagerMaster: Registered BlockManager 15/01/29 16:16:42 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 15/01/29 16:16:44 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@f6c3be5f7042a.corp.com:62064/user/Executor#-184690467] with ID 0 15/01/29 16:16:44 INFO BlockManagerMasterActor: Registering block manager F6C3BE5F7042A.corp.com:62100 with 265.4 MB RAM, BlockManagerId(0, F6C3BE5F7042A.corp, 62100) Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {30.247.7.8}:9042 at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:174) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160) at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36) at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61) at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:71) at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:97) at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:108) at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:134) at com.datastax.spark.connector.rdd.CassandraRDD.tableDef$lzycompute(CassandraRDD.scala:240) at com.datastax.spark.connector.rdd.CassandraRDD.tableDef(CassandraRDD.scala:239) at
com.datastax.spark.connector.rdd.CassandraRDD.verify$lzycompute(CassandraRDD.scala:298) at com.datastax.spark.connector.rdd.CassandraRDD.verify(CassandraRDD.scala:295) at com.datastax.spark.connector.rdd.CassandraRDD.getPartitions(CassandraRDD.scala:324) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1328) at org.apache.spark.rdd.RDD.collect(RDD.scala:780) at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:309) at org.apache.spark.api.java.JavaPairRDD.collect(JavaPairRDD.scala:45) at com.bof.spark.cassandra.JavaSparkCassandraTest.run(JavaSparkCassandraTest.java:41) at com.bof.spark.cassandra.JavaSparkCassandraTest.main(JavaSparkCassandraTest.java:70) Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /30.247.7.8:9042 (com.datastax.driver.core.TransportException: [/30.247.7.8:9042] Cannot connect)) at
Re: Shuffle read/write issue in spark 1.2
Did the problem go away when you switched to lz4? There was a change in the default compression codec from 1.0 to 1.1, where we went from LZF to Snappy. I don't think there was any such change from 1.1 to 1.2, though.
Where can I find logs set inside RDD processing functions?
I am trying to debug my mapPartitions function. Here is the code. There are two ways I am trying to log, using log.info() or println(). I am running in yarn-cluster mode. While I can see the logs from the driver code, I am not able to see logs from the map and mapPartitions functions in the Application Tracking URL. Where can I find the logs? var outputRDD = partitionedRDD.mapPartitions(p => { val outputList = new ArrayList[scala.Tuple3[Long, Long, Int]] p.map({ case (key, value) => { log.info("Inside map") println("Inside map"); for (i <- 0 until outputTuples.size()) { val outputRecord = outputTuples.get(i) if (outputRecord != null) { outputList.add(outputRecord.getCurrRecordProfileID(), outputRecord.getWindowRecordProfileID, outputRecord.getScore()) } } } }) outputList.iterator() }) Here is my log4j.properties: log4j.rootCategory=INFO, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Settings to quiet third party logs that are too verbose log4j.logger.org.eclipse.jetty=WARN log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Where-can-I-find-logs-set-inside-RDD-processing-functions-tp21537.html
Re: spark driver behind firewall
Hi, You can use the Spark Kernel project (https://github.com/ibm-et/spark-kernel) as a workaround of sorts. The Spark Kernel provides a generic solution to dynamically interact with an Apache Spark cluster (think of a remote Spark Shell). It serves as the driver application, with which you can send Scala code to interact with Apache Spark. You would still need to expose the Spark Kernel outside the firewall (similar to Kostas' suggestion about the jobserver), of course. Signed, Chip Senkbeil On Thu Feb 05 2015 at 11:07:28 PM Kostas Sakellis kos...@cloudera.com wrote: Yes, the driver has to be able to accept incoming connections. All the executors connect back to the driver, sending heartbeats, map status, and metrics. It is critical, and I don't know of a way around it. You could look into using something like https://github.com/spark-jobserver/spark-jobserver, which could run outside the firewall. Then from inside the firewall you can make REST calls to the server. On Thu, Feb 5, 2015 at 5:03 PM, Kane Kim kane.ist...@gmail.com wrote: I submit a spark job from a machine behind a firewall. I can't open any incoming connections to that box; does the driver absolutely need to accept incoming connections? Is there any workaround for that case? Thanks.
Re: Where can I find logs set inside RDD processing functions?
You can enable YARN log aggregation (set yarn.log-aggregation-enable to true) and execute the command yarn logs -applicationId your_application_id after your application finishes. Or you can look at them directly in HDFS in /tmp/logs/user/logs/applicationid/hostname
Re: Beginner in Spark
You don't need HDFS or virtual machines to run Spark. You can just download it, unzip it and run it on your laptop. See http://spark.apache.org/docs/latest/index.html. Matei
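To make the download-and-run suggestion concrete, here is a minimal spark-shell session that reads a local log file and counts events per door; the file name and the comma-separated layout are made up for the example:

```
// run ./bin/spark-shell from the unpacked Spark directory, then:
val events = sc.textFile("/path/to/door_events.log")   // hypothetical input file
val countsByDoor = events
  .map(_.split(",")(0))                                // assume the first field is a door id
  .map(door => (door, 1))
  .reduceByKey(_ + _)
countsByDoor.take(10).foreach(println)
```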
Spark SQL group by
Hi, I am trying to issue a sql query against a parquet file and am getting errors, and would like some help to figure out what is going on. The sql: select timestamp, count(rid), qi.clientname from records where timestamp > 0 group by qi.clientname I am getting the following error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: timestamp#0L at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47) at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:43) at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:42) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156) at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:42) at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection$$anonfun$$init$$2.apply(Projection.scala:52) at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection$$anonfun$$init$$2.apply(Projection.scala:52) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.<init>(Projection.scala:52) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.<init>(Aggregate.scala:176) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:172) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:115) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: Couldn't find timestamp#0L in [aggResult:SUM(PartialCount#14L)#17L,clientName#11] at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:46) at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:43) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
Re: Spark SQL group by
You can't use columns (timestamp) that aren't in the GROUP BY clause. Spark 1.2+ gives you a better error message for this case.
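In other words, every selected column has to be either aggregated or listed in GROUP BY; a corrected form of the query above would look like this (sketch):

```
sqlContext.sql(
  "select qi.clientname, count(rid) from records where timestamp > 0 group by qi.clientname")
```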
Problem when using spark.kryo.registrationRequired=true
Hi, I am trying to restrict my serialized classes, as I am having weird issues with regard to serialization. However, my efforts hit a brick wall when I got the exception: Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1 Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class); I tracked it down to using the sortBy method of the RDD. When using Scala, what should I add to the Kryo registry to support this class? Thanks
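One way to register an anonymous class like this is by name through a custom KryoRegistrator; a sketch (whether this is the only class Kryo complains about will depend on the job):

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // anonymous inner classes have to be looked up by name
    kryo.register(Class.forName("scala.reflect.ClassTag$$anon$1"))
  }
}

// and in the SparkConf:
//   .set("spark.kryo.registrator", "MyKryoRegistrator")
//   .set("spark.kryo.registrationRequired", "true")
```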
Re: Where can I find logs set inside RDD processing functions?
yarn.nodemanager.remote-app-log-dir is set to /tmp/logs
Re: Beginner in Spark
King, consider trying the Spark Kernel (https://github.com/ibm-et/spark-kernel), which will install Spark etc. and provide you with a Spark/Scala notebook in which you can develop your algorithm. The Vagrant installation described in https://github.com/ibm-et/spark-kernel/wiki/Vagrant-Development-Environment will have you quickly up and running on a single machine without having to manage the details of the system installations. There is a Docker version, https://github.com/ibm-et/spark-kernel/wiki/Using-the-Docker-Container-for-the-Spark-Kernel, if you prefer Docker. Regards, David King sami kgsam...@gmail.com wrote on 02/06/2015 08:09:39 AM: From: King sami kgsam...@gmail.com To: user@spark.apache.org Date: 02/06/2015 08:11 AM Subject: Beginner in Spark Hi, I'm new to Spark and I'd like to install Spark with Scala. The aim is to build a data processing system for door events. The first step is to install spark, scala, hdfs and the other required tools. The second is to build the algorithm program in Scala which can process a file of my data logs (events). Could you please help me to install the required tools (Spark, Scala, HDFS) and tell me how I can execute my program on the input file. Best regards,
Re: Spark SQL group by
Doh :) Thanks.. seems like brain freeze.
SQL group by on Parquet table slower when table cached
Spark 1.2, with data stored in a parquet table (large number of rows).
Test 1: select a, sum(b), sum(c) from table
Test 2: sqlContext.cacheTable(), then run select a, sum(b), sum(c) from table once to seed the cache (the first time is slow since it is loading the cache?), then run select a, sum(b), sum(c) from table a second time. The second time it should be faster, as it should be reading from the cache and not HDFS, but it is slower than test 1.
Any thoughts? Should a different query be used to seed the cache? Thanks,
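For reference, the sequence being described looks roughly like this (sketch; the path and table name are illustrative, and the group by is written out explicitly):

```
val parquet = sqlContext.parquetFile("/path/to/data.parquet")  // hypothetical path
parquet.registerTempTable("events")
sqlContext.cacheTable("events")

// seed the in-memory columnar cache with a full scan before timing anything
sqlContext.sql("select count(*) from events").collect()

// this run should now hit the cached column buffers rather than HDFS
sqlContext.sql("select a, sum(b), sum(c) from events group by a").collect()
```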
Re: NaiveBayes classifier causes ShuffleDependency class cast exception
Can you try creating just a single spark context and then running your code? If you want to use it for streaming, pass the same SparkContext object instead of the conf. Note: instead of just replying to me, try to use reply-all so that the post is visible to the community. That way you can expect immediate responses. On Fri, Feb 6, 2015 at 6:09 AM, aanilpala aanilp...@gmail.com wrote: I have the following code: SparkConf conf = new SparkConf().setAppName("streamer").setMaster("local[2]"); conf.set("spark.driver.allowMultipleContexts", "true"); JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(batch_interval)); ssc.checkpoint("/tmp/spark/checkpoint"); SparkConf conf2 = new SparkConf().setAppName("classifier").setMaster("local[1]"); conf2.set("spark.driver.allowMultipleContexts", "true"); JavaSparkContext sc = new JavaSparkContext(conf); JavaReceiverInputDStream<String> stream = ssc.socketTextStream("localhost", ); // String to Tuple3 Conversion JavaDStream<Tuple3<Long, String, String>> tuple_stream = stream.map(new Function<String, Tuple3<Long, String, String>>() { ... }); JavaPairDStream<Integer, DictionaryEntry> raw_dictionary_stream = tuple_stream.filter(new Function<Tuple3<Long, String, String>, Boolean>() { @Override public Boolean call(Tuple3<Long, String, String> tuple) throws Exception { if ((tuple._1()/Time.scaling_factor % training_interval) < training_dur) NaiveBayes.train(sc.parallelize(training_set).rdd()); return true; } }). I am working on a text mining project and I want to use the NaiveBayes classifier of MLlib to classify some stream items. So, I have two Spark contexts, one of which is a streaming context. The call to NaiveBayes.train causes the following exception. Any ideas? Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: org.apache.spark.SparkContext$$anonfun$runJob$4 cannot be cast to org.apache.spark.ShuffleDependency at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:60) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at
akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
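A sketch of the suggestion, in Scala for brevity (the Java API has matching constructors): build one context and hand the same object to the streaming context, rather than creating a second context from the conf:

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, StreamingContext}

val conf = new SparkConf().setAppName("streamer").setMaster("local[2]")
val sc = new SparkContext(conf)              // the only SparkContext in the application
val batchIntervalMs = 1000L                  // illustrative value
val ssc = new StreamingContext(sc, new Duration(batchIntervalMs))  // reuse the same context for streaming
// ... define the DStream pipeline here; NaiveBayes.train(sc.parallelize(trainingSet)) can share this sc
```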
WebUI on yarn through ssh tunnel affected by ami filtered
Hi folks, I am new to spark. I just got spark 1.2 to run on emr ami 3.3.1 (hadoop 2.4). I ssh to the emr master node and submit the job or start the shell. Everything runs well except the webUI. In order to see the UI, I used an ssh tunnel which forwards a port on my dev machine to the emr master node webUI port. When I open the webUI at the very beginning of the application (during the spark launch time), the webUI is as nice as shown in many spark introduction articles. However, once the YARN AmIpFilter starts to work, the webUI becomes very ugly. No pictures can be displayed, only text can be shown (just like viewing it in lynx). Meanwhile, the spark shell pops up "amfilter.AmIpFilter (AmIpFilter.java:doFilter(157)) - Could not find proxy-user cookie, so user will not be set". Can anyone give me some help? Thank you!
WebUI on yarn through ssh tunnel affected by AmIpfilter
Hi folks, I am new to spark. I just got spark 1.2 to run on emr ami 3.3.1 (hadoop 2.4). I ssh to the emr master node and submit the job or start the shell. Everything runs well except the webUI. In order to see the UI, I used an ssh tunnel which forwards a port on my dev machine to the emr master node webUI port. When I open the webUI at the very beginning of the application (during the spark launch time), the webUI is as nice as shown in many spark docs. However, once the YARN AmIpFilter starts to work, the webUI becomes very ugly. No pictures can be displayed, only text can be shown (just like viewing it in lynx). Meanwhile, the spark shell pops up "amfilter.AmIpFilter (AmIpFilter.java:doFilter(157)) - Could not find proxy-user cookie, so user will not be set". Can anyone give me some help? Thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/WebUI-on-yarn-through-ssh-tunnel-affected-by-AmIpfilter-tp21540.html
Re: Shuffle read/write issue in spark 1.2
Yes. It improved the performance, but not only with spark 1.2; with spark 1.1 as well. Precisely, the job took more time to run in spark 1.2 with the default options, but completed in almost the same time as spark 1.1 when both were run with “lz4”.
naive bayes text classifier with tf-idf in pyspark
Hi, I've got the following code http://pastebin.com/3kexKwg6 that's almost complete, but I have 2 questions: 1) Once I've computed the TF-IDF vector, how do I compute the vector for each string to feed into the LabeledPoint? 2) Does MLlib provide any methods to evaluate the model's precision, recall, F-score, etc.? All I saw in the documentation was "MLlib supports common evaluation metrics for binary classification (not available in PySpark). This includes precision, recall, F-measure." What about other classifiers besides binary, and from PySpark? thanks, imran
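On (1), the usual MLlib 1.2 flow is HashingTF followed by IDF; a sketch in Scala, since that is what most examples on this list use (the PySpark classes mirror it, though whether every piece was already exposed in the Python API at 1.2 is something to verify). On (2), org.apache.spark.mllib.evaluation.MulticlassMetrics covers precision/recall/F-measure beyond the binary case, again from Scala/Java:

```
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// one Seq of tokens per document; the path is illustrative
val documents: RDD[Seq[String]] = sc.textFile("/path/to/texts").map(_.split(" ").toSeq)

val tf = new HashingTF().transform(documents)     // term-frequency vectors, one per document
tf.cache()
val idfModel = new IDF().fit(tf)
val tfidf: RDD[Vector] = idfModel.transform(tf)   // these vectors are what go into LabeledPoint(label, vector)
```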
Re: SQL group by on Parquet table slower when table cached
Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk.
Re: generate a random matrix with uniform distribution
Hi, You can do the following:

```
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.random._
import org.apache.spark.rdd.RDD

// sc is the spark context, numPartitions is the number of partitions you want the RDD to be in
val dist: RDD[Vector] = RandomRDDs.uniformVectorRDD(sc, n, k, numPartitions, seed)
// make the distribution uniform between (-1, 1): uniformVectorRDD samples from [0, 1)
val data = dist.map(v => Vectors.dense(v.toArray.map(x => x * 2 - 1)))
val matrix = new RowMatrix(data, n, k)
```
RE: Spark Metrics Servlet for driver and executor
Hi Judy, For the driver it is /metrics/json; there's no metricsServlet for executors. Thanks Jerry From: Judy Nash [mailto:judyn...@exchange.microsoft.com] Sent: Friday, February 6, 2015 3:47 PM To: user@spark.apache.org Subject: Spark Metrics Servlet for driver and executor Hi all, Looking at the spark metricsServlet. What is the url exposing the driver and executor json response? I found master and worker successfully, but can't find the url that returns json for the other 2 sources. Thanks! Judy
PR Request
Hi, When we submit a PR on GitHub, various tests are performed, like the RAT test and the Scala style test, and beyond these there are many other tests which run for a longer time. Could anyone please direct me to the details of the tests that are performed there? Thank You
spark 1.2 writing on parquet after a join never ends - GC problems
Hi all, I’m experiencing a strange behaviour of spark 1.2. I’ve a 3 node cluster + the master. Each node has: 1 HDD 7200 rpm 1 TB 16 GB RAM 8 core I configured executors with 6 cores and 10 GB each ( spark.storage.memoryFraction = 0.6 ) My job is pretty simple: val file1 = sc.parquetFile(“path1”) //19M rows val file2 = sc.textFile(“path2”) //12K rows val join = file1.as(‘f1’).join(file2.as(‘f2’), LeftOuter, Some(“f1.field”.attr === ”f2.field”.attr)) join.map( _.toCaseClass() ).saveAsParquetFile( “path3” ) When I run this job in the spark-shell without writing the parquet file, but performing a final count to execute the pipeline, it’s pretty fast. When I submit the application to the cluster with the saveAsParquetFile instruction, task execution slows progressively and it never ends. I debugged this behaviour and I found that the cause is the executor’s disconnection due to a missing heartbeat. The missing heartbeat is, in my opinion, related to GC (I report to you a piece of GC log from one of the executors) 484.861: [GC [PSYoungGen: 2053788K-718157K(2561024K)] 7421222K-6240219K(9551872K), 2.6802130 secs] [Times: user=1.94 sys=0.60, real=2.68 secs] 497.751: [GC [PSYoungGen: 2560845K-782081K(2359808K)] 8082907K-6984335K(9350656K), 4.8611660 secs] [Times: user=3.66 sys=1.55, real=4.86 secs] 510.654: [GC [PSYoungGen: 2227457K-625664K(2071552K)] 8429711K-7611342K(9062400K), 22.5727850 secs] [Times: user=3.34 sys=2.43, real=22.57 secs] 533.745: [Full GC [PSYoungGen: 625664K-0K(2071552K)] [ParOldGen: 6985678K-2723917K(6990848K)] 7611342K-2723917K(9062400K) [PSPermGen: 62290K-6 K(124928K)], 56.9075910 secs] [Times: user=65.28 sys=5.91, real=56.90 secs] 667.637: [GC [PSYoungGen: 1445376K-623184K(2404352K)] 4169293K-3347101K(9395200K), 11.7959290 secs] [Times: user=1.58 sys=0.60, real=11.79 secs] 690.936: [GC [PSYoungGen: 1973328K-584256K(2422784K)] 4697245K-3932841K(9413632K), 39.3594850 secs] [Times: user=2.88 sys=0.96, real=39.36 secs] 789.891: [GC [PSYoungGen: 1934400K-585552K(2434048K)] 5282985K-4519857K(9424896K), 17.4456720 secs] [Times: user=2.65 sys=1.36, real=17.44 secs] 814.697: [GC [PSYoungGen: 1951056K-330109K(2426880K)] 5885361K-4851426K(9417728K), 20.9578300 secs] [Times: user=1.64 sys=0.81, real=20.96 secs] 842.968: [GC [PSYoungGen: 1695613K-180290K(2489344K)] 6216930K-4888775K(9480192K), 3.2760780 secs] [Times: user=0.40 sys=0.30, real=3.28 secs] 886.660: [GC [PSYoungGen: 1649218K-427552K(2475008K)] 6357703K-5239028K(9465856K), 5.4738210 secs] [Times: user=1.47 sys=0.25, real=5.48 secs] 897.979: [GC [PSYoungGen: 1896480K-634144K(2487808K)] 6707956K-5874208K(9478656K), 23.6440110 secs] [Times: user=2.63 sys=1.11, real=23.64 secs] 929.706: [GC [PSYoungGen: 2169632K-663200K(2199040K)] 7409696K-6538992K(9189888K), 39.3632270 secs] [Times: user=3.36 sys=1.71, real=39.36 secs] 1006.206: [GC [PSYoungGen: 2198688K-655584K(2449920K)] 8074480K-7196224K(9440768K), 98.5040880 secs] [Times: user=161.53 sys=6.71, real=98.49 secs] 1104.790: [Full GC [PSYoungGen: 655584K-0K(2449920K)] [ParOldGen: 6540640K-6290292K(6990848K)] 7196224K-6290292K(9440768K) [PSPermGen: 62247K-6224 7K(131072K)], 610.0023700 secs] [Times: user=1630.17 sys=27.80, real=609.93 secs] 1841.916: [Full GC [PSYoungGen: 1440256K-0K(2449920K)] [ParOldGen: 6290292K-6891868K(6990848K)] 7730548K-6891868K(9440768K) [PSPermGen: 62266K-622 66K(131072K)], 637.4852230 secs] [Times: user=2035.09 sys=36.09, real=637.40 secs] 2572.012: [Full GC [PSYoungGen: 1440256K-509513K(2449920K)] [ParOldGen: 6891868K-6990703K(6990848K)] 
8332124K-7500217K(9440768K) [PSPermGen: 62275K -62275K(129024K)], 698.2497860 secs] [Times: user=2261.54 sys=37.63, real=698.26 secs] 3326.711: [Full GC It might seem that the file write operation is too slow and is the bottleneck, but then I tried to change my algorithm in the following way : val file1 = sc.parquetFile(“path1”) //19M rows val file2 = sc.textFile(“path2”) //12K rows val bFile2 = sc.broadcast( file2.collect.groupBy( f2 => f2.field ) ) //broadcast of the smaller file as Map() file1.map( f1 => ( f1, bFile2.value( f1.field ).head ) ) //manual join .map( _.toCaseClass() ) .saveAsParquetFile( “path3” ) in this way the task is fast and ends without problems, so now I’m pretty confused. * Join works well if I use count as the final action * Parquet write works well without the previous join operation * Parquet write after a join never ends, and I detected GC problems Can anyone figure out what is happening? Thanks Paolo
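Not an answer, but a hedged sketch of settings one might try while diagnosing this kind of GC behaviour (the flag choices and the lower memoryFraction are illustrative, not a recommendation):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("JoinAndWriteParquet")  // hypothetical app name
  // surface GC details in the executor logs and try the CMS collector
  .set("spark.executor.extraJavaOptions",
       "-XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
  // leave more heap for execution buffers instead of cached storage
  .set("spark.storage.memoryFraction", "0.4")
```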
pyspark importing custom module
Hi, is there a way to use a custom python module that is available to all executors under PYTHONPATH (without needing to upload it using sc.addPyFile())? It is a bit weird that this module is on all nodes yet the spark tasks can't use it (references to its objects are serialized and sent to all executors, but since the module doesn't get imported the calls fail). thanks, Antony.
Re: Problems with GC and time to execute with different number of executors.
That's definitely surprising to me that you would be hitting a lot of GC for this scenario. Are you setting --executor-cores and --executor-memory? What are you setting them to? -Sandy On Thu, Feb 5, 2015 at 10:17 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Any idea why if I use more containers I get a lot of stopped because GC? 2015-02-05 8:59 GMT+01:00 Guillermo Ortiz konstt2...@gmail.com: I'm not caching the data. With each iteration I mean each 128mb block that an executor has to process. The code is pretty simple. final Conversor c = new Conversor(null, null, null, longFields, typeFields); SparkConf conf = new SparkConf().setAppName("Simple Application"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<byte[]> rdd = sc.binaryRecords(path, c.calculaLongBlock()); JavaRDD<String> rddString = rdd.map(new Function<byte[], String>() { @Override public String call(byte[] arg0) throws Exception { String result = c.parse(arg0).toString(); return result; } }); rddString.saveAsTextFile(url + "/output/" + System.currentTimeMillis() + "/"); The parse function just takes an array of bytes and applies some transformations like: [0..3] an integer, [4..20] a String, [21..27] another String and so on. It's just test code, I'd like to understand what is happening. 2015-02-04 18:57 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com: Hi Guillermo, What exactly do you mean by each iteration? Are you caching data in memory? -Sandy On Wed, Feb 4, 2015 at 5:02 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I execute a job in Spark where I'm processing a file of 80Gb in HDFS. I have 5 slaves: (32 cores / 256Gb / 7 physical disks) x 5 I have been trying many different configurations with YARN. yarn.nodemanager.resource.memory-mb 196Gb yarn.nodemanager.resource.cpu-vcores 24 I have tried to execute the job with different numbers of executors and memory (1-4g) With 20 executors each iteration (128mb) takes 25s and it never has a really long time waiting because of GC. When I execute around 60 executors the processing time is about 45s and some tasks take up to one minute because of GC. I have no idea why it's calling GC when I execute more executors simultaneously. Another question is why it takes more time to execute each block. My theory is that it's because there are only 7 physical disks and 5 processes writing is not the same as 20. The code is pretty simple, it's just a map function which parses a line and writes the output to HDFS. There are a lot of substring operations inside the function, which could cause GC. Any theories? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Link existing Hive to Spark
Hi, I have Hive in development, I want to use it in Spark. The Spark SQL documentation says the following: "Users who do not have an existing Hive deployment can still create a HiveContext. When not configured by the hive-site.xml, the context automatically creates metastore_db and warehouse in the current directory." So I have an existing Hive set up and configured; how would I be able to use the same in Spark? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Link-existing-Hive-to-Spark-tp21531.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: PR Request
Have a look at the dev/run-tests script. On Fri, Feb 6, 2015 at 2:58 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, When we submit a PR in Github, there are various tests that are performed like RAT test, Scala Style Test, and beyond this many other tests which run for more time. Could anyone please direct me to the details of the tests that are performed there? Thank You - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Problems with GC and time to execute with different number of executors.
This is an execution with 80 executors (Metric: Min / 25th percentile / Median / 75th percentile / Max): Duration 31s / 44s / 50s / 1.1 min / 2.6 min; GC Time 70ms / 0.1s / 0.3s / 4s / 53s; Input 128.0MB for all. I executed as well with 40 executors: Duration 26s / 28s / 28s / 30s / 35s; GC Time 54ms / 60ms / 66ms / 80ms / 0.4s; Input 128.0MB for all. I checked the %iowait and %steal on a worker; it's all right in both executions. I understand the value of yarn.nodemanager.resource.memory-mb is for each worker in the cluster and not the total value for YARN; it's configured at 196GB right now. (I have 5 workers.) 80 executors x 4Gb = 320Gb, so it shouldn't be a problem. 2015-02-06 10:03 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com: Yes, having many more cores than disks and all writing at the same time can definitely cause performance issues. Though that wouldn't explain the high GC. What percent of task time does the web UI report that tasks are spending in GC? On Fri, Feb 6, 2015 at 12:56 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Yes, it's surprising to me as well. I tried to execute it with different configurations, sudo -u hdfs spark-submit --master yarn-client --class com.mycompany.app.App --num-executors 40 --executor-memory 4g Example-1.0-SNAPSHOT.jar hdfs://ip:8020/tmp/sparkTest/ file22.bin parameters This is what I executed with different values in num-executors and executor-memory. Do you think there are too many executors for those HDDs? Could that be the reason why each executor takes more time? 2015-02-06 9:36 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com: That's definitely surprising to me that you would be hitting a lot of GC for this scenario. Are you setting --executor-cores and --executor-memory? What are you setting them to? -Sandy On Thu, Feb 5, 2015 at 10:17 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Any idea why if I use more containers I get a lot of stopped because GC? 2015-02-05 8:59 GMT+01:00 Guillermo Ortiz konstt2...@gmail.com: I'm not caching the data. With each iteration I mean each 128mb block that an executor has to process. The code is pretty simple. final Conversor c = new Conversor(null, null, null, longFields, typeFields); SparkConf conf = new SparkConf().setAppName("Simple Application"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<byte[]> rdd = sc.binaryRecords(path, c.calculaLongBlock()); JavaRDD<String> rddString = rdd.map(new Function<byte[], String>() { @Override public String call(byte[] arg0) throws Exception { String result = c.parse(arg0).toString(); return result; } }); rddString.saveAsTextFile(url + "/output/" + System.currentTimeMillis() + "/"); The parse function just takes an array of bytes and applies some transformations like: [0..3] an integer, [4..20] a String, [21..27] another String and so on. It's just test code, I'd like to understand what is happening. 2015-02-04 18:57 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com: Hi Guillermo, What exactly do you mean by each iteration? Are you caching data in memory? -Sandy On Wed, Feb 4, 2015 at 5:02 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I execute a job in Spark where I'm processing a file of 80Gb in HDFS. I have 5 slaves: (32 cores / 256Gb / 7 physical disks) x 5 I have been trying many different configurations with YARN. 
yarn.nodemanager.resource.memory-mb 196Gb yarn.nodemanager.resource.cpu-vcores 24 I have tried to execute the job with different numbers of executors and memory (1-4g) With 20 executors each iteration (128mb) takes 25s and it never has a really long time waiting because of GC. When I execute around 60 executors the processing time is about 45s and some tasks take up to one minute because of GC. I have no idea why it's calling GC when I execute more executors simultaneously. Another question is why it takes more time to execute each block. My theory is that it's because there are only 7 physical disks and 5 processes writing is not the same as 20. The code is pretty simple, it's just a map function which parses a line and writes the output to HDFS. There are a lot of substring operations inside the function, which could cause GC. Any theories? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
how to process a file in spark standalone cluster without distributed storage (i.e. HDFS/EC2)?
Hi All, sc.textFile will not work because the file is not distributed to the other workers, so I tried to read the file first using FileUtils.readLines and then use sc.parallelize, but readLines failed because of OOM (the file is large). Is there a way to split local files and upload those partitions to each worker as RDD memory? Best regards, Henry
RE: how to process a file in spark standalone cluster without distributed storage (i.e. HDFS/EC2)?
Hi All, I already found a solution to this problem. Please ignore my question... Thanx Best regards, Henry From: MA33 YTHung1 Sent: Friday, February 6, 2015 4:34 PM To: user@spark.apache.org Subject: how to process a file in spark standalone cluster without distributed storage (i.e. HDFS/EC2)? Hi All, sc.textFile will not work because the file is not distributed to the other workers, so I tried to read the file first using FileUtils.readLines and then use sc.parallelize, but readLines failed because of OOM (the file is large). Is there a way to split local files and upload those partitions to each worker as RDD memory? Best regards, Henry
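In case others hit the same thing, one possible approach (a sketch only; this may or may not be the solution Henry found, and the path and chunk size are made up) is to stream the file on the driver in chunks so it never has to fit in memory at once:

```scala
import scala.io.Source

val chunkSize = 100000  // lines per chunk, illustrative
val chunks = Source.fromFile("/local/path/big.txt").getLines().grouped(chunkSize)

// Parallelize each chunk and union the pieces into a single RDD.
val rdd = chunks
  .map(lines => sc.parallelize(lines))
  .reduce(_ union _)
rdd.count()
```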
spark streaming from kafka real time + batch processing in java
I want to write a spark streaming consumer for kafka in java. I want to process the data in real-time as well as store the data in hdfs in year/month/day/hour/ format. I am not sure how to achieve this. Should I write separate kafka consumers, one for writing data to HDFS and one for spark streaming? Also, I would like to ask what people generally do with the result of spark streams after aggregating over it? Is it okay to update a NoSQL DB with aggregated counts per batch interval, or is it generally stored in hdfs? Is it possible to store the mini batch data from spark streaming to HDFS in a way that the data is aggregated hourly and put into HDFS in its hour folder? I would not want a lot of small files equal to the mini batches of spark per hour; that would be inefficient for running hadoop jobs later. Is anyone working on the same problem? Any help and comments would be great. Regards Mohit
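For the streaming half of the question, a hedged sketch of doing both in one job (Scala shown; the topic, ZooKeeper quorum and paths are made up, and writing one directory per batch still leaves the small-files concern raised above):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(60))
val lines = KafkaUtils.createStream(ssc, "zk1:2181", "consumer-group", Map("events" -> 2)).map(_._2)

// real-time side: aggregated counts per batch (these could go to a NoSQL store instead)
lines.countByValue().print()

// batch side: land each interval's raw data under an hourly HDFS directory
lines.foreachRDD { (rdd, time) =>
  val hourDir = new java.text.SimpleDateFormat("yyyy/MM/dd/HH")
    .format(new java.util.Date(time.milliseconds))
  rdd.saveAsTextFile("hdfs:///data/events/" + hourDir + "/batch-" + time.milliseconds)
}

ssc.start()
ssc.awaitTermination()
```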
Re: Problems with GC and time to execute with different number of executors.
Yes, it's surprising to me as well. I tried to execute it with different configurations, sudo -u hdfs spark-submit --master yarn-client --class com.mycompany.app.App --num-executors 40 --executor-memory 4g Example-1.0-SNAPSHOT.jar hdfs://ip:8020/tmp/sparkTest/ file22.bin parameters This is what I executed with different values in num-executors and executor-memory. Do you think there are too many executors for those HDDs? Could that be the reason why each executor takes more time? 2015-02-06 9:36 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com: That's definitely surprising to me that you would be hitting a lot of GC for this scenario. Are you setting --executor-cores and --executor-memory? What are you setting them to? -Sandy On Thu, Feb 5, 2015 at 10:17 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Any idea why if I use more containers I get a lot of stopped because GC? 2015-02-05 8:59 GMT+01:00 Guillermo Ortiz konstt2...@gmail.com: I'm not caching the data. With each iteration I mean each 128mb block that an executor has to process. The code is pretty simple. final Conversor c = new Conversor(null, null, null, longFields, typeFields); SparkConf conf = new SparkConf().setAppName("Simple Application"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<byte[]> rdd = sc.binaryRecords(path, c.calculaLongBlock()); JavaRDD<String> rddString = rdd.map(new Function<byte[], String>() { @Override public String call(byte[] arg0) throws Exception { String result = c.parse(arg0).toString(); return result; } }); rddString.saveAsTextFile(url + "/output/" + System.currentTimeMillis() + "/"); The parse function just takes an array of bytes and applies some transformations like: [0..3] an integer, [4..20] a String, [21..27] another String and so on. It's just test code, I'd like to understand what is happening. 2015-02-04 18:57 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com: Hi Guillermo, What exactly do you mean by each iteration? Are you caching data in memory? -Sandy On Wed, Feb 4, 2015 at 5:02 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I execute a job in Spark where I'm processing a file of 80Gb in HDFS. I have 5 slaves: (32 cores / 256Gb / 7 physical disks) x 5 I have been trying many different configurations with YARN. yarn.nodemanager.resource.memory-mb 196Gb yarn.nodemanager.resource.cpu-vcores 24 I have tried to execute the job with different numbers of executors and memory (1-4g) With 20 executors each iteration (128mb) takes 25s and it never has a really long time waiting because of GC. When I execute around 60 executors the processing time is about 45s and some tasks take up to one minute because of GC. I have no idea why it's calling GC when I execute more executors simultaneously. Another question is why it takes more time to execute each block. My theory is that it's because there are only 7 physical disks and 5 processes writing is not the same as 20. The code is pretty simple, it's just a map function which parses a line and writes the output to HDFS. There are a lot of substring operations inside the function, which could cause GC. Any theories? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Parsing CSV files in Spark
Hi! I'm new to Spark. I have a case study where the data is stored in CSV files. These files have headers with more than 1000 columns. I would like to know the best practices for parsing them, in particular the following points: 1. Getting and parsing all the files from a folder 2. What CSV parser do you use? 3. I would like to select just some columns whose names match a pattern and then pass the selected columns' values (plus the column names) to the processing and save the output to a CSV (preserving the selected columns). If you have any experience with the points above, it will be really helpful (for me and for the others that will encounter the same cases) if you can share your thoughts. Thanks. Regards, Florin
Re: Problems with GC and time to execute with different number of executors.
Yes, having many more cores than disks and all writing at the same time can definitely cause performance issues. Though that wouldn't explain the high GC. What percent of task time does the web UI report that tasks are spending in GC? On Fri, Feb 6, 2015 at 12:56 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Yes, it's surprising to me as well. I tried to execute it with different configurations, sudo -u hdfs spark-submit --master yarn-client --class com.mycompany.app.App --num-executors 40 --executor-memory 4g Example-1.0-SNAPSHOT.jar hdfs://ip:8020/tmp/sparkTest/ file22.bin parameters This is what I executed with different values in num-executors and executor-memory. Do you think there are too many executors for those HDDs? Could that be the reason why each executor takes more time? 2015-02-06 9:36 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com: That's definitely surprising to me that you would be hitting a lot of GC for this scenario. Are you setting --executor-cores and --executor-memory? What are you setting them to? -Sandy On Thu, Feb 5, 2015 at 10:17 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Any idea why if I use more containers I get a lot of stopped because GC? 2015-02-05 8:59 GMT+01:00 Guillermo Ortiz konstt2...@gmail.com: I'm not caching the data. With each iteration I mean each 128mb block that an executor has to process. The code is pretty simple. final Conversor c = new Conversor(null, null, null, longFields, typeFields); SparkConf conf = new SparkConf().setAppName("Simple Application"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<byte[]> rdd = sc.binaryRecords(path, c.calculaLongBlock()); JavaRDD<String> rddString = rdd.map(new Function<byte[], String>() { @Override public String call(byte[] arg0) throws Exception { String result = c.parse(arg0).toString(); return result; } }); rddString.saveAsTextFile(url + "/output/" + System.currentTimeMillis() + "/"); The parse function just takes an array of bytes and applies some transformations like: [0..3] an integer, [4..20] a String, [21..27] another String and so on. It's just test code, I'd like to understand what is happening. 2015-02-04 18:57 GMT+01:00 Sandy Ryza sandy.r...@cloudera.com: Hi Guillermo, What exactly do you mean by each iteration? Are you caching data in memory? -Sandy On Wed, Feb 4, 2015 at 5:02 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I execute a job in Spark where I'm processing a file of 80Gb in HDFS. I have 5 slaves: (32 cores / 256Gb / 7 physical disks) x 5 I have been trying many different configurations with YARN. yarn.nodemanager.resource.memory-mb 196Gb yarn.nodemanager.resource.cpu-vcores 24 I have tried to execute the job with different numbers of executors and memory (1-4g) With 20 executors each iteration (128mb) takes 25s and it never has a really long time waiting because of GC. When I execute around 60 executors the processing time is about 45s and some tasks take up to one minute because of GC. I have no idea why it's calling GC when I execute more executors simultaneously. Another question is why it takes more time to execute each block. My theory is that it's because there are only 7 physical disks and 5 processes writing is not the same as 20. The code is pretty simple, it's just a map function which parses a line and writes the output to HDFS. There are a lot of substring operations inside the function, which could cause GC. Any theories? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: one is the default value for intercepts in GeneralizedLinearAlgorithm
Thanks for the reply. Seems it is all set to zero in the latest code - I was checking 1.2 last night. On Fri Feb 06 2015 at 07:21:35 Sean Owen so...@cloudera.com wrote: It looks like the initial intercept term is 1 only in the addIntercept && numOfLinearPredictor == 1 case. It does seem inconsistent; since it's just an initial weight it may not matter to the final converged value. You can see a few notes in the class about how numOfLinearPredictor == 1 is handled a bit inconsistently and how a smarter choice of initial intercept could help convergence. So I don't know if this rises to the level of a bug, but I don't know that the difference is on purpose. On Thu, Feb 5, 2015 at 5:40 PM, jamborta jambo...@gmail.com wrote: hi all, I have been going through the GeneralizedLinearAlgorithm to understand how intercepts are handled in regression. Just noticed that the initial setting for the intercept is set to one (whereas the initial setting for the rest of the coefficients is set to zero) using the same piece of code that adds the 1 in front of each line in the data. Is this a bug? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/one-is-the-default-value-for-intercepts-in-GeneralizedLinearAlgorithm-tp21525.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: checking
Yes they are. On Fri, Feb 6, 2015 at 5:06 PM, Mohit Durgapal durgapalmo...@gmail.com wrote: Just wanted to know If my emails are reaching the user list. Regards Mohit -- *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Reg GraphX APSP
Hi, Is the implementation of All Pairs Shortest Path in GraphX for directed graphs or undirected graphs? When I use the algorithm with my dataset, it assumes that the graph is undirected. Has anyone come across this earlier? Thank you
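One way to check empirically (a sketch with a toy graph; the edge data is made up) is to run ShortestPaths on a small directed graph and on the same graph with every edge also added in reverse, then compare the two results:

```scala
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib.ShortestPaths

val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
val directed = Graph.fromEdges(edges, defaultValue = 0)
// force undirected behaviour by adding the reverse of every edge
val undirected = Graph.fromEdges(edges ++ edges.map(e => Edge(e.dstId, e.srcId, e.attr)), 0)

val landmarks = Seq(1L)
ShortestPaths.run(directed, landmarks).vertices.collect().foreach(println)
ShortestPaths.run(undirected, landmarks).vertices.collect().foreach(println)
```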
Re: spark streaming from kafka real time + batch processing in java
Mohit, I want to process the data in real-time as well as store the data in hdfs in year/month/day/hour/ format. Are you wanting to process it and then put it into HDFS, or just put the raw data into HDFS? If the latter, then why not just use Camus ( https://github.com/linkedin/camus); it will easily put the data into the directory structure you are after. On Fri, Feb 6, 2015 at 12:19 AM, Mohit Durgapal durgapalmo...@gmail.com wrote: I want to write a spark streaming consumer for kafka in java. I want to process the data in real-time as well as store the data in hdfs in year/month/day/hour/ format. I am not sure how to achieve this. Should I write separate kafka consumers, one for writing data to HDFS and one for spark streaming? Also, I would like to ask what people generally do with the result of spark streams after aggregating over it? Is it okay to update a NoSQL DB with aggregated counts per batch interval, or is it generally stored in hdfs? Is it possible to store the mini batch data from spark streaming to HDFS in a way that the data is aggregated hourly and put into HDFS in its hour folder? I would not want a lot of small files equal to the mini batches of spark per hour; that would be inefficient for running hadoop jobs later. Is anyone working on the same problem? Any help and comments would be great. Regards Mohit
Question about recomputing lost partition of rdd ?
Hi, I have this doubt: Assume that an rdd is stored across multiple nodes and one of the nodes fails, so a partition is lost. Now, I know that when this node is back, it uses the lineage from its neighbours and recomputes that partition alone. 1) How does it get the source data (original data before applying any transformations) that is lost during the crash? Is it our responsibility to get back the source data before using the lineage? We have only lineage stored on other nodes. 2) Suppose the underlying HDFS deploys replication factor = 3. We know that spark doesn't replicate RDDs. When a partition is lost, is there a possibility to use the second copy of the original data stored in HDFS and generate the required partition using lineage from other nodes? 3) Does it make any difference to spark if HDFS replicates its blocks more than once? Can someone please enlighten me on these fundamentals? Thank you -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Question-about-recomputing-lost-partition-of-rdd-tp21535.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
How do I set spark.local.dirs?
I'm running on EC2 and I want to set the directory to use on the slaves (mounted EBS volumes). I have set: spark.local.dir /vol3/my-spark-dir in /root/spark/conf/spark-defaults.conf and replicated to all nodes. I have verified that in the console the value in the config corresponds. I have checked that these values are present in nodes. But it's still creating temp files in the wrong (default) place: /mnt2/spark How do I get my slaves to pick up this value? How can I verify that they have? Thanks! Joe
Re: Parsing CSV files in Spark
I've been doing a bunch of work with CSVs in Spark, mostly saving them as a merged CSV (instead of the various part-n files). You might find the following links useful: - This article is about combining the part files and outputting a header as the first line in the merged results: http://java.dzone.com/articles/spark-write-csv-file-header - This was my take on the previous author's original article, but it doesn't yet handle the header row: http://deploymentzone.com/2015/01/30/spark-and-merged-csv-files/ spark-csv helps with reading CSV data and mapping a schema for Spark SQL, but as of now doesn't save CSV data. On Fri Feb 06 2015 at 9:49:06 AM Sean Owen so...@cloudera.com wrote: You can do this manually without much trouble: get your files on a distributed store like HDFS, read them with textFile, filter out headers, parse with a CSV library like Commons CSV, select columns, format and store the result. That's tens of lines of code. However you probably want to start by looking at https://github.com/databricks/spark-csv which may make it even easier than that and give you a richer query syntax. On Fri, Feb 6, 2015 at 8:37 AM, Spico Florin spicoflo...@gmail.com wrote: Hi! I'm new to Spark. I have a case study that where the data is store in CSV files. These files have headers with morte than 1000 columns. I would like to know what are the best practice to parsing them and in special the following points: 1. Getting and parsing all the files from a folder 2. What CSV parser do you use? 3. I would like to select just some columns whose names matches a pattern and then pass the selected columns values (plus the column names) to the processing and save the output to a CSV (preserving the selected columns). If you have any experience with some points above, it will be really helpful (for me and for the others that will encounter the same cases) if you can share your thoughts. Thanks. Regards, Florin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Link existing Hive to Spark
Hi Todd, Thanks for the input. I use IntelliJ as my IDE and I create an SBT project, and I write all the dependencies in build.sbt, for example hive, spark-sql etc. These dependencies stay in the local ivy2 repository after getting downloaded from Maven Central. Should I go into ivy2 and put hive-site.xml there? If I build Spark from source code, I can put the file in conf/, but I am avoiding that. From: Todd Nist tsind...@gmail.com Sent: Friday, February 6, 2015 8:32 PM To: Ashutosh Trivedi (MT2013030) Cc: user@spark.apache.org Subject: Re: Link existing Hive to Spark Hi Ashu, Per the documents: Configuration of Hive is done by placing your hive-site.xml file in conf/. For example, you can place something like this in your $SPARK_HOME/conf/hive-site.xml file: <configuration> <property> <name>hive.metastore.uris</name> <!-- Ensure that the following statement points to the Hive Metastore URI in your cluster --> <value>thrift://HostNameHere:9083</value> <description>URI for client to contact metastore server</description> </property> </configuration> HTH. -Todd On Fri, Feb 6, 2015 at 4:12 AM, ashu ashutosh.triv...@iiitb.orgmailto:ashutosh.triv...@iiitb.org wrote: Hi, I have Hive in development, I want to use it in Spark. The Spark SQL documentation says the following: "Users who do not have an existing Hive deployment can still create a HiveContext. When not configured by the hive-site.xml, the context automatically creates metastore_db and warehouse in the current directory." So I have an existing Hive set up and configured; how would I be able to use the same in Spark? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Link-existing-Hive-to-Spark-tp21531.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.orgmailto:user-h...@spark.apache.org
RE: get null pointer exception newAPIHadoopRDD.map()
Thanks. The data is there, I have checked the row count and dumped it to a file. -Vincent From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Thursday, February 05, 2015 2:28 PM To: Sun, Vincent Y Cc: user Subject: Re: get null pointer exception newAPIHadoopRDD.map() Is it possible that value.get("area_code") or value.get("time_zone") returned null ? On Thu, Feb 5, 2015 at 10:58 AM, oxpeople vincent.y@bankofamerica.commailto:vincent.y@bankofamerica.com wrote: I modified the code based on CassandraCQLTest to get the area code count based on time zone. I got an error on creating the new mapped RDD. Any help is appreciated. Thanks. ... val arecodeRdd = sc.newAPIHadoopRDD(job.getConfiguration(), classOf[CqlPagingInputFormat], classOf[java.util.Map[String,ByteBuffer]], classOf[java.util.Map[String,ByteBuffer]]) println("Count: " + arecodeRdd.count) //got right count // arecodeRdd.saveAsTextFile("/tmp/arecodeRddrdd.txt"); val areaCodeSelectedRDD = arecodeRdd.map { case (key, value) => (ByteBufferUtil.string(value.get("area_code")), ByteBufferUtil.string(value.get("time_zone"))) //failed } println("areaCodeRDD: " + areaCodeSelectedRDD.count) ... Here is the stack trace: 15/02/05 13:38:15 ERROR executor.Executor: Exception in task 109.0 in stage 1.0 (TID 366) java.lang.NullPointerException at org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:167) at org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:124) at org.apache.spark.examples.CassandraAreaCodeLocation$$anonfun$1.apply(CassandraAreaCodeLocation.scala:68) at org.apache.spark.examples.CassandraAreaCodeLocation$$anonfun$1.apply(CassandraAreaCodeLocation.scala:66) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1311) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:910) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:910) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/02/05 13:38:15 INFO scheduler.TaskSetManager: Starting task 110.0 in stage 1.0 (TID 367, localhost, ANY, 1334 bytes) 15/02/05 13:38:15 INFO executor.Executor: Running task 110.0 in stage 1.0 (TID 367) 15/02/05 13:38:15 INFO rdd.NewHadoopRDD: Input split: ColumnFamilySplit((-8484684946848467066, '-8334833978340269788] @[127.0.0.1]) 15/02/05 13:38:15 WARN scheduler.TaskSetManager: Lost task 109.0 in stage 1.0 (TID 366, localhost): java.lang.NullPointerException at org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:167) at org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:124) at org.apache.spark.examples.CassandraAreaCodeLocation$$anonfun$1.apply(CassandraAreaCodeLocation.scala:68) at org.apache.spark.examples.CassandraAreaCodeLocation$$anonfun$1.apply(CassandraAreaCodeLocation.scala:66) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1311) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:910) at 
org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:910) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/get-null-potiner-exception-newAPIHadoopRDD-map-tp21520.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.orgmailto:user-h...@spark.apache.org
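Following Ted's suggestion, a hedged sketch of a null-safe version of that map (it reuses arecodeRdd and ByteBufferUtil from the original post; rows missing either column are simply dropped):

```scala
// flatMap over Options drops rows where either column is null instead of throwing an NPE
val areaCodeSelectedRDD = arecodeRdd.flatMap { case (_, value) =>
  for {
    area <- Option(value.get("area_code"))
    tz   <- Option(value.get("time_zone"))
  } yield (ByteBufferUtil.string(area), ByteBufferUtil.string(tz))
}
```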
Re: Parsing CSV files in Spark
You can do this manually without much trouble: get your files on a distributed store like HDFS, read them with textFile, filter out headers, parse with a CSV library like Commons CSV, select columns, format and store the result. That's tens of lines of code. However you probably want to start by looking at https://github.com/databricks/spark-csv which may make it even easier than that and give you a richer query syntax. On Fri, Feb 6, 2015 at 8:37 AM, Spico Florin spicoflo...@gmail.com wrote: Hi! I'm new to Spark. I have a case study that where the data is store in CSV files. These files have headers with morte than 1000 columns. I would like to know what are the best practice to parsing them and in special the following points: 1. Getting and parsing all the files from a folder 2. What CSV parser do you use? 3. I would like to select just some columns whose names matches a pattern and then pass the selected columns values (plus the column names) to the processing and save the output to a CSV (preserving the selected columns). If you have any experience with some points above, it will be really helpful (for me and for the others that will encounter the same cases) if you can share your thoughts. Thanks. Regards, Florin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
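To make that recipe concrete, a rough sketch (paths and the column-name pattern are made up; the naive split does not handle quoted fields, which is where a library like Commons CSV comes in):

```scala
val raw = sc.textFile("hdfs:///data/csv/*.csv")
val header = raw.first()
// keep only columns whose names match a pattern, remembering their positions
val keep = header.split(",").zipWithIndex.filter { case (name, _) => name.startsWith("metric_") }
val keepIdx = keep.map(_._2)

val selected = raw
  .filter(_ != header)                       // drop header lines
  .map(_.split(",", -1))                     // naive split; no quoted-field handling
  .map(cols => keepIdx.map(cols(_)).mkString(","))

// prepend the selected header and write the result back out
val out = sc.parallelize(Seq(keep.map(_._1).mkString(","))) ++ selected
out.saveAsTextFile("hdfs:///data/csv-selected")
```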
RE: How to design a long live spark application
Thanks. I thought about it; yes, the DAG engine should have no issue building the right graph in different threads (at least in theory, it is not an issue). So now I have another question: if I have a context initiated, but there is no operation on it for a very long time, will there be a timeout on it? How does Spark control/maintain/detect the liveness of the client spark context? Do I need to set up something special? Regards, Shuai From: Eugen Cepoi [mailto:cepoi.eu...@gmail.com] Sent: Thursday, February 05, 2015 5:39 PM To: Shuai Zheng Cc: Corey Nolet; Charles Feduke; user@spark.apache.org Subject: Re: How to design a long live spark application Yes you can submit multiple actions from different threads to the same SparkContext. It is safe. Indeed what you want to achieve is quite common. Expose some operations over a SparkContext through HTTP. I have used spray for this and it just worked fine. At bootstrap of your web app, start a sparkcontext, maybe preprocess some data and cache it, then start accepting requests against this sc. Depending where you place the initialization code, you can block the server from initializing until your context is ready. This is nice if you don't want to accept requests while the context is being prepared. Eugen 2015-02-05 23:22 GMT+01:00 Shuai Zheng szheng.c...@gmail.com: This example helps a lot :) But I am thinking of the case below: Assume I have a SparkContext as a global variable. Then if I use multiple threads to access/use it, will it mess up? For example: My code: public static List<Tuple2<Integer, Double>> run(JavaSparkContext sparkContext, Map<Integer, List<ExposureInfo>> cache, Properties prop, List<EghInfo> el) throws IOException, InterruptedException { JavaRDD<EghInfo> lines = sparkContext.parallelize(el, 100); lines.map(…) … lines.count() } If I have two threads call this method at the same time and pass in the same SparkContext, will SparkContext be thread-safe? I am a bit worried here; in traditional java it should be, but in the Spark context I am not 100% sure. Basically the sparkContext needs to be smart enough to differentiate the different method contexts (RDDs added to it from different methods), and create two different DAGs for the different methods. Can anyone confirm this? This is not something I can easily test with code. Thanks! Regards, Shuai From: Corey Nolet [mailto:cjno...@gmail.com] Sent: Thursday, February 05, 2015 11:55 AM To: Charles Feduke Cc: Shuai Zheng; user@spark.apache.org Subject: Re: How to design a long live spark application Here's another lightweight example of running a SparkContext in a common java servlet container: https://github.com/calrissian/spark-jetty-server On Thu, Feb 5, 2015 at 11:46 AM, Charles Feduke charles.fed...@gmail.com wrote: If you want to design something like Spark shell have a look at: http://zeppelin-project.org/ It's open source and may already do what you need. If not, its source code will be helpful in answering the questions about how to integrate with long running jobs that you have. On Thu Feb 05 2015 at 11:42:56 AM Boromir Widas vcsub...@gmail.com wrote: You can check out https://github.com/spark-jobserver/spark-jobserver - this allows several users to upload their jars and run jobs with a REST interface. However, if all users are using the same functionality, you can write a simple spray server which will act as the driver and hosts the spark context+RDDs, launched in client mode. 
On Thu, Feb 5, 2015 at 10:25 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I want to develop a server side application: user submits a request -> server runs a spark application and returns (this might take a few seconds). So I want to host the server to keep the long-lived context; I don’t know whether this is reasonable or not. Basically I try to have a global JavaSparkContext instance and keep it there, and initialize some RDDs. Then my java application will use it to submit jobs. So now I have some questions: 1, if I don’t close it, will there be any timeout I need to configure on the spark server? 2, In theory I want to design something similar to the Spark shell (which also hosts a default sc there), just not shell based. Any suggestion? I think my request is very common for application development; surely someone has done it before? Regards, Shawn
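To illustrate Eugen's point about submitting actions from several threads against one SparkContext, a small sketch (Scala here, although the thread's own code is Java; the work inside each job is made up):

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

val sharedSc = sc  // the single long-lived context owned by the server

// each Future submits its own action; the scheduler builds an independent DAG per action
val jobs = (1 to 4).map { i =>
  Future { sharedSc.parallelize(1 to 1000000).map(_ * i).reduce(_ + _) }
}
jobs.foreach(f => println(Await.result(f, Duration.Inf)))
```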
Re: How do I set spark.local.dirs?
Can you try setting SPARK_LOCAL_DIRS in spark-env.sh ? Cheers On Fri, Feb 6, 2015 at 7:30 AM, Joe Wass jw...@crossref.org wrote: I'm running on EC2 and I want to set the directory to use on the slaves (mounted EBS volumes). I have set: spark.local.dir /vol3/my-spark-dir in /root/spark/conf/spark-defaults.conf and replicated to all nodes. I have verified that in the console the value in the config corresponds. I have checked that these values are present in nodes. But it's still creating temp files in the wrong (default) place: /mnt2/spark How do I get my slaves to pick up this value? How can I verify that they have? Thanks! Joe
Re: spark streaming from kafka real time + batch processing in java
Good questions, some of which I'd like to know the answer to. Is it okay to update a NoSQL DB with aggregated counts per batch interval or is it generally stored in hdfs? This depends on how you are going to use the aggregate data. 1. Is there a lot of data? If so, and you are going to use the data as input to another job, it might benefit from being distributed across the cluster on HDFS (for data locality). 2. Usually when speaking about aggregates there is substantially less data, in which case storing that data in another datastore is okay. If you're talking about a few thousand rows, and having them in something like Mongo or Postgres makes your life easier (reporting software, for example), then even if you use them as inputs to another job it's okay to just store the results in another data store. If the data will grow unbounded over time this might not be a good solution (in which case refer to #1). On Fri Feb 06 2015 at 6:16:39 AM Mohit Durgapal durgapalmo...@gmail.com wrote: I want to write a spark streaming consumer for kafka in java. I want to process the data in real-time as well as store the data in hdfs in year/month/day/hour/ format. I am not sure how to achieve this. Should I write separate kafka consumers, one for writing data to HDFS and one for spark streaming? Also, I would like to ask what people generally do with the result of spark streams after aggregating over it? Is it okay to update a NoSQL DB with aggregated counts per batch interval, or is it generally stored in hdfs? Is it possible to store the mini batch data from spark streaming to HDFS in a way that the data is aggregated hourly and put into HDFS in its hour folder? I would not want a lot of small files equal to the mini batches of spark per hour; that would be inefficient for running hadoop jobs later. Is anyone working on the same problem? Any help and comments would be great. Regards Mohit
Re: Link existing Hive to Spark
Hi Ashu, Per the documents: Configuration of Hive is done by placing your hive-site.xml file in conf/. For example, you can place something like this in your $SPARK_HOME/conf/hive-site.xml file: <configuration> <property> <name>hive.metastore.uris</name> <!-- Ensure that the following statement points to the Hive Metastore URI in your cluster --> <value>thrift://HostNameHere:9083</value> <description>URI for client to contact metastore server</description> </property> </configuration> HTH. -Todd On Fri, Feb 6, 2015 at 4:12 AM, ashu ashutosh.triv...@iiitb.org wrote: Hi, I have Hive in development, I want to use it in Spark. The Spark SQL documentation says the following: "Users who do not have an existing Hive deployment can still create a HiveContext. When not configured by the hive-site.xml, the context automatically creates metastore_db and warehouse in the current directory." So I have an existing Hive set up and configured; how would I be able to use the same in Spark? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Link-existing-Hive-to-Spark-tp21531.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How do I set spark.local.dirs?
Did you restart the slaves so they would read the settings? You don't need to start/stop the EC2 cluster, just the slaves. From the master node: $SPARK_HOME/sbin/stop-slaves.sh $SPARK_HOME/sbin/start-slaves.sh ($SPARK_HOME is probably /root/spark) On Fri Feb 06 2015 at 10:31:18 AM Joe Wass jw...@crossref.org wrote: I'm running on EC2 and I want to set the directory to use on the slaves (mounted EBS volumes). I have set: spark.local.dir /vol3/my-spark-dir in /root/spark/conf/spark-defaults.conf and replicated to all nodes. I have verified that in the console the value in the config corresponds. I have checked that these values are present in nodes. But it's still creating temp files in the wrong (default) place: /mnt2/spark How do I get my slaves to pick up this value? How can I verify that they have? Thanks! Joe
Re: Parsing CSV files in Spark
As Sean said, this is just a few lines of code. You can see an example here: https://github.com/AyasdiOpenSource/bigdf/blob/master/src/main/scala/com/ayasdi/bigdf/DF.scala#L660 https://github.com/AyasdiOpenSource/bigdf/blob/master/src/main/scala/com/ayasdi/bigdf/DF.scala#L660 On Feb 6, 2015, at 7:29 AM, Charles Feduke charles.fed...@gmail.com wrote: I've been doing a bunch of work with CSVs in Spark, mostly saving them as a merged CSV (instead of the various part-n files). You might find the following links useful: - This article is about combining the part files and outputting a header as the first line in the merged results: http://java.dzone.com/articles/spark-write-csv-file-header http://java.dzone.com/articles/spark-write-csv-file-header - This was my take on the previous author's original article, but it doesn't yet handle the header row: http://deploymentzone.com/2015/01/30/spark-and-merged-csv-files/ http://deploymentzone.com/2015/01/30/spark-and-merged-csv-files/ spark-csv helps with reading CSV data and mapping a schema for Spark SQL, but as of now doesn't save CSV data. On Fri Feb 06 2015 at 9:49:06 AM Sean Owen so...@cloudera.com mailto:so...@cloudera.com wrote: You can do this manually without much trouble: get your files on a distributed store like HDFS, read them with textFile, filter out headers, parse with a CSV library like Commons CSV, select columns, format and store the result. That's tens of lines of code. However you probably want to start by looking at https://github.com/databricks/spark-csv https://github.com/databricks/spark-csv which may make it even easier than that and give you a richer query syntax. On Fri, Feb 6, 2015 at 8:37 AM, Spico Florin spicoflo...@gmail.com mailto:spicoflo...@gmail.com wrote: Hi! I'm new to Spark. I have a case study that where the data is store in CSV files. These files have headers with morte than 1000 columns. I would like to know what are the best practice to parsing them and in special the following points: 1. Getting and parsing all the files from a folder 2. What CSV parser do you use? 3. I would like to select just some columns whose names matches a pattern and then pass the selected columns values (plus the column names) to the processing and save the output to a CSV (preserving the selected columns). If you have any experience with some points above, it will be really helpful (for me and for the others that will encounter the same cases) if you can share your thoughts. Thanks. Regards, Florin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org mailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org mailto:user-h...@spark.apache.org
Spark Driver Host under Yarn
I'm running Spark 1.2 with Yarn. My logs show that my executors are failing to connect to my driver. This is because they are using the wrong hostname. Since I'm running with Yarn, I can't set spark.driver.host as explained in SPARK-4253. So it should come from my HDFS configuration. Do you know which piece of HDFS configuration determines my driver hostname? It's definitely not using the hostname I have in yarn-site.xml:yarn.resourcemanager.hostname or core-site.xml:fs.default.name. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Driver-Host-under-Yarn-tp21536.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
matrix of random variables with spark.
Hi all, this is my first email with this mailing list and I hope that I am not doing anything wrong. I am currently trying to define a distributed matrix with n rows and k columns where each element is randomly sampled by a uniform distribution. How can I do that? It would be also nice if you can suggest me any good guide that I can use to start working with Spark. (The quick start tutorial is not enough for me ) Thanks a lot !
Beginner in Spark
Hi, I'm new to Spark, and I'd like to install Spark with Scala. The aim is to build a data processing system for door events. The first step is to install Spark, Scala, HDFS and the other required tools. The second is to build the program in Scala which can process a file of my data logs (events). Could you please help me install the required tools (Spark, Scala, HDFS) and tell me how I can execute my program on the input file? Best regards,
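Once the tools are installed, a minimal sketch of what such a program might look like (the class name, input format and argument layout are all made up; it would be packaged with sbt and launched with spark-submit):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DoorEvents {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DoorEvents"))
    val events = sc.textFile(args(0))   // args(0): path to the door-event log file
    // assume the first comma-separated field identifies the door; count events per door
    val perDoor = events.map(line => (line.split(",")(0), 1)).reduceByKey(_ + _)
    perDoor.saveAsTextFile(args(1))     // args(1): output directory
    sc.stop()
  }
}
```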
Re: Question about recomputing lost partition of rdd ?
I think there are a number of misconceptions here. It is not necessary that the original node come back in order to recreate the lost partition. The lineage is not retrieved from neighboring nodes. The source data is retrieved in the same way that it was the first time that the partition was computed. The caller does not need to do anything; Spark does the recomputation. The point is that the creation of the partition is deterministic and so can be replayed anywhere. Spark *can* replicate RDDs, optionally. Resilience of data stored on HDFS is up to HDFS and is transparent to Spark. Spark will use the data locality information to try to schedule work next to the data, no matter what the replication factor. More replication potentially allows more options in scheduling tasks, I suppose, since the data is found on more nodes. On Fri, Feb 6, 2015 at 9:47 AM, Kartheek.R kartheek.m...@gmail.com wrote: Hi, I have this doubt: Assume that an rdd is stored across multiple nodes and one of the nodes fails. So, a partition is lost. Now, I know that when this node is back, it uses the lineage from its neighbours and recomputes that partition alone. 1) How does it get the source data (original data before applying any transformations) that is lost during the crash. Is it our responsibility to get back the source data before using the lineage?. We have only lineage stored on other nodes. 2)Suppose the underlying HDFS deploys replication factor =3. We know that spark doesn't replicate RDD. When a partition is lost, is there a possibility to use the second copy of the original data stored in HDFS and generate the required partition using lineage from other nodes?. 3)Does it make any difference to spark if HDFS replicates its blocks more that once? Can someone please enlighten me on these fundamentals? Thank you View this message in context: Question about recomputing lost partition of rdd ? Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
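To illustrate the optional replication Sean mentions, a small sketch (the path is made up): the _2 storage levels keep each cached partition on two nodes.

```scala
import org.apache.spark.storage.StorageLevel

val parsed = sc.textFile("hdfs:///data/input").map(_.split("\t"))
parsed.persist(StorageLevel.MEMORY_AND_DISK_2)  // two in-cluster copies of each cached partition
parsed.count()                                  // materializes the cache; lost copies can still be rebuilt from lineage
```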
Re: Link existing Hive to Spark
OK. Is there no way to specify it in code, when I create SparkConf? From: Todd Nist tsind...@gmail.com Sent: Friday, February 6, 2015 10:08 PM To: Ashutosh Trivedi (MT2013030) Cc: user@spark.apache.org Subject: Re: Link existing Hive to Spark You can always just add the entry, /etc/hadoop/conf, to the appropriate classpath entry in $SPARK_HOME/conf/spark-defaults.conf. On Fri, Feb 6, 2015 at 11:16 AM, Ashutosh Trivedi (MT2013030) ashutosh.triv...@iiitb.orgmailto:ashutosh.triv...@iiitb.org wrote: Hi Todd, Thanks for the input. I use IntelliJ as my IDE and I create an SBT project, and I write all the dependencies in build.sbt, for example hive, spark-sql etc. These dependencies stay in the local ivy2 repository after getting downloaded from Maven Central. Should I go into ivy2 and put hive-site.xml there? If I build Spark from source code, I can put the file in conf/, but I am avoiding that. From: Todd Nist tsind...@gmail.commailto:tsind...@gmail.com Sent: Friday, February 6, 2015 8:32 PM To: Ashutosh Trivedi (MT2013030) Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: Link existing Hive to Spark Hi Ashu, Per the documents: Configuration of Hive is done by placing your hive-site.xml file in conf/. For example, you can place something like this in your $SPARK_HOME/conf/hive-site.xml file: <configuration> <property> <name>hive.metastore.uris</name> <!-- Ensure that the following statement points to the Hive Metastore URI in your cluster --> <value>thrift://HostNameHere:9083</value> <description>URI for client to contact metastore server</description> </property> </configuration> HTH. -Todd On Fri, Feb 6, 2015 at 4:12 AM, ashu ashutosh.triv...@iiitb.orgmailto:ashutosh.triv...@iiitb.org wrote: Hi, I have Hive in development, I want to use it in Spark. The Spark SQL documentation says the following: "Users who do not have an existing Hive deployment can still create a HiveContext. When not configured by the hive-site.xml, the context automatically creates metastore_db and warehouse in the current directory." So I have an existing Hive set up and configured; how would I be able to use the same in Spark? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Link-existing-Hive-to-Spark-tp21531.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.orgmailto:user-h...@spark.apache.org
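On the "in code" question, a hedged sketch (the metastore URI is made up): the Hive metastore location can also be set programmatically on the HiveContext, although keeping it in hive-site.xml is the documented route.

```scala
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.setConf("hive.metastore.uris", "thrift://metastore-host:9083")
hiveContext.sql("SHOW TABLES").collect().foreach(println)
```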