Re: Bug in Spark SQL and Dataframes : Inferring the Schema Using Reflection?
Yep, already fixed in master: https://github.com/apache/spark/pull/4977/files. You need a '.toDF()' at the end.

On Sat, Mar 14, 2015 at 6:55 PM, Dean Arnold renodino...@gmail.com wrote:
Running 1.3.0 from a binary install. When executing the example under the subject section from within spark-shell, I get the following error:

scala> people.registerTempTable("people")
console:35: error: value registerTempTable is not a member of org.apache.spark.rdd.RDD[Person]
       people.registerTempTable("people")

Is there a missing statement somewhere? Or does this need to be modified for DataFrame support?
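For reference, a minimal sketch of what the fixed example looks like on Spark 1.3.0, assuming the Person case class and people.txt file from the SQL programming guide:

import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._  // brings the .toDF() conversion into scope

val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()  // RDD[Person] -> DataFrame

people.registerTempTable("people")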
order preservation with RDDs
Hi, I was taking a look through the mllib examples in the official spark documentation and came across the following: http://spark.apache.org/docs/1.3.0/mllib-feature-extraction.html#tab_python_2 specifically the lines: label = data.map(lambda x: x.label) features = data.map(lambda x: x.features) ... ... data1 = label.zip(scaler1.transform(features)) my question: wouldn't it be possible that some labels in the pairs returned by the label.zip(..) operation are not paired with their original features? i.e. are the original orderings of `labels` and `features` preserved after the scaler1.transform(..) and label.zip(..) operations? This issue was also mentioned in http://apache-spark-user-list.1001560.n3.nabble.com/Using-TF-IDF-from-MLlib-tp19429p19433.html I would greatly appreciate some clarification on this, as I've run into this issue whilst experimenting with feature extraction for text classification, where (correct me if I'm wrong) there is no built-in mechanism to keep track of document-ids through the HashingTF and IDF fitting and transformations. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/order-preservation-with-RDDs-tp22052.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
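One way to sidestep the zip-ordering question entirely is to keep each label paired with its features in a single record, since StandardScalerModel.transform also accepts an individual Vector. A minimal sketch in Scala, assuming the StandardScaler setup from the feature-extraction guide:

import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.regression.LabeledPoint

// data: RDD[LabeledPoint], as in the guide
val scaler1 = new StandardScaler().fit(data.map(_.features))

val data1 = data.map { lp =>
  LabeledPoint(lp.label, scaler1.transform(lp.features))  // label never separated from its features
}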
Re: Bug in Streaming files?
No I don't think that much is a bug, since newFilesOnly=false removes a constraint that otherwise exists, and that's what you see. However read the closely related: https://issues.apache.org/jira/browse/SPARK-6061 @tdas open question for you there. On Sat, Mar 14, 2015 at 8:18 PM, Justin Pihony justin.pih...@gmail.com wrote: All, Looking into this StackOverflow question https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469 it appears that there is a bug when utilizing the newFilesOnly parameter in FileInputDStream. Before creating a ticket, I wanted to verify it here. The gist is that this code is wrong: val modTimeIgnoreThreshold = math.max( initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting currentTime - durationToRemember.milliseconds // trailing end of the remember window ) The problem is that if you set newFilesOnly to false, then the initialModTimeIgnoreThreshold is always 0. This makes it always dropped out of the max operation. So, the best you get is files that were put in the directory (duration) from the start. Is this a bug or expected behavior; it seems like a bug to me. If I am correct, this appears to be a bigger fix than just using min as it would break other functionality. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Streaming-files-tp22051.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
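For what it's worth, the initial threshold in FileInputDStream appears to be derived from newFilesOnly roughly as sketched below (a simplified sketch, not the literal source; field names may differ slightly between versions), which is why setting it to false makes the first operand drop out of the max:

val initialModTimeIgnoreThreshold =
  if (newFilesOnly) clock.currentTime()  // ignore files older than the stream's start time
  else 0L                                // no initial constraint

val modTimeIgnoreThreshold = math.max(
  initialModTimeIgnoreThreshold,                   // 0L when newFilesOnly = false
  currentTime - durationToRemember.milliseconds)   // trailing edge of the remember window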
Re: spark there is no space on the disk
Hi Sean, Thank very much for your reply. I tried to config it from below code: sf = SparkConf().setAppName(test).set(spark.executor.memory, 45g).set(spark.cores.max, 62),set(spark.local.dir, C:\\tmp) But still get the error. Do you know how I can config this? Thanks, Best, Peng On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen so...@cloudera.com wrote: It means pretty much what it says. You ran out of space on an executor (not driver), because the dir used for serialization temp files is full (not all volumes). Set spark.local.dirs to something more appropriate and larger. On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com wrote: Hi I was running a logistic regression algorithm on a 8 nodes spark cluster, each node has 8 cores and 56 GB Ram (each node is running a windows system). And the spark installation driver has 1.9 TB capacity. The dataset I was training on are has around 40 million records with around 6600 features. But I always get this error during the training process: Py4JJavaError: An error occurred while calling o70.trainLogisticRegressionModelWithLBFGS. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3 in stage 3.0 (TID 2766, workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net): java.io.IOException: There is not enough space on the disk at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:345) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300) at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247) at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914) at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177) at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:243) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at
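For reference, the property name is spark.local.dir (singular), and the chained calls in the PySpark snippet above also need string quoting and '.set' rather than ',set'. The same settings sketched in Scala, with an illustrative path; note that on a standalone or YARN cluster this property can be overridden by the SPARK_LOCAL_DIRS / LOCAL_DIRS environment variables set on the workers:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("test")
  .set("spark.executor.memory", "45g")
  .set("spark.cores.max", "62")
  .set("spark.local.dir", "D:\\spark-tmp")  // illustrative: point at a volume with plenty of free space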
Bug in Streaming files?
All, Looking into this StackOverflow question https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469 it appears that there is a bug when utilizing the newFilesOnly parameter in FileInputDStream. Before creating a ticket, I wanted to verify it here. The gist is that this code is wrong: val modTimeIgnoreThreshold = math.max( initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting currentTime - durationToRemember.milliseconds // trailing end of the remember window ) The problem is that if you set newFilesOnly to false, then the initialModTimeIgnoreThreshold is always 0. This makes it always dropped out of the max operation. So, the best you get is files that were put in the directory (duration) from the start. Is this a bug or expected behavior; it seems like a bug to me. If I am correct, this appears to be a bigger fix than just using min as it would break other functionality. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Streaming-files-tp22051.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to create data frame from an avro file in Spark 1.3.0
In spark-avro 0.1, the method AvroContext.avroFile returns a SchemaRDD, which is deprecated in Spark 1.3.0:

package com.databricks.spark

import org.apache.spark.sql.{SQLContext, SchemaRDD}

package object avro {
  /**
   * Adds a method, `avroFile`, to SQLContext that allows reading data stored in Avro.
   */
  implicit class AvroContext(sqlContext: SQLContext) {
    def avroFile(filePath: String) =
      sqlContext.baseRelationToSchemaRDD(AvroRelation(filePath)(sqlContext))
  }
}

Is there a new version of spark-avro so that AvroContext.avroFile returns a DataFrame? On GitHub, spark-avro is still at version 0.1 (databricks/spark-avro - Integration utilities for using Spark with Apache Avro data).

Thanks in advance for your assistance!
Shing
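Until a spark-avro release built against 1.3 is published, one possible workaround is to go through the generic data-source API, which returns a DataFrame in 1.3.0. A sketch, assuming spark-avro is on the classpath and registers the com.databricks.spark.avro source (the file name is illustrative):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.load("episodes.avro", "com.databricks.spark.avro")
df.printSchema()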
Bug in Spark SQL and Dataframes : Inferring the Schema Using Reflection?
Running 1.3.0 from a binary install. When executing the example under the subject section from within spark-shell, I get the following error:

scala> people.registerTempTable("people")
console:35: error: value registerTempTable is not a member of org.apache.spark.rdd.RDD[Person]
       people.registerTempTable("people")

Is there a missing statement somewhere? Or does this need to be modified for DataFrame support?
Re: spark there is no space on the disk
And I have 2 TB free space on C driver. On Sat, Mar 14, 2015 at 8:29 PM, Peng Xia sparkpeng...@gmail.com wrote: Hi Sean, Thank very much for your reply. I tried to config it from below code: sf = SparkConf().setAppName(test).set(spark.executor.memory, 45g).set(spark.cores.max, 62),set(spark.local.dir, C:\\tmp) But still get the error. Do you know how I can config this? Thanks, Best, Peng On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen so...@cloudera.com wrote: It means pretty much what it says. You ran out of space on an executor (not driver), because the dir used for serialization temp files is full (not all volumes). Set spark.local.dirs to something more appropriate and larger. On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com wrote: Hi I was running a logistic regression algorithm on a 8 nodes spark cluster, each node has 8 cores and 56 GB Ram (each node is running a windows system). And the spark installation driver has 1.9 TB capacity. The dataset I was training on are has around 40 million records with around 6600 features. But I always get this error during the training process: Py4JJavaError: An error occurred while calling o70.trainLogisticRegressionModelWithLBFGS. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3 in stage 3.0 (TID 2766, workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net): java.io.IOException: There is not enough space on the disk at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:345) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300) at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247) at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914) at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177) at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:243) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at 
org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at
Re: GraphX Snapshot Partitioning
Large edge partitions could cause java.lang.OutOfMemoryError, and then spark tasks fails. FWIW, each edge partition can have at most 2^32 edges because 64-bit vertex IDs are mapped into 32-bit ones in each partitions. If #edges is over the limit, graphx could throw ArrayIndexOutOfBoundsException, or something. So, each partition can have more edges than you expect. On Wed, Mar 11, 2015 at 11:42 PM, Matthew Bucci mrbucci...@gmail.com wrote: Hi, Thanks for the response! That answered some questions I had, but the last one I was wondering is what happens if you run a partition strategy and one of the partitions ends up being too large? For example, let's say partitions can hold 64MB (actually knowing the maximum possible size of a partition would probably also be helpful to me). You try to partition the edges of a graph to 3 separate partitions but the edges in the first partition end up being 80MB worth of edges so it cannot all fit in the first partition . Would the extra 16MB flood over into a new 4th partition or would the system try to split it so that the 1st and 4th partition are both at 40MB, or would the partition strategy just fail with a memory error? Thank You, Matthew Bucci On Mon, Mar 9, 2015 at 11:07 PM, Takeshi Yamamuro linguin@gmail.com wrote: Hi, Vertices are simply hash-paritioned by their 64-bit IDs, so they are evenly spread over parititons. As for edges, GraphLoader#edgeList builds edge paritions through hadoopFile(), so the initial parititons depend on InputFormat#getSplits implementations (e.g, partitions are mostly equal to 64MB blocks for HDFS). Edges can be re-partitioned by ParititonStrategy; a graph is partitioned considering graph structures and a source ID and a destination ID are used as partition keys. The partitions might suffer from skewness depending on graph properties (hub nodes, or something). Thanks, takeshi On Tue, Mar 10, 2015 at 2:21 AM, Matthew Bucci mrbucci...@gmail.com wrote: Hello, I am working on a project where we want to split graphs of data into snapshots across partitions and I was wondering what would happen if one of the snapshots we had was too large to fit into a single partition. Would the snapshot be split over the two partitions equally, for example, and how is a single snapshot spread over multiple partitions? Thank You, Matthew Bucci -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Snapshot-Partitioning-tp21977.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- --- Takeshi Yamamuro -- --- Takeshi Yamamuro
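Since an oversized partition will not spill over into a new one, the practical lever is to ask for more, smaller partitions when repartitioning. A sketch, assuming a graph already loaded (the partition count is illustrative):

import org.apache.spark.graphx.{Graph, PartitionStrategy}

// graph: Graph[Int, Int], e.g. loaded with GraphLoader.edgeListFile
println(graph.edges.partitions.length)  // how many edge partitions exist now

// partitionBy has an overload that also takes the desired number of partitions,
// so the edges can be spread over more, smaller partitions
val repartitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D, 256)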
Re: Spark Release 1.3.0 DataFrame API
programmatically specifying Schema needs import org.apache.spark.sql.type._ for StructType and StructField to resolve. On Sat, Mar 14, 2015 at 10:07 AM, Sean Owen so...@cloudera.com wrote: Yes I think this was already just fixed by: https://github.com/apache/spark/pull/4977 a .toDF() is missing On Sat, Mar 14, 2015 at 4:16 PM, Nick Pentreath nick.pentre...@gmail.com wrote: I've found people.toDF gives you a data frame (roughly equivalent to the previous Row RDD), And you can then call registerTempTable on that DataFrame. So people.toDF.registerTempTable(people) should work — Sent from Mailbox On Sat, Mar 14, 2015 at 5:33 PM, David Mitchell jdavidmitch...@gmail.com wrote: I am pleased with the release of the DataFrame API. However, I started playing with it, and neither of the two main examples in the documentation work: http://spark.apache.org/docs/1.3.0/sql-programming-guide.html Specfically: Inferring the Schema Using Reflection Programmatically Specifying the Schema Scala 2.11.6 Spark 1.3.0 prebuilt for Hadoop 2.4 and later Inferring the Schema Using Reflection scala people.registerTempTable(people) console:31: error: value registerTempTable is not a member of org.apache.spark .rdd.RDD[Person] people.registerTempTable(people) ^ Programmatically Specifying the Schema scala val peopleDataFrame = sqlContext.createDataFrame(people, schema) console:41: error: overloaded method value createDataFrame with alternatives: (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spar k.sql.DataFrame and (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.Dat aFrame and (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],columns: java.util.List[String])org.apache.spark.sql.DataFrame and (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: o rg.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame and (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache .spark.sql.types.StructType)org.apache.spark.sql.DataFrame cannot be applied to (org.apache.spark.rdd.RDD[String], org.apache.spark.sql.ty pes.StructType) val df = sqlContext.createDataFrame(people, schema) Any help would be appreciated. David - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
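Putting both fixes together, a sketch of the programmatic-schema path on Spark 1.3.0 (people.txt layout as in the guide): import the org.apache.spark.sql.types package, and convert the RDD[String] into an RDD[Row] before calling createDataFrame:

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructType, StructField, StringType}

val sqlContext = new SQLContext(sc)
val people = sc.textFile("examples/src/main/resources/people.txt")

val schema = StructType(
  "name age".split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)))

val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
peopleDataFrame.registerTempTable("people")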
Re: [GRAPHX] could not process graph with 230M edges
Hi, If you have heap problems in spark/graphx, it'd be better to split partitions into smaller ones so as to fit the partition on memory. On Sat, Mar 14, 2015 at 12:09 AM, Hlib Mykhailenko hlib.mykhaile...@inria.fr wrote: Hello, I cannot process graph with 230M edges. I cloned apache.spark, build it and then tried it on cluster. I used Spark Standalone Cluster: -5 machines (each has 12 cores/32GB RAM) -'spark.executor.memory' == 25g -'spark.driver.memory' == 3g Graph has 231359027 edges. And its file weights 4,524,716,369 bytes. Graph is represented in text format: source vertex id destination vertex id My code: object Canonical { def main(args: Array[String]) { val numberOfArguments = 3 require(args.length == numberOfArguments, sWrong argument number. Should be $numberOfArguments . |Usage: path_to_grpah partiotioner_name minEdgePartitions .stripMargin) var graph: Graph[Int, Int] = null val nameOfGraph = args(0).substring(args(0).lastIndexOf(/) + 1) val partitionerName = args(1) val minEdgePartitions = args(2).toInt val sc = new SparkContext(new SparkConf() .setSparkHome(System.getenv(SPARK_HOME)) .setAppName(s partitioning | $nameOfGraph | $partitionerName | $minEdgePartitions parts ) .setJars(SparkContext.jarOfClass(this.getClass).toList)) graph = GraphLoader.edgeListFile(sc, args(0), false, edgeStorageLevel = StorageLevel.MEMORY_AND_DISK, vertexStorageLevel = StorageLevel.MEMORY_AND_DISK, minEdgePartitions = minEdgePartitions) graph = graph.partitionBy(PartitionStrategy.fromString(partitionerName)) println(graph.edges.collect.length) println(graph.vertices.collect.length) } } After I run it I encountered number of java.lang.OutOfMemoryError: Java heap space errors and of course I did not get a result. Do I have problem in the code? Or in cluster configuration? Because it works fine for relatively small graphs. But for this graph it never worked. (And I do not think that 230M edges is too big data) Thank you for any advise! -- Cordialement, *Hlib Mykhailenko* Doctorant à INRIA Sophia-Antipolis Méditerranée http://www.inria.fr/centre/sophia/ 2004 Route des Lucioles BP93 06902 SOPHIA ANTIPOLIS cedex -- --- Takeshi Yamamuro
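In addition to using smaller partitions (for example by passing a larger minEdgePartitions value as the third command-line argument), the collect calls at the end of the posted program are a likely OOM source on their own: they ship all 231M edges, and then all vertices, into the 3 GB driver. A sketch of the safer ending, keeping the rest of the program as posted:

graph = graph.partitionBy(PartitionStrategy.fromString(partitionerName))

// Count on the executors instead of collecting everything to the driver:
println(graph.edges.count())
println(graph.vertices.count())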
deploying Spark on standalone cluster
Hi, I am trying to deploy spark on standalone cluster of two machines on for master node and one for worker node. i have defined the two machines in conf/slaves file and also i /etc/hosts, when i tried to run the cluster the worker node is running but the master node failed to run and throw this error: 15/03/14 07:05:04 ERROR Remoting: Remoting error: [Startup failed] [ akka.remote.RemoteTransportException: Startup failed at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:136) at akka.remote.Remoting.start(Remoting.scala:201) at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184) at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618) at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615) at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615) at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632) at akka.actor.ActorSystem$.apply(ActorSystem.scala:141) at akka.actor.ActorSystem$.apply(ActorSystem.scala:118) at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1765) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1756) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56) at org.apache.spark.deploy.master.Master$.startSystemAndActor(Master.scala:849) at org.apache.spark.deploy.master.Master$.main(Master.scala:829) at org.apache.spark.deploy.master.Master.main(Master.scala) Caused by: org.jboss.netty.channel.ChannelException: Failed to bind to: srnode1/10.0.0.5:7077 at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272) at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393) at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389) at scala.util.Success$$anonfun$map$1.apply(Try.scala:206) at scala.util.Try$.apply(Try.scala:161) at scala.util.Success.map(Try.scala:206) at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) Can anyone help me? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/deploying-Spark-on-standalone-cluster-tp22049.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to avoid using some nodes while running a spark program on yarn
Hello, I have a cluster with Spark on YARN. Currently some of its nodes are running a Spark Streaming program, so their local space is not enough to support other applications. I therefore wonder whether it is possible to use a blacklist to avoid these nodes when running a new Spark program? Alcaid
Re: building all modules in spark by mvn
I can't reproduce that. 'mvn package' builds everything. You're not showing additional output from Maven that would explain what it skipped and why. On Sat, Mar 14, 2015 at 12:57 AM, sequoiadb mailing-list-r...@sequoiadb.com wrote: guys, is there any easier way to build all modules by mvn ? right now if I run “mvn package” in spark root directory I got: [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM ... SUCCESS [ 8.327 s] [INFO] Spark Project Networking ... SKIPPED [INFO] Spark Project Shuffle Streaming Service SKIPPED [INFO] Spark Project Core . SKIPPED [INFO] Spark Project Bagel SKIPPED [INFO] Spark Project GraphX ... SKIPPED [INFO] Spark Project Streaming SKIPPED [INFO] Spark Project Catalyst . SKIPPED [INFO] Spark Project SQL .. SKIPPED [INFO] Spark Project ML Library ... SKIPPED … Apprently only Parent project is built and all other children projects are skipped. I can get sparksql/stream projects built by sbt/sbt, but if I’d like to use mvn and do not want to build each dependent module separately, is there any good way to do it? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Streaming linear regression example question
Hi, I am trying to understand the example provided in https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html - Streaming linear regression. Code:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream

object StreamingLinReg {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("StreamLinReg").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    val trainingData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/").map(LabeledPoint.parse).cache()
    val testData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/").map(LabeledPoint.parse)
    val numFeatures = 3
    val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))
    model.trainOn(trainingData)
    model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

I compiled the code and ran it, then put a file containing (1.0,[2.0,2.0,2.0]) (2.0,[3.0,3.0,3.0]) (3.0,[4.0,4.0,4.0]) (4.0,[5.0,5.0,5.0]) (5.0,[6.0,6.0,6.0]) (6.0,[7.0,7.0,7.0]) (7.0,[8.0,8.0,8.0]) (8.0,[9.0,9.0,9.0]) (9.0,[10.0,10.0,10.0]) into the training directory. I can see that the model's weights change: 15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current model: weights, [7.333,7.333,7.333]

Now I can put whatever I like into the testing directory, but I cannot understand the answer. For example, if I put the same file I used for training into the testing directory, i.e. (1.0,[2.0,2.0,2.0]) (2.0,[3.0,3.0,3.0]) (3.0,[4.0,4.0,4.0]) (4.0,[5.0,5.0,5.0]) (5.0,[6.0,6.0,6.0]) (6.0,[7.0,7.0,7.0]) (7.0,[8.0,8.0,8.0]) (8.0,[9.0,9.0,9.0]) (9.0,[10.0,10.0,10.0]), the answer is (1.0,0.0) (2.0,0.0) (3.0,0.0) (4.0,0.0) (5.0,0.0) (6.0,0.0) (7.0,0.0) (8.0,0.0) (9.0,0.0). And if the file content is (0.0,[2.0,2.0,2.0]) (0.0,[3.0,3.0,3.0]) (0.0,[4.0,4.0,4.0]) (0.0,[5.0,5.0,5.0]) (0.0,[6.0,6.0,6.0]) (0.0,[7.0,7.0,7.0]) (0.0,[8.0,8.0,8.0]) (0.0,[9.0,9.0,9.0]) (0.0,[10.0,10.0,10.0]), the answer is (0.0,0.0) (0.0,0.0) (0.0,0.0) (0.0,0.0) (0.0,0.0) (0.0,0.0) (0.0,0.0) (0.0,0.0) (0.0,0.0). I expected to get the label predicted by the model.

--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480
Please help me understand TF-IDF Vector structure
Hi, I read this document, http://spark.apache.org/docs/1.2.1/mllib-feature-extraction.html, and tried to build a TF-IDF model of my documents. I have a list of documents; each word is represented as an Int, and each document is listed on one line:

doc_name, int1, int2...
doc_name, int3, int4...

This is how I load my documents:

val documents: RDD[Seq[Int]] = sc.objectFile[(String, Seq[Int])](s"$sparkStore/documents") map (_._2) cache()

Then I did:

val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)
val idf = new IDF().fit(tf)
val tfidf = idf.transform(tf)

I write the tfidf model to a text file and try to understand the structure:

FileUtils.writeLines(new File("tfidf.out"), tfidf.collect().toList.asJavaCollection)

What I get is something like:

(1048576,[0,4,7,8,10,13,17,21],[...some float numbers...])
...

I think it's a tuple with 3 elements:
- I have no idea what the 1st element is...
- I think the 2nd element is a list of the words
- I think the 3rd element is a list of the tf-idf values of the words in the previous list

Please help me understand this structure.

Thanks,
David
Re: spark there is no space on the disk
It means pretty much what it says. You ran out of space on an executor (not driver), because the dir used for serialization temp files is full (not all volumes). Set spark.local.dirs to something more appropriate and larger. On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com wrote: Hi I was running a logistic regression algorithm on a 8 nodes spark cluster, each node has 8 cores and 56 GB Ram (each node is running a windows system). And the spark installation driver has 1.9 TB capacity. The dataset I was training on are has around 40 million records with around 6600 features. But I always get this error during the training process: Py4JJavaError: An error occurred while calling o70.trainLogisticRegressionModelWithLBFGS. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3 in stage 3.0 (TID 2766, workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net): java.io.IOException: There is not enough space on the disk at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:345) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300) at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247) at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914) at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177) at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:243) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at
Re: How to avoid using some nodes while running a spark program on yarn
Which release of hadoop are you using ? Can you utilize node labels feature ? See YARN-2492 and YARN-796 Cheers On Sat, Mar 14, 2015 at 1:49 AM, James alcaid1...@gmail.com wrote: Hello, I am got a cluster with spark on yarn. Currently some nodes of it are running a spark streamming program, thus their local space is not enough to support other application. Thus I wonder is that possible to use a blacklist to avoid using these nodes when running a new spark program? Alcaid
Re: How to avoid using some nodes while running a spark program on yarn
My hadoop version is 2.2.0, and my spark version is 1.2.0 2015-03-14 17:22 GMT+08:00 Ted Yu yuzhih...@gmail.com: Which release of hadoop are you using ? Can you utilize node labels feature ? See YARN-2492 and YARN-796 Cheers On Sat, Mar 14, 2015 at 1:49 AM, James alcaid1...@gmail.com wrote: Hello, I am got a cluster with spark on yarn. Currently some nodes of it are running a spark streamming program, thus their local space is not enough to support other application. Thus I wonder is that possible to use a blacklist to avoid using these nodes when running a new spark program? Alcaid
Re: Please help me understand TF-IDF Vector structure
Hey, I worked it out myself :) The Vector is actually a SparseVector, so when it is written to a string the format is (size, [indices...], [values...]). Simple!

On Sat, Mar 14, 2015 at 6:05 PM Xi Shen davidshe...@gmail.com wrote:
Hi, I read this document, http://spark.apache.org/docs/1.2.1/mllib-feature-extraction.html, and tried to build a TF-IDF model of my documents. I have a list of documents; each word is represented as an Int, and each document is listed on one line:

doc_name, int1, int2...
doc_name, int3, int4...

This is how I load my documents:

val documents: RDD[Seq[Int]] = sc.objectFile[(String, Seq[Int])](s"$sparkStore/documents") map (_._2) cache()

Then I did:

val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)
val idf = new IDF().fit(tf)
val tfidf = idf.transform(tf)

I write the tfidf model to a text file and try to understand the structure:

FileUtils.writeLines(new File("tfidf.out"), tfidf.collect().toList.asJavaCollection)

What I get is something like:

(1048576,[0,4,7,8,10,13,17,21],[...some float numbers...])
...

I think it's a tuple with 3 elements:
- I have no idea what the 1st element is...
- I think the 2nd element is a list of the words
- I think the 3rd element is a list of the tf-idf values of the words in the previous list

Please help me understand this structure.

Thanks,
David
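For reference, a small sketch of how the printed form maps onto a SparseVector: the first element is the vector size, which for HashingTF defaults to 2^20 = 1,048,576 hashed features; the second is the indices of the non-zero terms; the third is their tf-idf values:

import org.apache.spark.mllib.linalg.Vectors

val v = Vectors.sparse(1048576, Array(0, 4, 7), Array(0.5, 1.2, 3.4))
println(v)       // (1048576,[0,4,7],[0.5,1.2,3.4])
println(v.size)  // 1048576, the dimensionality of the hashing space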
Re: deploying Spark on standalone cluster
Hi, You may want to check your spark environment config in spark-env.sh, specifically for the SPARK_LOCAL_IP and check that whether you did modify that value, which may default be localhost. Thanks, Sun. fightf...@163.com From: sara mustafa Date: 2015-03-14 15:13 To: user Subject: deploying Spark on standalone cluster Hi, I am trying to deploy spark on standalone cluster of two machines on for master node and one for worker node. i have defined the two machines in conf/slaves file and also i /etc/hosts, when i tried to run the cluster the worker node is running but the master node failed to run and throw this error: 15/03/14 07:05:04 ERROR Remoting: Remoting error: [Startup failed] [ akka.remote.RemoteTransportException: Startup failed at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:136) at akka.remote.Remoting.start(Remoting.scala:201) at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184) at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618) at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615) at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615) at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632) at akka.actor.ActorSystem$.apply(ActorSystem.scala:141) at akka.actor.ActorSystem$.apply(ActorSystem.scala:118) at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1765) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1756) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56) at org.apache.spark.deploy.master.Master$.startSystemAndActor(Master.scala:849) at org.apache.spark.deploy.master.Master$.main(Master.scala:829) at org.apache.spark.deploy.master.Master.main(Master.scala) Caused by: org.jboss.netty.channel.ChannelException: Failed to bind to: srnode1/10.0.0.5:7077 at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272) at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393) at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389) at scala.util.Success$$anonfun$map$1.apply(Try.scala:206) at scala.util.Try$.apply(Try.scala:161) at scala.util.Success.map(Try.scala:206) at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) Can anyone help me? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/deploying-Spark-on-standalone-cluster-tp22049.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to avoid using some nodes while running a spark program on yarn
You won’t be able to use YARN labels on 2.2.0. However, you only need the labels if you want to map containers on specific hardware. In your scenario, the capacity scheduler in YARN might be the best bet. You can setup separate queues for the streaming and other jobs to protect a percentage of cluster resources. You can then spread all jobs across the cluster while protecting the streaming jobs’ capacity (if your resource containers sizes are granular enough). Simon On Mar 14, 2015, at 9:57 AM, James alcaid1...@gmail.com wrote: My hadoop version is 2.2.0, and my spark version is 1.2.0 2015-03-14 17:22 GMT+08:00 Ted Yu yuzhih...@gmail.com mailto:yuzhih...@gmail.com: Which release of hadoop are you using ? Can you utilize node labels feature ? See YARN-2492 and YARN-796 Cheers On Sat, Mar 14, 2015 at 1:49 AM, James alcaid1...@gmail.com mailto:alcaid1...@gmail.com wrote: Hello, I am got a cluster with spark on yarn. Currently some nodes of it are running a spark streamming program, thus their local space is not enough to support other application. Thus I wonder is that possible to use a blacklist to avoid using these nodes when running a new spark program? Alcaid
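Once the queues are defined in the capacity scheduler, a Spark application can be pinned to one of them either with spark-submit's --queue flag or via configuration. A sketch in Scala, with illustrative queue names (the queues must already exist in YARN's scheduler config):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("batch-job")
  .set("spark.yarn.queue", "batch")  // keep this job out of the streaming queue

val sc = new SparkContext(conf)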
Re: serialization stakeoverflow error during reduce on nested objects
I haven't register my class in kryo but I dont think it would have such an impact on the stack size. I'm thinking of using graphx and I'm wondering how it serializes the graph object as it can use kryo as serializer. 2015-03-14 6:22 GMT+01:00 Ted Yu yuzhih...@gmail.com: Have you registered your class with kryo ? See core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala and core/src/test/scala/org/apache/spark/SparkConfSuite.scala On Fri, Mar 13, 2015 at 10:52 AM, ilaxes ila...@hotmail.com wrote: Hi, I'm working on a RDD of a tuple of objects which represent trees (Node containing a hashmap of nodes). I'm trying to aggregate these trees over the RDD. Let's take an example, 2 graphs : C - D - B - A - D - B - E F - E - B - A - D - B - F I'm spliting each graphs according to the vertex A resulting in : (B(1, D(1, C(1,))) , D(1, B(1, E(1,))) (B(1, E(1, F(1,))) , D(1, B(1, F(1,))) And I want to aggregate both graph getting : (B(2, (D(1, C(1,)), E(1, F(1, , D(2, B(2, (E(1,), F(1,))) Some graph are potentially large (+4000 vertex) but I'm not supposed to have any cyclic references. When I run my program I get this error : java.lang.StackOverflowError at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:127) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) I've tried to increase the size of the stake and use the standard java serializer but no effect. Any hint of the reason of this error and ways to change my code to solve it ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/serialization-stakeoverflow-error-during-reduce-on-nested-objects-tp22040.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
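For what it's worth, the two usual knobs are registering the classes with Kryo and giving the task threads a larger stack; whether the latter is enough depends on how deep the trees really are. A sketch with an illustrative stand-in for the node class described in the thread:

import org.apache.spark.SparkConf

// Illustrative stand-in for the tree node type (a count plus a map of children).
class MyNode(val count: Int,
             val children: java.util.HashMap[String, MyNode]) extends Serializable

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyNode]))
  .set("spark.executor.extraJavaOptions", "-Xss16m")  // larger stack for executor task threads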
Re: Using rdd methods with Dstream
Thanks TD, this is what I was looking for. rdd.context.makeRDD worked. Laeeq On Friday, March 13, 2015 11:08 PM, Tathagata Das t...@databricks.com wrote: Is the number of top K elements you want to keep small? That is, is K small? In which case, you can1. either do it in the driver on the array DStream.foreachRDD ( rdd = { val topK = rdd.top(K) ; // use top K }) 2. Or, you can use the topK to create another RDD using sc.makeRDD DStream.transform ( rdd = { val topK = rdd.top(K) ; rdd.context.makeRDD(topK, numPartitions)}) TD On Fri, Mar 13, 2015 at 5:58 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote: Hi, Earlier my code was like follwing but slow due to repartition. I want top K of each window in a stream. val counts = keyAndValues.map(x = math.round(x._3.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))val topCounts = counts.repartition(1).map(_.swap).transform(rdd = rdd.sortByKey(false)).map(_.swap).mapPartitions(rdd = rdd.take(10)) so I thought to use dstream.transform(rdd=rdd.top()) but this return Array rather than rdd. I have to perform further steps on topCounts dstream. [ERROR] found : Array[(Long, Long)][ERROR] required: org.apache.spark.rdd.RDD[?][ERROR] val topCounts = counts.transform(rdd = rdd.top(10)) Regards,Laeeq On Friday, March 13, 2015 1:47 PM, Sean Owen so...@cloudera.com wrote: Hm, aren't you able to use the SparkContext here? DStream operations happen on the driver. So you can parallelize() the result? take() won't work as it's not the same as top() On Fri, Mar 13, 2015 at 11:23 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Like this? dtream.repartition(1).mapPartitions(it = it.take(5)) Thanks Best Regards On Fri, Mar 13, 2015 at 4:11 PM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote: Hi, I normally use dstream.transform whenever I need to use methods which are available in RDD API but not in streaming API. e.g. dstream.transform(x = x.sortByKey(true)) But there are other RDD methods which return types other than RDD. e.g. dstream.transform(x = x.top(5)) top here returns Array. In the second scenario, how can i return RDD rather than array, so that i can perform further steps on dstream. Regards, Laeeq - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
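Putting the whole pattern together, a small sketch of option 2: take the top K by count inside transform, then turn the resulting Array back into an RDD so later DStream stages still work:

import org.apache.spark.streaming.dstream.DStream

def topKPerWindow(counts: DStream[(Long, Long)], k: Int): DStream[(Long, Long)] =
  counts.transform { rdd =>
    val topK = rdd.top(k)(Ordering.by[(Long, Long), Long](_._2))  // already sorted descending by count
    rdd.context.makeRDD(topK, numSlices = 1)
  }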
Re: How to avoid using some nodes while running a spark program on yarn
Out of curiosity, I searched for 'capacity scheduler deadlock' yielded the following: [YARN-3265] CapacityScheduler deadlock when computing absolute max avail capacity (fix for trunk/branch-2) [YARN-3251] Fix CapacityScheduler deadlock when computing absolute max avail capacity (short term fix for 2.6.1) YARN-2456 Possible livelock in CapacityScheduler when RM is recovering apps Looks like CapacityScheduler should get more stable in the upcoming hadoop 2.7.0 release. Cheers On Sat, Mar 14, 2015 at 4:25 AM, Simon Elliston Ball si...@simonellistonball.com wrote: You won’t be able to use YARN labels on 2.2.0. However, you only need the labels if you want to map containers on specific hardware. In your scenario, the capacity scheduler in YARN might be the best bet. You can setup separate queues for the streaming and other jobs to protect a percentage of cluster resources. You can then spread all jobs across the cluster while protecting the streaming jobs’ capacity (if your resource containers sizes are granular enough). Simon On Mar 14, 2015, at 9:57 AM, James alcaid1...@gmail.com wrote: My hadoop version is 2.2.0, and my spark version is 1.2.0 2015-03-14 17:22 GMT+08:00 Ted Yu yuzhih...@gmail.com: Which release of hadoop are you using ? Can you utilize node labels feature ? See YARN-2492 and YARN-796 Cheers On Sat, Mar 14, 2015 at 1:49 AM, James alcaid1...@gmail.com wrote: Hello, I am got a cluster with spark on yarn. Currently some nodes of it are running a spark streamming program, thus their local space is not enough to support other application. Thus I wonder is that possible to use a blacklist to avoid using these nodes when running a new spark program? Alcaid
Re: Spark Release 1.3.0 DataFrame API
Yes I think this was already just fixed by: https://github.com/apache/spark/pull/4977 a .toDF() is missing On Sat, Mar 14, 2015 at 4:16 PM, Nick Pentreath nick.pentre...@gmail.com wrote: I've found people.toDF gives you a data frame (roughly equivalent to the previous Row RDD), And you can then call registerTempTable on that DataFrame. So people.toDF.registerTempTable(people) should work — Sent from Mailbox On Sat, Mar 14, 2015 at 5:33 PM, David Mitchell jdavidmitch...@gmail.com wrote: I am pleased with the release of the DataFrame API. However, I started playing with it, and neither of the two main examples in the documentation work: http://spark.apache.org/docs/1.3.0/sql-programming-guide.html Specfically: Inferring the Schema Using Reflection Programmatically Specifying the Schema Scala 2.11.6 Spark 1.3.0 prebuilt for Hadoop 2.4 and later Inferring the Schema Using Reflection scala people.registerTempTable(people) console:31: error: value registerTempTable is not a member of org.apache.spark .rdd.RDD[Person] people.registerTempTable(people) ^ Programmatically Specifying the Schema scala val peopleDataFrame = sqlContext.createDataFrame(people, schema) console:41: error: overloaded method value createDataFrame with alternatives: (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spar k.sql.DataFrame and (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.Dat aFrame and (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],columns: java.util.List[String])org.apache.spark.sql.DataFrame and (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: o rg.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame and (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache .spark.sql.types.StructType)org.apache.spark.sql.DataFrame cannot be applied to (org.apache.spark.rdd.RDD[String], org.apache.spark.sql.ty pes.StructType) val df = sqlContext.createDataFrame(people, schema) Any help would be appreciated. David - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Need Advice about reading lots of text files
It’s a long story but there are many dirs with smallish part- files in them so we create a list of the individual files as input to sparkContext.textFile(fileList). I suppose we could move them and rename them to be contiguous part- files in one dir. Would that be better than passing in a long list of individual filenames? We could also make the part files much larger by collecting the smaller ones. But would any of this make a difference in IO speed? I ask because using the long file list seems to read, what amounts to a not very large data set rather slowly. If it were all in large part files in one dir I’d expect it to go much faster but this is just intuition. On Mar 14, 2015, at 9:58 AM, Koert Kuipers ko...@tresata.com wrote: why can you not put them in a directory and read them as one input? you will get a task per file, but spark is very fast at executing many tasks (its not a jvm per task). On Sat, Mar 14, 2015 at 12:51 PM, Pat Ferrel p...@occamsmachete.com mailto:p...@occamsmachete.com wrote: Any advice on dealing with a large number of separate input files? On Mar 13, 2015, at 4:06 PM, Pat Ferrel p...@occamsmachete.com mailto:p...@occamsmachete.com wrote: We have many text files that we need to read in parallel. We can create a comma delimited list of files to pass in to sparkContext.textFile(fileList). The list can get very large (maybe 1) and is all on hdfs. The question is: what is the most performant way to read them? Should they be broken up and read in groups appending the resulting RDDs or should we just pass in the entire list at once? In effect I’m asking if Spark does some optimization of whether we should do it explicitly. If the later, what rule might we use depending on our cluster setup? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org mailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org mailto:user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org mailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org mailto:user-h...@spark.apache.org
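For what it's worth, textFile already accepts comma-separated paths and Hadoop glob patterns, so both layouts can be expressed directly; coalesce can then cut down the per-file task count. A sketch with illustrative paths:

// fileList: Seq[String] of individual part- file paths, as described above
val byList = sc.textFile(fileList.mkString(","))

// or, if the files are moved under a common directory layout:
val byGlob = sc.textFile("hdfs:///data/input/*/part-*")

// merge many small input splits into fewer partitions for downstream stages
val fewerPartitions = byGlob.coalesce(64)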
Re: Spark Release 1.3.0 DataFrame API
I've found people.toDF gives you a data frame (roughly equivalent to the previous Row RDD), and you can then call registerTempTable on that DataFrame. So people.toDF.registerTempTable("people") should work — Sent from Mailbox On Sat, Mar 14, 2015 at 5:33 PM, David Mitchell jdavidmitch...@gmail.com wrote: I am pleased with the release of the DataFrame API. However, I started playing with it, and neither of the two main examples in the documentation works: http://spark.apache.org/docs/1.3.0/sql-programming-guide.html Specifically: - Inferring the Schema Using Reflection - Programmatically Specifying the Schema Scala 2.11.6 Spark 1.3.0 prebuilt for Hadoop 2.4 and later *Inferring the Schema Using Reflection* scala> people.registerTempTable("people") <console>:31: error: value registerTempTable is not a member of org.apache.spark.rdd.RDD[Person] people.registerTempTable("people") ^ *Programmatically Specifying the Schema* scala> val peopleDataFrame = sqlContext.createDataFrame(people, schema) <console>:41: error: overloaded method value createDataFrame with alternatives: (rdd: org.apache.spark.api.java.JavaRDD[_], beanClass: Class[_])org.apache.spark.sql.DataFrame and (rdd: org.apache.spark.rdd.RDD[_], beanClass: Class[_])org.apache.spark.sql.DataFrame and (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row], columns: java.util.List[String])org.apache.spark.sql.DataFrame and (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame and (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame cannot be applied to (org.apache.spark.rdd.RDD[String], org.apache.spark.sql.types.StructType) val df = sqlContext.createDataFrame(people, schema) Any help would be appreciated. David
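Putting the suggestion together, a minimal sketch of the reflection example as it should look on Spark 1.3, run in spark-shell (which already provides sc and sqlContext); the people.txt path and its name,age layout come from the docs example and are not verified here:

case class Person(name: String, age: Int)
import sqlContext.implicits._   // brings in the RDD-to-DataFrame conversions

val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()                       // the call the 1.3.0 docs example omits

people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)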
Re: Need Advice about reading lots of text files
Any advice on dealing with a large number of separate input files? On Mar 13, 2015, at 4:06 PM, Pat Ferrel p...@occamsmachete.com wrote: We have many text files that we need to read in parallel. We can create a comma-delimited list of files to pass in to sparkContext.textFile(fileList). The list can get very large (maybe 1) and is all on HDFS. The question is: what is the most performant way to read them? Should they be broken up and read in groups, appending the resulting RDDs, or should we just pass in the entire list at once? In effect I’m asking whether Spark does some optimization or whether we should do it explicitly. If the latter, what rule might we use depending on our cluster setup? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How does Spark honor data locality when allocating computing resources for an application
You seem not to have noticed the configuration variable spreadOutApps and its comment: // As a temporary workaround before better ways of configuring memory, we allow users to set // a flag that will perform round-robin scheduling across the nodes (spreading out each app // among all the nodes) instead of trying to consolidate each app onto a small # of nodes. 2015-03-14 10:41 GMT+08:00 bit1...@163.com bit1...@163.com: Hi, sparkers, When I read the code about computing resource allocation for a newly submitted application in the Master#schedule method, I got a question about data locality: // Pack each app into as few nodes as possible until we've assigned all its cores for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { for (app <- waitingApps if app.coresLeft > 0) { if (canUse(app, worker)) { val coresToUse = math.min(worker.coresFree, app.coresLeft) if (coresToUse > 0) { val exec = app.addExecutor(worker, coresToUse) launchExecutor(worker, exec) app.state = ApplicationState.RUNNING } } } } It looks like the resource allocation policy here is that the Master will assign as few workers as possible, so long as those few workers have enough resources for the application. My question is: assuming the data the application will process is spread across all the worker nodes, isn't data locality lost under the above policy? Not sure whether I have understood this correctly or have missed something. -- bit1...@163.com -- 王海华
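For reference, the flag that comment describes appears to be exposed to users as spark.deploy.spreadOut on the standalone Master, defaulting to true (round-robin). This is my reading of the Master code rather than something verified against a particular cluster, so treat the sketch below as an assumption to check:

# conf/spark-defaults.conf on the host that runs the standalone Master.
# true (the default) = round-robin executors across workers, which is friendlier to data locality;
# false = pack each app onto as few workers as possible.
spark.deploy.spreadOut   true

# Or, equivalently, via conf/spark-env.sh on the master:
export SPARK_MASTER_OPTS="-Dspark.deploy.spreadOut=true"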
Spark Release 1.3.0 DataFrame API
I am pleased with the release of the DataFrame API. However, I started playing with it, and neither of the two main examples in the documentation works: http://spark.apache.org/docs/1.3.0/sql-programming-guide.html Specifically: - Inferring the Schema Using Reflection - Programmatically Specifying the Schema Scala 2.11.6 Spark 1.3.0 prebuilt for Hadoop 2.4 and later *Inferring the Schema Using Reflection* scala> people.registerTempTable("people") <console>:31: error: value registerTempTable is not a member of org.apache.spark.rdd.RDD[Person] people.registerTempTable("people") ^ *Programmatically Specifying the Schema* scala> val peopleDataFrame = sqlContext.createDataFrame(people, schema) <console>:41: error: overloaded method value createDataFrame with alternatives: (rdd: org.apache.spark.api.java.JavaRDD[_], beanClass: Class[_])org.apache.spark.sql.DataFrame and (rdd: org.apache.spark.rdd.RDD[_], beanClass: Class[_])org.apache.spark.sql.DataFrame and (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row], columns: java.util.List[String])org.apache.spark.sql.DataFrame and (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame and (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame cannot be applied to (org.apache.spark.rdd.RDD[String], org.apache.spark.sql.types.StructType) val df = sqlContext.createDataFrame(people, schema) Any help would be appreciated. David
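The second error is the compiler pointing out that createDataFrame wants an RDD[Row] plus a StructType, not the RDD[String] produced by textFile, so the strings have to be mapped to Rows first. A sketch of the "Programmatically Specifying the Schema" example with that step included, assuming the docs' people.txt path and name/age layout:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

val people = sc.textFile("examples/src/main/resources/people.txt")
val schemaString = "name age"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)))

// The crucial step: turn RDD[String] into RDD[Row] so the (rowRDD, schema) overload applies.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
peopleDataFrame.registerTempTable("people")
sqlContext.sql("SELECT name FROM people").collect().foreach(println)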
Pausing/throttling spark/spark-streaming application
Hi, I created a question on StackOverflow: http://stackoverflow.com/questions/29051579/pausing-throttling-spark-spark-streaming-application I would appreciate your help. Best, Tomek -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pausing-throttling-spark-spark-streaming-application-tp22050.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark and HBase join issue
Hi all, I have the following cluster configuration: - 5 nodes on a cloud environment. - Hadoop 2.5.0. - HBase 0.98.6. - Spark 1.2.0. - 8 cores and 16 GB of RAM on each host. - 1 NFS disk with 300 IOPS mounted on hosts 1 and 2. - 1 NFS disk with 300 IOPS mounted on hosts 3, 4 and 5. I tried to run a Spark job in cluster mode that computes the left outer join between two HBase tables. The first table stores about 4.1 GB of data spread across 3 regions with Snappy compression. The second one stores about 1.2 GB of data spread across 22 regions with Snappy compression. I sometimes get "executor lost" failures during the shuffle phase of the last stage (saveAsHadoopDataset). Below is my Spark conf: num-cpu-cores = 20 memory-per-node = 10G spark.scheduler.mode = FAIR spark.scheduler.pool = production spark.shuffle.spill=true spark.rdd.compress = true spark.core.connection.auth.wait.timeout=2000 spark.sql.shuffle.partitions=100 spark.default.parallelism=50 spark.speculation=false spark.shuffle.spill=true spark.shuffle.memoryFraction=0.1 spark.cores.max=30 spark.driver.memory=10g Are the resources too low to handle this kind of operation? If yes, could you share with me the right configuration to perform this kind of task? Thank you in advance. F.
Re: Spark and HBase join issue
The 4.1 GB table has 3 regions. This means there would be at least 2 nodes that don't carry any of its regions. Can you split this table into 12 (or more) regions? BTW, what's the value of spark.yarn.executor.memoryOverhead? Cheers On Sat, Mar 14, 2015 at 10:52 AM, francexo83 francex...@gmail.com wrote: Hi all, I have the following cluster configuration: - 5 nodes on a cloud environment. - Hadoop 2.5.0. - HBase 0.98.6. - Spark 1.2.0. - 8 cores and 16 GB of RAM on each host. - 1 NFS disk with 300 IOPS mounted on hosts 1 and 2. - 1 NFS disk with 300 IOPS mounted on hosts 3, 4 and 5. I tried to run a Spark job in cluster mode that computes the left outer join between two HBase tables. The first table stores about 4.1 GB of data spread across 3 regions with Snappy compression. The second one stores about 1.2 GB of data spread across 22 regions with Snappy compression. I sometimes get "executor lost" failures during the shuffle phase of the last stage (saveAsHadoopDataset). Below is my Spark conf: num-cpu-cores = 20 memory-per-node = 10G spark.scheduler.mode = FAIR spark.scheduler.pool = production spark.shuffle.spill=true spark.rdd.compress = true spark.core.connection.auth.wait.timeout=2000 spark.sql.shuffle.partitions=100 spark.default.parallelism=50 spark.speculation=false spark.shuffle.spill=true spark.shuffle.memoryFraction=0.1 spark.cores.max=30 spark.driver.memory=10g Are the resources too low to handle this kind of operation? If yes, could you share with me the right configuration to perform this kind of task? Thank you in advance. F.
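In case it helps, a sketch of how that setting is usually passed on YARN; every number below is a placeholder to tune for the 16 GB hosts, and the class and jar names are invented for the example:

# All values are placeholders, not recommendations; adjust for the actual cluster.
spark-submit \
  --master yarn-cluster \
  --num-executors 5 \
  --executor-cores 4 \
  --executor-memory 8g \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  --conf spark.shuffle.memoryFraction=0.3 \
  --class com.example.HBaseLeftJoin \
  hbase-join-assembly.jar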
Re: Need Advice about reading lots of text files
Here is how I have dealt with many small text files (on S3, though this should generalize) in the past: http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201411.mbox/%3ccaaswr-58p66-es2haxh4i+bu__0rvxd2okewkly0mee8rue...@mail.gmail.com%3E From: Michael Armbrust mich...@databricks.com Subject: Re: S3NativeFileSystem inefficient implementation when calling sc.textFile Date: Thu, 27 Nov 2014 03:20:14 GMT In the past I have worked around this problem by avoiding sc.textFile(). Instead I read the data directly inside of a Spark job. Basically, you start with an RDD where each entry is a file in S3 and then flatMap that with something that reads the files and returns the lines. Here's an example: https://gist.github.com/marmbrus/fff0b058f134fa7752fe Using this class you can do something like: sc.parallelize("s3n://mybucket/file1" :: "s3n://mybucket/file1" ... :: Nil).flatMap(new ReadLinesSafe(_)) You can also build up the list of files by running a Spark job: https://gist.github.com/marmbrus/15e72f7bc22337cf6653 Michael On Sat, Mar 14, 2015 at 10:38 AM, Pat Ferrel p...@occamsmachete.com wrote: It’s a long story but there are many dirs with smallish part-files in them, so we create a list of the individual files as input to sparkContext.textFile(fileList). I suppose we could move them and rename them to be contiguous part-files in one dir. Would that be better than passing in a long list of individual filenames? We could also make the part files much larger by collecting the smaller ones. But would any of this make a difference in IO speed? I ask because using the long file list seems to read what amounts to a not very large data set rather slowly. If it were all in large part files in one dir I’d expect it to go much faster, but this is just intuition. On Mar 14, 2015, at 9:58 AM, Koert Kuipers ko...@tresata.com wrote: why can you not put them in a directory and read them as one input? you will get a task per file, but Spark is very fast at executing many tasks (it's not a JVM per task). On Sat, Mar 14, 2015 at 12:51 PM, Pat Ferrel p...@occamsmachete.com wrote: Any advice on dealing with a large number of separate input files? On Mar 13, 2015, at 4:06 PM, Pat Ferrel p...@occamsmachete.com wrote: We have many text files that we need to read in parallel. We can create a comma-delimited list of files to pass in to sparkContext.textFile(fileList). The list can get very large (maybe 1) and is all on HDFS. The question is: what is the most performant way to read them? Should they be broken up and read in groups, appending the resulting RDDs, or should we just pass in the entire list at once? In effect I’m asking whether Spark does some optimization or whether we should do it explicitly. If the latter, what rule might we use depending on our cluster setup? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
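A small self-contained sketch of the same pattern (run from spark-shell, so sc is in scope), without the ReadLinesSafe class from the linked gist; this version just uses the Hadoop FileSystem API and silently skips unreadable files, which may or may not match what the gist does, and the paths are placeholders:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

val paths = Seq("hdfs:///data/dir1/part-00000", "hdfs:///data/dir2/part-00000")

val lines = sc.parallelize(paths, paths.size).flatMap { p =>
  try {
    val fs = FileSystem.get(new URI(p), new Configuration())    // resolved on the executor
    val in = fs.open(new Path(p))
    try Source.fromInputStream(in).getLines().toList            // materialize before closing
    finally in.close()
  } catch {
    case _: Exception => Nil                                    // skip files that fail to open
  }
}
println(lines.count())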
Re: Spark SQL 1.3 max operation giving wrong results
Do you have an example that reproduces the issue? On Fri, Mar 13, 2015 at 4:12 PM, gtinside gtins...@gmail.com wrote: Hi, I am playing around with Spark SQL 1.3 and noticed that the max function does not give the correct result, i.e. it doesn't return the maximum value. The same query works fine in Spark SQL 1.2. Is anyone aware of this issue? Regards, Gaurav -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-3-max-operation-giving-wrong-results-tp22043.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
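A skeleton one could fill in with the failing query to make the report reproducible; it simply checks SQL MAX() against an RDD-side reduce on invented data and column names, so it is only a template, not a demonstration of the bug:

case class Reading(id: Int, value: Double)
import sqlContext.implicits._

val data = sc.parallelize(Seq(Reading(1, 3.5), Reading(2, 9.1), Reading(3, 0.7))).toDF()
data.registerTempTable("readings")

val sqlMax = sqlContext.sql("SELECT MAX(value) FROM readings").collect().head.getDouble(0)
val rddMax = data.rdd.map(_.getDouble(1)).reduce(math.max)   // column 1 is "value"
println(s"sql max = $sqlMax, rdd max = $rddMax")             // they should agree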