Can't access remote Hive table from spark
Hi, I built and started a single node standalone Spark 1.2.0 cluster along with a single node Hive 0.14.0 instance installed by Ambari 1.17.0. On the Spark and Hive node I can create and query tables inside Hive, and on remote machines I can submit the SparkPi example to the Spark master. But I failed to run the following example code:

public class SparkTest {
  public static void main(String[] args) {
    String appName = "This is a test application";
    String master = "spark://lix1.bh.com:7077";
    SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaHiveContext sqlCtx = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc);
    // sqlCtx.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
    // sqlCtx.sql("LOAD DATA LOCAL INPATH '/opt/spark/examples/src/main/resources/kv1.txt' INTO TABLE src");
    // Queries are expressed in HiveQL.
    List<Row> rows = sqlCtx.sql("FROM src SELECT key, value").collect();
    System.out.print("I got " + rows.size() + " rows\r\n");
    sc.close();
  }
}

Exception in thread "main" org.apache.hadoop.hive.ql.metadata.InvalidTableException: Table not found src at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:980) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950) at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) at org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) at org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138) at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) at scala.collection.immutable.List.foldLeft(List.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51) at
Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)
Hi Nicholas, thanks for your reply. I checked spark-redshift - it only reads the UNLOAD data files already stored on Hadoop, not live result sets from the database. Do you know of any example of a custom RDD which fetches the data on the fly (not reading from HDFS)? Thanks. Denis From: Nicholas Chammas nicholas.cham...@gmail.com To: Denis Mikhalkin deni...@yahoo.com; user@spark.apache.org user@spark.apache.org Sent: Sunday, 25 January 2015, 3:06 Subject: Re: Analyzing data from non-standard data sources (e.g. AWS Redshift) I believe Databricks provides an RDD interface to Redshift. Did you check spark-packages.org? On Sat, 24 January 2015 at 6:45 AM Denis Mikhalkin deni...@yahoo.com.invalid wrote: Hello, we've got some analytics data in AWS Redshift. The data is being constantly updated. I'd like to be able to write a query against Redshift which would return a subset of data, and then run a Spark job (PySpark) to do some analysis. I could not find an RDD which would let me do it out of the box (Python), so I tried writing my own. For example, I tried a combination of a generator (via yield) with parallelize. It appears, though, that parallelize reads all the data into memory first, as I get either OOM errors or Python swapping as soon as I increase the number of rows beyond trivial limits. I've also looked at Java RDDs (there is an example of a MySQL RDD), but it seems that it also reads all the data into memory. So my question is - how do I correctly feed Spark with huge datasets which don't initially reside in HDFS/S3 (ideally from PySpark, but I would appreciate any tips)? Thanks. Denis
where storagelevel DISK_ONLY persists RDD to
I would like to persist an RDD to HDFS or an NFS mount. How do I change the location?
foreachActive functionality
Can someone help me to understand the usage of the foreachActive function introduced for Vectors? I am trying to understand its usage in the MultivariateOnlineSummarizer class for summary statistics.

sample.foreachActive { (index, value) =>
  if (value != 0.0) {
    if (currMax(index) < value) {
      currMax(index) = value
    }
    if (currMin(index) > value) {
      currMin(index) = value
    }
    val prevMean = currMean(index)
    val diff = value - prevMean
    currMean(index) = prevMean + diff / (nnz(index) + 1.0)
    currM2n(index) += (value - currMean(index)) * diff
    currM2(index) += value * value
    currL1(index) += math.abs(value)
    nnz(index) += 1.0
  }
}

Regards, Kundan
graph.inDegrees including zero values
Hi, If a vertex has no in-degree then Spark's GraphOps 'inDegrees' does not return it at all. Instead, it would be very useful to me to be able to have that vertex returned with an in-degree of zero. What's the best way to achieve this using the GraphX API? For example, given a graph with nodes A, B, C, where A is connected to B and B is connected to C, like so: A --> B --> C, graph.inDegrees returns: B: 1, C: 1. But I would like: A: 0, B: 1, C: 1. Cheers, Stefano -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/graph-inDegrees-including-zero-values-tp21354.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
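A minimal sketch of one way to get the zero entries, assuming an existing Graph[VD, ED] named graph: outer-join the full vertex set against inDegrees and default the missing values to zero.

val inDegreesWithZeros = graph.vertices.leftOuterJoin(graph.inDegrees).map {
  case (vid, (_, degOpt)) => (vid, degOpt.getOrElse(0))
}
// for A --> B --> C this yields (A,0), (B,1), (C,1)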
Re: what is the roadmap for Spark SQL dialect in the coming releases?
Thanks Michael. A clarification: so the HQL dialect provided by HiveContext, does it use the Catalyst optimizer? I thought HiveContext was only related to Hive integration in Spark! I would be grateful if you could clarify this. Cheers On Sun, Jan 25, 2015 at 1:23 AM, Michael Armbrust mich...@databricks.com wrote: I generally recommend people use the HQL dialect provided by the HiveContext when possible: http://spark.apache.org/docs/latest/sql-programming-guide.html#getting-started I'll also note that this is distinct from the Hive on Spark project, which is based on the Hive query optimizer / execution engine instead of the Catalyst optimizer that is shipped with Spark. On Thu, Jan 22, 2015 at 3:12 AM, Niranda Perera niranda.per...@gmail.com wrote: Hi, would like to know if there is an update on this? rgds On Mon, Jan 12, 2015 at 10:44 AM, Niranda Perera niranda.per...@gmail.com wrote: Hi, I found out that Spark SQL currently supports only a relatively small subset of the SQL dialect. I would like to know the roadmap for the coming releases. And, are you focusing more on popularizing the 'Hive on Spark' SQL dialect or the Spark SQL dialect? Rgds -- Niranda -- Niranda -- Niranda
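As a rough illustration of the recommendation above (a sketch assuming Spark 1.2 and an existing SparkContext named sc): HiveQL submitted through HiveContext is parsed with Hive's grammar, but the resulting logical plan is optimized by Catalyst and executed by Spark SQL's physical operators, not by Hive's execution engine.

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
// HiveQL text in, Catalyst-optimized Spark execution underneath
val top = hiveCtx.sql("SELECT key, value FROM src WHERE key < 10")
top.collect().foreach(println)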
RE: Can't access remote Hive table from spark
This happened to me as well, putting hive-site.xml inside conf doesn't seem to work. Instead I added /etc/hive/conf to SPARK_CLASSPATH and it worked. You can try this approach. -Skanda -Original Message- From: guxiaobo1982 guxiaobo1...@qq.com Sent: 25-01-2015 13:50 To: user@spark.apache.org user@spark.apache.org Subject: Can't access remote Hive table from spark Hi, I built and started a single node standalone Spark 1.2.0 cluster along with a single node Hive 0.14.0 instance installed by Ambari 1.17.0. On the Spark and Hive node I can create and query tables inside Hive, and on remote machines I can submit the SparkPi example to the Spark master. But I failed to run the following example code: [...]
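A sketch of Skanda's workaround, assuming an Ambari-style layout where the Hive client configuration (including the hive-site.xml that points at the remote metastore) lives under /etc/hive/conf; the path can be prepended to the classpath in conf/spark-env.sh:

export SPARK_CLASSPATH=/etc/hive/conf:$SPARK_CLASSPATH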
Re: spark streaming with checkpoint
Yeah, use streaming to gather the incoming logs and write them to a log file, then run a Spark job every 5 minutes to process the counts. Got it. Thanks a lot. On 07:07, Mon, 26 Jan 2015 Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Tue, Jan 20, 2015 at 8:16 PM, balu.naren balu.na...@gmail.com wrote: I am a beginner to spark streaming. So have a basic doubt regarding checkpoints. My use case is to calculate the no of unique users by day. I am using reduce by key and window for this. Where my window duration is 24 hours and slide duration is 5 mins. Adding to what others said, this feels more like a task for run a Spark job every five minutes using cron than using the sliding window functionality from Spark Streaming. Tobias
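A minimal sketch of that batch-job idea, assuming the gathered logs land in a per-day HDFS directory and each line is of the form timestamp<TAB>userId (paths and format are only illustrative):

val lines = sc.textFile("hdfs:///logs/2015-01-26/*")
val uniqueUsers = lines.map(_.split("\t")(1)).distinct().count()
println("unique users for the day: " + uniqueUsers)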
Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)
I've got my solution working: https://gist.github.com/cfeduke/3bca88ed793ddf20ea6d I couldn't actually perform the steps I outlined in the previous message in this thread because I would ultimately be trying to serialize a SparkContext to the workers to use during the generation of 1..*n* JdbcRDDs. So I took a look at the source for JdbcRDD and it was trivial to adjust to my needs. This got me thinking about your problem; the JdbcRDD that ships with Spark will shard the query across the cluster by a Long ID value (requiring you to put ? placeholders in your query for use as part of a range boundary) so if you've got such a key - or any series field that happens to be a Long - then you'd just need to use the PostgreSQL JDBC driver and get your JDBC URL: http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html If you have something other than Long for your primary key/series data type then you can do the same thing I did and modify a copy of JdbcRDD, though your changes would be even fewer than my own. (Though I can't see anything much different than a Long or date/time working for this since it has to partition the full range into appropriate sub-ranges.) Because of the sub-range bucketing and cluster distribution you shouldn't run into OOM errors, assuming you provision sufficient worker nodes in the cluster. On Sun Jan 25 2015 at 9:39:56 AM Charles Feduke charles.fed...@gmail.com wrote: I'm facing a similar problem except my data is already pre-sharded in PostgreSQL. I'm going to attempt to solve it like this: - Submit the shard names (database names) across the Spark cluster as a text file and partition it so workers get 0 or more - hopefully 1 - shard name. In this case you could partition ranges - if your primary key is a datetime, then a start/end datetime pair; or if its a long then a start/end long pair. (You may need to run a separate job to get your overall start/end pair and then calculate how many partitions you need from there.) - Write the job so that the worker loads data from its shard(s) and unions the RDDs together. In the case of pairs the concept is the same. Basically look at how the JdbcRDD constructor requires a start, end, and query (disregard numPartitions in this case since we're manually partitioning in the step above). Your query will be its initial filter conditions plus a between condition for the primary key and its pair. - Operate on the union RDDs with other transformations or filters. If everything works as planned then the data should be spread out across the cluster and no one node will be responsible for loading TiBs of data and then distributing it to its peers. That should help with your OOM problem. Of course this does not guarantee that the data is balanced across nodes. With a large amount of data it should balance well enough to get the job done though. (You may need to run several refinements against the complete dataset to figure out the appropriate start/end pair values to get an RDD that is partitioned and balanced across the workers. This is a task best performed using aggregate query logic or stored procedures. With my shard problem I don't have this option available.) Unless someone has a better idea, in which case I'd love to hear it. On Sun Jan 25 2015 at 4:19:38 AM Denis Mikhalkin deni...@yahoo.com.invalid wrote: Hi Nicholas, thanks for your reply. I checked spark-redshift - it's just for the unload data files stored on hadoop, not for online result sets from DB. 
Do you know of any example of a custom RDD which fetches the data on the fly (not reading from HDFS)? Thanks. Denis -- *From:* Nicholas Chammas nicholas.cham...@gmail.com *To:* Denis Mikhalkin deni...@yahoo.com; user@spark.apache.org user@spark.apache.org *Sent:* Sunday, 25 January 2015, 3:06 *Subject:* Re: Analyzing data from non-standard data sources (e.g. AWS Redshift) I believe databricks provides an rdd interface to redshift. Did you check spark-packages.org? On 2015년 1월 24일 (토) at 오전 6:45 Denis Mikhalkin deni...@yahoo.com.invalid wrote: Hello, we've got some analytics data in AWS Redshift. The data is being constantly updated. I'd like to be able to write a query against Redshift which would return a subset of data, and then run a Spark job (Pyspark) to do some analysis. I could not find an RDD which would let me do it OOB (Python), so I tried writing my own. For example, tried combination of a generator (via yield) with parallelize. It appears though that parallelize reads all the data first into memory as I get either OOM or Python swaps as soon as I increase the number of rows beyond trivial limits. I've also looked at Java RDDs (there is an example of MySQL RDD) but it seems that it also reads all the data into memory. So my question is - how to correctly feed Spark with huge datasets which don't initially reside in HDFS/S3 (ideally for
No AMI for Spark 1.2 using ec2 scripts
Hi, When I try to launch a standalone cluster on EC2 using the scripts in the ec2 directory for Spark 1.2, I get the following error: Could not resolve AMI at: https://raw.github.com/mesos/spark-ec2/v4/ami-list/us-east-1/pvm It seems there is not yet any AMI available on EC2. Any ideas when there will be one? This works without problems for version 1.1. Starting up a cluster using these scripts is so simple and straightforward, so I am really missing it on 1.2. /Håkan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/No-AMI-for-Spark-1-2-using-ec2-scripts-tp21362.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Shuffle to HDFS
Hi Larry, I don’t think current Spark’s shuffle can support HDFS as a shuffle output. Anyway, is there any specific reason to spill shuffle data to HDFS or NFS? This will severely increase the shuffle time. Thanks Jerry From: Larry Liu [mailto:larryli...@gmail.com] Sent: Sunday, January 25, 2015 4:45 PM To: u...@spark.incubator.apache.org Subject: Shuffle to HDFS How to change shuffle output to HDFS or NFS?
Re: Lost task - connection closed
Please take a look at the executor logs (on both sides of the IOException) to see if there are other exceptions (e.g., OOM) which precede this one. Generally, the connections should not fail spontaneously. On Sun, Jan 25, 2015 at 10:35 PM, octavian.ganea octavian.ga...@inf.ethz.ch wrote: Hi, I am running a program that executes map-reduce jobs in a loop. The first time the loop runs, everything is ok. After that, it starts giving the following error, first it gives it for one task, then for more tasks and eventually the entire program fails: 15/01/26 01:41:25 WARN TaskSetManager: Lost task 10.0 in stage 15.0 (TID 1063, hostnameXX): java.io.IOException: Connection from hostnameXX/172.31.109.50:50808 closed at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:98) at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:81) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169) at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169) at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169) at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:738) at io.netty.channel.AbstractChannel$AbstractUnsafe$6.run(AbstractChannel.java:606) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:745) Can someone help me with debugging this ? Thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Lost-task-connection-closed-tp21361.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: where storagelevel DISK_ONLY persists RDD to
Hi, Charles Thanks for your reply. Is it possible to persist an RDD to HDFS? What is the default location to persist an RDD with storage level DISK_ONLY? On Sun, Jan 25, 2015 at 6:26 AM, Charles Feduke charles.fed...@gmail.com wrote: I think you want to instead use `.saveAsSequenceFile` to save an RDD to someplace like HDFS or NFS if you are attempting to interoperate with another system, such as Hadoop. `.persist` is for keeping the contents of an RDD around so future uses of that particular RDD don't need to recalculate its composite parts. On Sun Jan 25 2015 at 3:36:31 AM Larry Liu larryli...@gmail.com wrote: I would like to persist an RDD to HDFS or an NFS mount. How do I change the location?
Announcement: Generalized K-Means Clustering on Spark
This project generalizes the Spark MLLIB K-Means clusterer to support clustering of dense or sparse, low or high dimensional data using distance functions defined by Bregman divergences. https://github.com/derrickburns/generalized-kmeans-clustering -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Announcement-Generalized-K-Means-Clustering-on-Spark-tp21363.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SVD in pyspark ?
Is the distributed SVD functionality exposed to Python yet? It seems to be available only from Scala or Java, unless I am missing something; I'm looking for a PySpark equivalent to org.apache.spark.mllib.linalg.SingularValueDecomposition. In case it's not there yet, is there a way to write a wrapper to call from Python into the corresponding Java/Scala code? The reason for using Python instead of Scala directly is that I like to take advantage of the notebook interface for visualization. As an aside, is there an IPython-notebook-like interface for the Scala-based REPL? Thanks Andreas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SVD-in-pyspark-tp21356.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
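For reference, the Scala API that currently lacks a Python wrapper looks roughly like this (a sketch assuming an existing RDD[Vector] named rows):

import org.apache.spark.mllib.linalg.distributed.RowMatrix

val mat = new RowMatrix(rows)
// top 20 singular values/vectors; U comes back as a distributed RowMatrix
val svd = mat.computeSVD(20, computeU = true)
val s = svd.s   // singular values as a local Vector
val V = svd.V   // right singular vectors as a local Matrix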
Lost task - connection closed
Hi, I am running a program that executes map-reduce jobs in a loop. The first time the loop runs, everything is ok. After that, it starts giving the following error, first it gives it for one task, then for more tasks and eventually the entire program fails: 15/01/26 01:41:25 WARN TaskSetManager: Lost task 10.0 in stage 15.0 (TID 1063, hostnameXX): java.io.IOException: Connection from hostnameXX/172.31.109.50:50808 closed at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:98) at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:81) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169) at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169) at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169) at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:738) at io.netty.channel.AbstractChannel$AbstractUnsafe$6.run(AbstractChannel.java:606) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:745) Can someone help me with debugging this ? Thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Lost-task-connection-closed-tp21361.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: where storagelevel DISK_ONLY persists RDD to
No, the current RDD persistence mechanism does not support putting data on HDFS; the directory used is controlled by spark.local.dir. Instead you can use checkpoint() to save the RDD on HDFS. Thanks Jerry From: Larry Liu [mailto:larryli...@gmail.com] Sent: Monday, January 26, 2015 3:08 PM To: Charles Feduke Cc: u...@spark.incubator.apache.org Subject: Re: where storagelevel DISK_ONLY persists RDD to Hi, Charles Thanks for your reply. Is it possible to persist an RDD to HDFS? What is the default location to persist an RDD with storage level DISK_ONLY? On Sun, Jan 25, 2015 at 6:26 AM, Charles Feduke charles.fed...@gmail.com wrote: I think you want to instead use `.saveAsSequenceFile` to save an RDD to someplace like HDFS or NFS if you are attempting to interoperate with another system, such as Hadoop. [...]
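A minimal sketch of the checkpoint() route Jerry mentions (the directory is illustrative; note that checkpoint() must be called before the first action on the RDD):

sc.setCheckpointDir("hdfs://namenode:8020/user/larry/checkpoints")
val rdd = sc.textFile("hdfs://namenode:8020/data/input")
rdd.checkpoint()   // marks the RDD for checkpointing
rdd.count()        // the first job materializes the RDD and writes it under the checkpoint dir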
Re: Spark 1.2 – How to change Default (Random) port ….
This was a regression caused by Netty Block Transfer Service. The fix for this just barely missed the 1.2 release, and you can see the associated JIRA here: https://issues.apache.org/jira/browse/SPARK-4837 Current master has the fix, and the Spark 1.2.1 release will have it included. If you don't want to rebuild from master or wait, then you can turn it off by setting spark.shuffle.blockTransferService to nio. On Sun, Jan 25, 2015 at 6:28 PM, Shailesh Birari sbirar...@gmail.com wrote: Can anyone please let me know ? I don't want to open all ports on n/w. So, am interested in the property by which this new port I can configure. Shailesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-How-to-change-Default-Random-port-tp21306p21360.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
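For anyone who wants the workaround spelled out: the setting can go in conf/spark-defaults.conf (or be passed via --conf on spark-submit). A sketch, with an illustrative fixed block-manager port alongside it:

spark.shuffle.blockTransferService   nio
spark.blockManager.port              40000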
Re: Eclipse on spark
I recommend using a build tool within Eclipse, such as Gradle or Maven. On 24 Jan 2015 19:34, riginos samarasrigi...@gmail.com wrote: How to compile a Spark project in Scala IDE for Eclipse? I have many Scala scripts and I no longer want to load them from the scala shell; what can I do? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Eclipse-on-spark-tp21350.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
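If you prefer sbt (which Scala IDE/Eclipse can consume, for example via the sbteclipse plugin), a minimal build.sbt for a Spark 1.2 project might look like this (versions are illustrative):

name := "my-spark-app"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"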
Re: Shuffle to HDFS
Hi, Jerry Thanks for your reply. The reason I have this question is that in Hadoop, mapper intermediate output (shuffle) will be stored in HDFS. I think the default location for Spark is /tmp. Larry On Sun, Jan 25, 2015 at 9:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Larry, I don’t think current Spark’s shuffle can support HDFS as a shuffle output. Anyway, is there any specific reason to spill shuffle data to HDFS or NFS? This will severely increase the shuffle time. Thanks Jerry From: Larry Liu [mailto:larryli...@gmail.com] Sent: Sunday, January 25, 2015 4:45 PM To: u...@spark.incubator.apache.org Subject: Shuffle to HDFS How to change shuffle output to HDFS or NFS?
RE: Shuffle to HDFS
Hey Larry, I don’t think Hadoop will put shuffle output in HDFS; instead its behavior is the same as Spark's, storing mapper output (shuffle) data on local disks. You might have misunderstood something ☺. Thanks Jerry From: Larry Liu [mailto:larryli...@gmail.com] Sent: Monday, January 26, 2015 3:03 PM To: Shao, Saisai Cc: u...@spark.incubator.apache.org Subject: Re: Shuffle to HDFS Hi, Jerry Thanks for your reply. The reason I have this question is that in Hadoop, mapper intermediate output (shuffle) will be stored in HDFS. I think the default location for Spark is /tmp. Larry On Sun, Jan 25, 2015 at 9:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Larry, I don’t think current Spark’s shuffle can support HDFS as a shuffle output. [...]
Re: where storagelevel DISK_ONLY persists RDD to
I think you want to instead use `.saveAsSequenceFile` to save an RDD to someplace like HDFS or NFS if you are attempting to interoperate with another system, such as Hadoop. `.persist` is for keeping the contents of an RDD around so future uses of that particular RDD don't need to recalculate its composite parts. On Sun Jan 25 2015 at 3:36:31 AM Larry Liu larryli...@gmail.com wrote: I would like to persist an RDD to HDFS or an NFS mount. How do I change the location?
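A small sketch of what this looks like for a pair RDD written to HDFS (the path is illustrative; the import supplies the SequenceFile support for pair RDDs in Spark 1.x):

import org.apache.spark.SparkContext._

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
pairs.saveAsSequenceFile("hdfs://namenode:8020/user/larry/pairs-seq")
// or, for any RDD, plain text output:
pairs.saveAsTextFile("hdfs://namenode:8020/user/larry/pairs-text")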
Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)
I'm facing a similar problem except my data is already pre-sharded in PostgreSQL. I'm going to attempt to solve it like this: - Submit the shard names (database names) across the Spark cluster as a text file and partition it so workers get 0 or more - hopefully 1 - shard name. In this case you could partition ranges - if your primary key is a datetime, then a start/end datetime pair; or if its a long then a start/end long pair. (You may need to run a separate job to get your overall start/end pair and then calculate how many partitions you need from there.) - Write the job so that the worker loads data from its shard(s) and unions the RDDs together. In the case of pairs the concept is the same. Basically look at how the JdbcRDD constructor requires a start, end, and query (disregard numPartitions in this case since we're manually partitioning in the step above). Your query will be its initial filter conditions plus a between condition for the primary key and its pair. - Operate on the union RDDs with other transformations or filters. If everything works as planned then the data should be spread out across the cluster and no one node will be responsible for loading TiBs of data and then distributing it to its peers. That should help with your OOM problem. Of course this does not guarantee that the data is balanced across nodes. With a large amount of data it should balance well enough to get the job done though. (You may need to run several refinements against the complete dataset to figure out the appropriate start/end pair values to get an RDD that is partitioned and balanced across the workers. This is a task best performed using aggregate query logic or stored procedures. With my shard problem I don't have this option available.) Unless someone has a better idea, in which case I'd love to hear it. On Sun Jan 25 2015 at 4:19:38 AM Denis Mikhalkin deni...@yahoo.com.invalid wrote: Hi Nicholas, thanks for your reply. I checked spark-redshift - it's just for the unload data files stored on hadoop, not for online result sets from DB. Do you know of any example of a custom RDD which fetches the data on the fly (not reading from HDFS)? Thanks. Denis -- *From:* Nicholas Chammas nicholas.cham...@gmail.com *To:* Denis Mikhalkin deni...@yahoo.com; user@spark.apache.org user@spark.apache.org *Sent:* Sunday, 25 January 2015, 3:06 *Subject:* Re: Analyzing data from non-standard data sources (e.g. AWS Redshift) I believe databricks provides an rdd interface to redshift. Did you check spark-packages.org? On 2015년 1월 24일 (토) at 오전 6:45 Denis Mikhalkin deni...@yahoo.com.invalid wrote: Hello, we've got some analytics data in AWS Redshift. The data is being constantly updated. I'd like to be able to write a query against Redshift which would return a subset of data, and then run a Spark job (Pyspark) to do some analysis. I could not find an RDD which would let me do it OOB (Python), so I tried writing my own. For example, tried combination of a generator (via yield) with parallelize. It appears though that parallelize reads all the data first into memory as I get either OOM or Python swaps as soon as I increase the number of rows beyond trivial limits. I've also looked at Java RDDs (there is an example of MySQL RDD) but it seems that it also reads all the data into memory. So my question is - how to correctly feed Spark with huge datasets which don't initially reside in HDFS/S3 (ideally for Pyspark, but would appreciate any tips)? Thanks. Denis
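A rough sketch of the JdbcRDD route for Redshift (connection details, table, key bounds, and partition count are all placeholders; the PostgreSQL/Redshift JDBC driver must be on the classpath):

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val url = "jdbc:postgresql://my-cluster.example.redshift.amazonaws.com:5439/mydb?user=me&password=secret"
val rows = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url),
  "SELECT id, payload FROM events WHERE id >= ? AND id <= ?",
  1L, 10000000L,   // overall key range that gets split into sub-ranges
  20,              // number of partitions, hence concurrent queries
  rs => (rs.getLong(1), rs.getString(2)))
rows.take(5).foreach(println)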
key already cancelled error
Hi everyone, I'm writing a program that update a cassandra table. I've writen a first shot where I update the table row by row from a rdd trhough a map. Now I want to build a batch of updates using the same kind of syntax as in this thread : https://groups.google.com/forum/#!msg/spark-users/LUb7ZysYp2k/MhymcFddb8cJ But as soon as I use a mappartition I get a key already cancelled error. The program updates the table properly but it seems that the problem appears when the driver try to shut down the ressources. 15/01/26 00:07:00 INFO SparkContext: Job finished: collect at CustomerIdReconciliation.scala:143, took 1.998601568 s 15/01/26 00:07:00 INFO SparkUI: Stopped Spark web UI at http://cim1-dev:4044 15/01/26 00:07:00 INFO DAGScheduler: Stopping DAGScheduler 15/01/26 00:07:00 INFO SparkDeploySchedulerBackend: Shutting down all executors 15/01/26 00:07:00 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 15/01/26 00:07:00 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(cim1-dev2,52516) 15/01/26 00:07:00 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(cim1-dev2,52516) 15/01/26 00:07:00 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(cim1-dev2,52516) not found 15/01/26 00:07:00 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@7cedcb23 15/01/26 00:07:00 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@7cedcb23 java.nio.channels.CancelledKeyException at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386) at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139) 15/01/26 00:07:00 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@38e8c534 15/01/26 00:07:00 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@38e8c534 java.nio.channels.CancelledKeyException at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:310) at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139) 15/01/26 00:07:00 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(cim1-dev,44773) 15/01/26 00:07:00 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(cim1-dev3,29293) 15/01/26 00:07:00 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(cim1-dev3,29293) 15/01/26 00:07:00 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@159adcf5 15/01/26 00:07:00 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@159adcf5 java.nio.channels.CancelledKeyException at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386) at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139) 15/01/26 00:07:00 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(cim1-dev,44773) 15/01/26 00:07:00 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(cim1-dev,44773) not found 15/01/26 00:07:00 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@329a6d86 15/01/26 00:07:00 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@329a6d86 java.nio.channels.CancelledKeyException at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:310) at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139) 15/01/26 00:07:00 INFO ConnectionManager: Key not valid ? 
sun.nio.ch.SelectionKeyImpl@3d3e86d5 15/01/26 00:07:00 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@3d3e86d5 java.nio.channels.CancelledKeyException at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:310) at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139) 15/01/26 00:07:01 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped! 15/01/26 00:07:01 INFO ConnectionManager: Selector thread was interrupted! 15/01/26 00:07:01 INFO ConnectionManager: ConnectionManager stopped 15/01/26 00:07:01 INFO MemoryStore: MemoryStore cleared 15/01/26 00:07:01 INFO BlockManager: BlockManager stopped 15/01/26 00:07:01 INFO BlockManagerMaster: BlockManagerMaster stopped 15/01/26 00:07:01 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 15/01/26 00:07:01 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 15/01/26 00:07:01 INFO SparkContext: Successfully stopped SparkContext I've tried to set these 2 options but it doesn't change anything : set(spark.core.connection.ack.wait.timeout,600) set(spark.akka.frameSize,50) Thanks for your help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/key-already-cancelled-error-tp21357.html Sent from the Apache Spark User List
Re: Spark webUI - application details page
Hi, I've a similar problem. I want to see the detailed logs of Completed Applications, so I've set in my program: set("spark.eventLog.enabled", "true").set("spark.eventLog.dir", "file:/tmp/spark-events") but when I click on the application in the webui, I get a page with the message: Application history not found (app-20150126000651-0331) No event logs found for application xxx$ in file:/tmp/spark-events/xxx-147211500. Did you specify the correct logging directory? despite the fact that the directory exists and contains 3 files: APPLICATION_COMPLETE* EVENT_LOG_1* SPARK_VERSION_1.1.0* I use spark 1.1.0 on a standalone cluster with 3 nodes. Any suggestion to solve the problem? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p21358.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Eclipse on spark
Download the pre-built binary for Windows, attach all the required jars to your project's Eclipse classpath, and go ahead with Eclipse. Make sure you have the same Java version. On 25 January 2015 at 07:33, riginos [via Apache Spark User List] ml-node+s1001560n21350...@n3.nabble.com wrote: How to compile a Spark project in Scala IDE for Eclipse? I have many Scala scripts and I no longer want to load them from the scala shell; what can I do? -- Regards, Harihar Nahak BigData Developer Wynyard Email: hna...@wynyardgroup.com | Extn: 8019 --Harihar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Eclipse-on-spark-tp21350p21359.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Pairwise Processing of a List
So you’ve got a point A and you want the sum of distances between it and all other points? Or am I misunderstanding you?

// target point, can be a Broadcast global sent to all workers
val tarPt = (10, 20)
val pts = Seq((2,2), (3,3), (2,3), (10,2))
val rdd = sc.parallelize(pts)
rdd.map( pt => Math.sqrt( Math.pow(tarPt._1 - pt._1, 2) + Math.pow(tarPt._2 - pt._2, 2)) ).reduce( (d1,d2) => d1 + d2 )

-Joe From: Steve Nunez snu...@hortonworks.com Date: Sunday, January 25, 2015 at 7:32 PM To: user@spark.apache.org user@spark.apache.org Subject: Pairwise Processing of a List Spark Experts, I’ve got a list of points: List[(Float, Float)]) that represent (x,y) coordinate pairs and need to sum the distance. It’s easy enough to compute the distance: case class Point(x: Float, y: Float) { def distance(other: Point): Float = sqrt(pow(x - other.x, 2) + pow(y - other.y, 2)).toFloat } (in this case I create a ‘Point’ class, but the maths are the same). What I can’t figure out is the ‘right’ way to sum distances between all the points. I can make this work by traversing the list with a for loop and using indices, but this doesn’t seem right. Anyone know a clever way to process List[(Float, Float)]) in a pairwise fashion? Regards, - Steve
Re: Pairwise Processing of a List
Hi, On Mon, Jan 26, 2015 at 9:32 AM, Steve Nunez snu...@hortonworks.com wrote: I’ve got a list of points: List[(Float, Float)]) that represent (x,y) coordinate pairs and need to sum the distance. It’s easy enough to compute the distance: Are you saying you want all combinations (N^2) of distances? That should be possible with rdd.cartesian(): val points = sc.parallelize(List((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))) points.cartesian(points).collect -- Array[((Double, Double), (Double, Double))] = Array(((1.0,2.0),(1.0,2.0)), ((1.0,2.0),(3.0,4.0)), ((1.0,2.0),(5.0,6.0)), ((3.0,4.0),(1.0,2.0)), ((3.0,4.0),(3.0,4.0)), ((3.0,4.0),(5.0,6.0)), ((5.0,6.0),(1.0,2.0)), ((5.0,6.0),(3.0,4.0)), ((5.0,6.0),(5.0,6.0))) I guess this is a very expensive operation, though. Tobias
Re: Pairwise Processing of a List
If this is really about just Scala Lists, then a simple answer (using tuples of doubles) is:

val points: List[(Double,Double)] = ...
val distances = for (p1 <- points; p2 <- points) yield {
  val dx = p1._1 - p2._1
  val dy = p1._2 - p2._2
  math.sqrt(dx*dx + dy*dy)
}
distances.sum / 2

It's / 2 since this counts every pair twice. You could double the speed of that, with a slightly more complex formulation using indices, that avoids comparing points to themselves and makes each comparison just once. If you really need the sum of all pairwise distances, I don't think you can do better than that (modulo dealing with duplicates intelligently). If we're talking RDDs, then the simple answer is similar:

val pointsRDD: RDD[(Double,Double)] = ...
val distancesRDD = pointsRDD.cartesian(pointsRDD).map { case (p1, p2) => ... }
distancesRDD.sum / 2

It takes more work to make the same optimization, and involves zipWithIndex, but is possible. If the reason we're talking about Lists is that the set of points is still fairly small, but big enough that all-pairs deserves distributed computation, then I'd parallelize the List into an RDD, and also broadcast it, and then implement a hybrid of these two approaches. You'd have the outer loop over points happening in parallel via the RDD, and inner loop happening locally over the local broadcasted copy in memory. ... and if the use case isn't really to find all-pairs distances and their sum, maybe there are faster ways still to do what you need to. On Mon, Jan 26, 2015 at 12:32 AM, Steve Nunez snu...@hortonworks.com wrote: Spark Experts, I've got a list of points: List[(Float, Float)]) that represent (x,y) coordinate pairs and need to sum the distance. [...]
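The "each comparison just once" variant Sean mentions can also be written with combinations on a plain List (a sketch; each unordered pair is visited exactly once, so no division by 2):

val distinctPairSum = points.combinations(2).map {
  case List(p1, p2) =>
    val dx = p1._1 - p2._1
    val dy = p1._2 - p2._2
    math.sqrt(dx * dx + dy * dy)
}.sum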
Re: Pairwise Processing of a List
Not combinations, linear distances. E.g., given List[ (x1,y1), (x2,y2), (x3,y3) ], compute the sum of the distance between (x1,y1) and (x2,y2) and the distance between (x2,y2) and (x3,y3). Imagine that the list of coordinate points comes from a GPS and describes a trip. - Steve From: Joseph Lust jl...@mc10inc.com Date: Sunday, January 25, 2015 at 17:17 To: Steve Nunez snu...@hortonworks.com, user@spark.apache.org user@spark.apache.org Subject: Re: Pairwise Processing of a List So you've got a point A and you want the sum of distances between it and all other points? Or am I misunderstanding you? [...]
Re: spark streaming with checkpoint
Hi, On Tue, Jan 20, 2015 at 8:16 PM, balu.naren balu.na...@gmail.com wrote: I am a beginner to spark streaming. So have a basic doubt regarding checkpoints. My use case is to calculate the no of unique users by day. I am using reduce by key and window for this. Where my window duration is 24 hours and slide duration is 5 mins. Adding to what others said, this feels more like a task for run a Spark job every five minutes using cron than using the sliding window functionality from Spark Streaming. Tobias
Re: [SQL] Conflicts in inferred Json Schemas
Hi, On Thu, Jan 22, 2015 at 2:26 AM, Corey Nolet cjno...@gmail.com wrote: Let's say I have 2 formats for json objects in the same file schema1 = { "location": "12345 My Lane" } schema2 = { "location": { "houseAddres": "1234 My Lane" } } From my tests, it looks like the current inferSchema() function will end up with only StructField("location", StringType). In Spark SQL columns need to have a well-defined type (as in SQL in general). So inferring the schema requires that there is a schema, and I am afraid that there is not an easy way to achieve what you want in Spark SQL, as there is no data type covering both values you see. (I am pretty sure it can be done if you dive deep into the internals, add data types etc., though.) Tobias
Pairwise Processing of a List
Spark Experts, I've got a list of points: List[(Float, Float)]) that represent (x,y) coordinate pairs and need to sum the distance. It's easy enough to compute the distance: case class Point(x: Float, y: Float) { def distance(other: Point): Float = sqrt(pow(x - other.x, 2) + pow(y - other.y, 2)).toFloat } (in this case I create a 'Point' class, but the maths are the same). What I can't figure out is the 'right' way to sum distances between all the points. I can make this work by traversing the list with a for loop and using indices, but this doesn't seem right. Anyone know a clever way to process List[(Float, Float)]) in a pairwise fashion? Regards, - Steve
Re: Pairwise Processing of a List
(PS the Scala code I posted is a poor way to do it -- it would materialize the entire cartesian product in memory. You can use .iterator or .view to fix that.) Ah, so you want the sum of distances between successive points.

val points: List[(Double,Double)] = ...
points.sliding(2).map { case List(p1, p2) => distance(p1, p2) }.sum

If you import org.apache.spark.mllib.rdd.RDDFunctions._ you should have access to something similar in Spark over an RDD. It gives you a sliding() function that produces Arrays of sequential elements. Note that RDDs don't really guarantee anything about ordering though, so this only makes sense if you've already sorted some upstream RDD by a timestamp or sequence number. On Mon, Jan 26, 2015 at 1:21 AM, Steve Nunez snu...@hortonworks.com wrote: Not combinations, linear distances. E.g., given List[ (x1,y1), (x2,y2), (x3,y3) ], compute the sum of the distance between (x1,y1) and (x2,y2) and the distance between (x2,y2) and (x3,y3). Imagine that the list of coordinate points comes from a GPS and describes a trip. - Steve [...]
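A sketch of that RDD version, assuming the points are already in trip order within the RDD (sliding comes from the MLlib helper class, not from core RDD):

import org.apache.spark.mllib.rdd.RDDFunctions._

val pointsRDD = sc.parallelize(Seq((0.0, 0.0), (3.0, 4.0), (3.0, 8.0)))
val totalDistance = pointsRDD.sliding(2).map { w =>
  val Array(p1, p2) = w
  math.sqrt(math.pow(p1._1 - p2._1, 2) + math.pow(p1._2 - p2._2, 2))
}.reduce(_ + _)
// 5.0 + 4.0 = 9.0 for the three points above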
Re: Serializability: for vs. while loops
Aaron,

On Thu, Jan 15, 2015 at 5:05 PM, Aaron Davidson ilike...@gmail.com wrote:

Scala for-loops are implemented as closures using anonymous inner classes which are instantiated once and invoked many times. This means, though, that the code inside the loop is actually sitting inside a class, which confuses Spark's Closure Cleaner, whose job is to remove unused references from closures to make otherwise-unserializable objects serializable. My understanding is, in particular, that the closure cleaner will null out unused fields in the closure, but cannot go past the first level of depth (i.e., it will not follow field references and null out *their* unused, and possibly unserializable, references), because this could end up mutating state outside of the closure itself. Thus, the extra level of depth of the closure that was introduced by the anonymous class (where presumably the outer `this` pointer is considered used by the closure cleaner) is sufficient to make it unserializable.

Now, two weeks later, let me add that this is one of the most helpful comments I have received on this mailing list! This insight helped me save 90% of the time I spent debugging NotSerializableExceptions. Thank you very much!

Tobias
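To make the difference concrete, here is a hypothetical sketch (not the original poster's code) of the two loop styles; whether the for-comprehension version actually throws a NotSerializableException depends on the Scala and Spark versions in use:

class Job {
  // java.lang.Object is not Serializable; it stands in for any
  // unserializable member of the enclosing class.
  val helper = new Object()

  def withForLoop(rdd: org.apache.spark.rdd.RDD[Int]): Unit = {
    // The for-comprehension body compiles into an extra anonymous class, so
    // the closure passed to map() sits one level deeper and can drag the
    // outer `this` (and `helper`) along, beyond what the closure cleaner
    // can null out.
    for (i <- 1 to 3) {
      rdd.map(_ + i).count()
    }
  }

  def withWhileLoop(rdd: org.apache.spark.rdd.RDD[Int]): Unit = {
    // A plain while loop keeps the map() closure one level shallower, which
    // the closure cleaner handles.
    var i = 1
    while (i <= 3) {
      rdd.map(_ + i).count()
      i += 1
    }
  }
}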
Re: Pairwise Processing of a List
Sean, On Mon, Jan 26, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote: Note that RDDs don't really guarantee anything about ordering though, so this only makes sense if you've already sorted some upstream RDD by a timestamp or sequence number. Speaking of order, is there some reading on guarantees and non-guarantees about order in RDDs? For example, when reading a file and doing zipWithIndex, can I assume that the lines are numbered in order? Does this hold for receiving data from Kafka, too? Tobias
Re: Spark webUI - application details page
Perhaps you need to set this in your spark-defaults.conf so that it's already set when your slave/worker processes start. -Joe

On 1/25/15, 6:50 PM, ilaxes ila...@hotmail.com wrote:

Hi, I've a similar problem. I want to see the detailed logs of Completed Applications, so I've set in my program:

set("spark.eventLog.enabled", "true").set("spark.eventLog.dir", "file:/tmp/spark-events")

but when I click on the application in the web UI, I get a page with the message:

Application history not found (app-20150126000651-0331) No event logs found for application xxx$ in file:/tmp/spark-events/xxx-147211500. Did you specify the correct logging directory?

despite the fact that the directory exists and contains 3 files: APPLICATION_COMPLETE* EVENT_LOG_1* SPARK_VERSION_1.1.0*

I use spark 1.1.0 on a standalone cluster with 3 nodes. Any suggestion to solve the problem? Thanks.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p21358.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
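For reference, the spark-defaults.conf entries this suggestion refers to would look roughly like this (the log directory is illustrative):

spark.eventLog.enabled   true
spark.eventLog.dir       file:/tmp/spark-events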
Re: foreachActive functionality
The idea is to unify the code path for dense and sparse vector operations, which makes the codebase easier to maintain. By handling (index, value) tuples, you can let the foreachActive method take care of checking if the vector is sparse or dense, and running a foreach over the values.

On Sun, Jan 25, 2015 at 8:18 AM, kundan kumar iitr.kun...@gmail.com wrote:

Can someone help me to understand the usage of the foreachActive function introduced for Vectors? I am trying to understand its usage in the MultivariateOnlineSummarizer class for summary statistics.

sample.foreachActive { (index, value) =>
  if (value != 0.0) {
    if (currMax(index) < value) {
      currMax(index) = value
    }
    if (currMin(index) > value) {
      currMin(index) = value
    }
    val prevMean = currMean(index)
    val diff = value - prevMean
    currMean(index) = prevMean + diff / (nnz(index) + 1.0)
    currM2n(index) += (value - currMean(index)) * diff
    currM2(index) += value * value
    currL1(index) += math.abs(value)
    nnz(index) += 1.0
  }
}

Regards, Kundan
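As a rough sketch of the idea (not from the thread): the same (index, value) callback works for both dense and sparse vectors, and for a sparse vector only the stored ("active") entries are visited. Note that in some Spark versions foreachActive is private[spark], so user code may not be able to call it directly:

import org.apache.spark.mllib.linalg.Vectors

val dense  = Vectors.dense(1.0, 0.0, 3.0)
val sparse = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))

var sum = 0.0
dense.foreachActive { (index, value) => sum += value }   // visits all 3 stored entries
sparse.foreachActive { (index, value) => sum += value }  // visits only the 2 stored entries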
Re: graph.inDegrees including zero values
You can do this using leftJoin, as collectNeighbors [1] does:

graph.vertices.leftJoin(graph.inDegrees) { (vid, attr, inDegOpt) => inDegOpt.getOrElse(0) }

[1] https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala#L145

Ankur

On Sun, Jan 25, 2015 at 5:52 AM, scharissis stefano.charis...@gmail.com wrote:

If a vertex has no in-degree then Spark's GraphOp 'inDegrees' does not return it at all. Instead, it would be very useful to me to be able to have that vertex returned with an in-degree of zero. What's the best way to achieve this using the GraphX API?
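Putting it together, a minimal sketch of the leftJoin approach (the example graph is illustrative, and an existing SparkContext sc is assumed):

import org.apache.spark.graphx._

// Two edges out of vertex 1; vertex 1 itself has no incoming edges.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1)))
val graph = Graph.fromEdges(edges, defaultValue = 0)

// leftJoin keeps every vertex; those missing from inDegrees get 0.
val inDegrees: VertexRDD[Int] =
  graph.vertices.leftJoin(graph.inDegrees) { (vid, attr, inDegOpt) => inDegOpt.getOrElse(0) }

inDegrees.collect()  // e.g. Array((1,0), (2,1), (3,1))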
Re: Results never return to driver | Spark Custom Reader
Hi Yana, As per my custom split code, only three splits are submitted to the system, so three executors should be sufficient, but it ran 8 executors. The first three executors' logs show the exact output I want (I put some System.out.println calls in to debug the code), but the next five contain various other, seemingly random exceptions. I think the first three executors didn't exit properly, so the driver launched more executors on top of them, which created many processes hitting the same application, and overall the job failed. From the log I can see the first three executors returned with exit status 1. The logs are below:

15/01/23 15:51:39 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
15/01/23 15:51:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/01/23 15:51:40 INFO spark.SecurityManager: Changing view acls to: sparkAdmin
15/01/23 15:51:40 INFO spark.SecurityManager: Changing modify acls to: sparkAdmin
15/01/23 15:51:40 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sparkAdmin); users with modify permissions: Set(sparkAdmin)
15/01/23 15:51:40 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/01/23 15:51:40 INFO Remoting: Starting remoting
15/01/23 15:51:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@VM219:40166]
15/01/23 15:51:41 INFO util.Utils: Successfully started service 'driverPropsFetcher' on port 40166.
15/01/23 15:51:41 INFO spark.SecurityManager: Changing view acls to: sparkAdmin
15/01/23 15:51:41 INFO spark.SecurityManager: Changing modify acls to: sparkAdmin
15/01/23 15:51:41 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sparkAdmin); users with modify permissions: Set(sparkAdmin)
15/01/23 15:51:41 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/01/23 15:51:41 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/01/23 15:51:41 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/01/23 15:51:41 INFO Remoting: Starting remoting
15/01/23 15:51:41 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/01/23 15:51:41 INFO util.Utils: Successfully started service 'sparkExecutor' on port 57695.
15/01/23 15:51:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@VM219:57695]
15/01/23 15:51:41 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://sparkDriver@VM220:53484/user/CoarseGrainedScheduler
15/01/23 15:51:41 INFO worker.WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@VM219:44826/user/Worker
15/01/23 15:51:41 INFO worker.WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@VM219:44826/user/Worker
15/01/23 15:51:41 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
15/01/23 15:51:41 INFO spark.SecurityManager: Changing view acls to: sparkAdmin
15/01/23 15:51:41 INFO spark.SecurityManager: Changing modify acls to: sparkAdmin
15/01/23 15:51:41 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sparkAdmin); users with modify permissions: Set(sparkAdmin)
15/01/23 15:51:41 INFO util.AkkaUtils: Connecting to MapOutputTracker: akka.tcp://sparkDriver@VM220:53484/user/MapOutputTracker
15/01/23 15:51:41 INFO util.AkkaUtils: Connecting to BlockManagerMaster: akka.tcp://sparkDriver@VM220:53484/user/BlockManagerMaster
15/01/23 15:51:41 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20150123155141-b237
15/01/23 15:51:41 INFO storage.MemoryStore: MemoryStore started with capacity 529.9 MB
15/01/23 15:51:41 INFO netty.NettyBlockTransferService: Server created on 54273
15/01/23 15:51:41 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/01/23 15:51:41 INFO storage.BlockManagerMaster: Registered BlockManager
15/01/23 15:51:41 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@VM220:53484/user/HeartbeatReceiver
15/01/23 15:51:47 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@VM219:57695] - [akka.tcp://sparkDriver@VM220:53484] disassociated! Shutting down.
15/01/23 15:51:47 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@VM220:53484] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

On 24 January 2015 at 06:37, Yana Kadiyska yana.kadiy...@gmail.com wrote:

It looks to me like your executor actually crashed and didn't just finish properly. Can you check the executor log? It is available in the UI, or on the worker machine, under $SPARK_HOME/work/app-20150123155114-/6/stderr (unless you manually changed the work
Re: what is the roadmap for Spark SQL dialect in the coming releases?
Yeah, the HiveContext is just a SQLContext that is extended with HQL, access to a metastore, Hive UDFs and Hive SerDes. The query execution, however, is identical to a SQLContext.

On Sun, Jan 25, 2015 at 7:24 AM, Niranda Perera niranda.per...@gmail.com wrote:

Thanks Michael. A clarification: so the HQL dialect provided by HiveContext, does it use the Catalyst optimizer? I thought HiveContext was only related to Hive integration in Spark! Would be grateful if you could clarify this. Cheers

On Sun, Jan 25, 2015 at 1:23 AM, Michael Armbrust mich...@databricks.com wrote:

I generally recommend people use the HQL dialect provided by the HiveContext when possible: http://spark.apache.org/docs/latest/sql-programming-guide.html#getting-started I'll also note that this is distinct from the Hive on Spark project, which is based on the Hive query optimizer / execution engine instead of the Catalyst optimizer that is shipped with Spark.

On Thu, Jan 22, 2015 at 3:12 AM, Niranda Perera niranda.per...@gmail.com wrote:

Hi, would like to know if there is an update on this? Rgds

On Mon, Jan 12, 2015 at 10:44 AM, Niranda Perera niranda.per...@gmail.com wrote:

Hi, I found out that Spark SQL currently supports only a relatively small subset of the SQL dialect. I would like to know the roadmap for the coming releases. And, are you focusing more on popularizing the 'Hive on Spark' SQL dialect or the Spark SQL dialect? Rgds

-- Niranda
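In other words, both contexts feed the same Catalyst / execution pipeline; a minimal Spark 1.2-era sketch (the table name is illustrative and an existing SparkContext sc is assumed):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext

val sqlContext  = new SQLContext(sc)   // basic Spark SQL dialect, no metastore
val hiveContext = new HiveContext(sc)  // HQL dialect + Hive metastore, UDFs and SerDes

// Either way, the query is planned by the Catalyst optimizer and executed by Spark.
val result = hiveContext.sql("SELECT key, value FROM src")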
Re: SVD in pyspark ?
Hi Andreas, With regard to the notebook interface, you can use the Spark Kernel (https://github.com/ibm-et/spark-kernel) as the backend for an IPython 3.0 notebook. The kernel is designed to be the foundation for interactive applications connecting to Apache Spark and uses the IPython 5.0 message protocol - used by IPython 3.0 - to communicate. See the getting started section here: https://github.com/ibm-et/spark-kernel/wiki/Getting-Started-with-the-Spark-Kernel It discusses getting IPython connected to a Spark Kernel. If you have any more questions, feel free to ask! Signed, Chip Senkbeil IBM Emerging Technologies Software Engineer

On Sun Jan 25 2015 at 1:12:32 PM Andreas Rhode m.a.rh...@gmail.com wrote:

Is the distributed SVD functionality exposed to Python yet? It seems it's only available to Scala or Java, unless I am missing something; I'm looking for a pyspark equivalent to org.apache.spark.mllib.linalg.SingularValueDecomposition. In case it's not there yet, is there a way to make a wrapper to call from Python into the corresponding Java/Scala code? The reason for using Python instead of Scala directly is that I like to take advantage of the notebook interface for visualization. As an aside, is there an IPython-notebook-like interface for the Scala-based REPL? Thanks, Andreas

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SVD-in-pyspark-tp21356.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
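For reference, a minimal sketch of the Scala API the original post refers to (the input matrix is illustrative, and an existing SparkContext sc is assumed):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 5.0, 6.0),
  Vectors.dense(7.0, 8.0, 9.0)))
val mat = new RowMatrix(rows)

// Top-2 singular values/vectors; computeU = true also materializes U.
val svd = mat.computeSVD(2, computeU = true)
val (u, s, v) = (svd.U, svd.s, svd.V)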
Re: Spark 1.2 – How to change Default (Random) port ….
Can anyone please let me know? I don't want to open all ports on the network, so I am interested in the property by which this new port can be configured. Shailesh

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-How-to-change-Default-Random-port-tp21306p21360.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
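A sketch of the kind of port properties involved (property names as listed in the Spark 1.x configuration docs; the port values are illustrative, and which ports need pinning depends on the deployment):

val conf = new org.apache.spark.SparkConf()
  .set("spark.driver.port",       "7001")
  .set("spark.fileserver.port",   "7002")
  .set("spark.broadcast.port",    "7003")
  .set("spark.blockManager.port", "7004")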
Re: foreachActive functionality
PS, we were using Breeze's activeIterator originally, as you can see in the old code, but we found there was overhead there, so we wrote our own implementation, which turned out to be about 4x faster. See https://github.com/apache/spark/pull/3288 for details.

Sincerely, DB Tsai
---------------------------------------------------------
Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai

On Sun, Jan 25, 2015 at 12:25 PM, Reza Zadeh r...@databricks.com wrote:

The idea is to unify the code path for dense and sparse vector operations, which makes the codebase easier to maintain. By handling (index, value) tuples, you can let the foreachActive method take care of checking if the vector is sparse or dense, and running a foreach over the values.

On Sun, Jan 25, 2015 at 8:18 AM, kundan kumar iitr.kun...@gmail.com wrote:

Can someone help me to understand the usage of the foreachActive function introduced for Vectors? I am trying to understand its usage in the MultivariateOnlineSummarizer class for summary statistics.

sample.foreachActive { (index, value) =>
  if (value != 0.0) {
    if (currMax(index) < value) {
      currMax(index) = value
    }
    if (currMin(index) > value) {
      currMin(index) = value
    }
    val prevMean = currMean(index)
    val diff = value - prevMean
    currMean(index) = prevMean + diff / (nnz(index) + 1.0)
    currM2n(index) += (value - currMean(index)) * diff
    currM2(index) += value * value
    currL1(index) += math.abs(value)
    nnz(index) += 1.0
  }
}

Regards, Kundan