Re: Best way to transform string label into long label for classification problem
Thank you Xinh. That's what I need. On Tue, Jun 28, 2016 at 5:43 PM, Xinh Huynh <xinh.hu...@gmail.com> wrote: > Hi Jao, > > Here's one option: > http://spark.apache.org/docs/latest/ml-features.html#stringindexer > "StringIndexer encodes a string column of labels to a column of label > indices. The indices are in [0, numLabels), ordered by label frequencies." > > Xinh > > On Tue, Jun 28, 2016 at 12:29 AM, Jaonary Rabarisoa <jaon...@gmail.com> > wrote: > >> Dear all, >> >> I'm trying to find a way to transform a DataFrame into data that is >> more suitable for a third-party classification algorithm. The DataFrame has >> two columns : "feature", represented by a vector, and "label", represented by >> a string. I want the "label" to be a number in [0, number of classes - 1]. >> Do you have any ideas on how to do this efficiently ? >> >> Cheers, >> >> Jao >> > >
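For reference, a minimal sketch of the StringIndexer option Xinh points to, using the column names from the question (the output column name "labelIndex" is an assumption):

    import org.apache.spark.ml.feature.StringIndexer

    // fit() learns the string-to-index mapping (ordered by label frequency);
    // transform() appends the numeric column with values in [0, numLabels).
    val indexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("labelIndex")
    val indexed = indexer.fit(df).transform(df)

Note that the indexed column is of DoubleType; if a long is really needed it can be cast afterwards, e.g. indexed.withColumn("labelIndex", indexed("labelIndex").cast("long")).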
Best way to transform string label into long label for classification problem
Dear all, I'm trying to find a way to transform a DataFrame into data that is more suitable for a third-party classification algorithm. The DataFrame has two columns : "feature", represented by a vector, and "label", represented by a string. I want the "label" to be a number in [0, number of classes - 1]. Do you have any ideas on how to do this efficiently ? Cheers, Jao
GMM with diagonal covariance matrix
Hi all, Is it possible to learn a Gaussian mixture model with a diagonal covariance matrix using the GMM algorithm implemented in MLlib ? It seems like it should be possible, but I can't figure out how to do it. Cheers, Jao
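For reference, a minimal usage sketch of the MLlib GMM in question (data: RDD[Vector] is an assumption). As far as the public API goes, each fitted component exposes a full covariance matrix and there is no "diagonal only" switch, so a diagonal constraint would have to be enforced outside the fit:

    import org.apache.spark.mllib.clustering.GaussianMixture

    // data: RDD[org.apache.spark.mllib.linalg.Vector] (assumed)
    val gmm = new GaussianMixture().setK(3).run(data)
    gmm.gaussians.foreach { g =>
      println(g.mu)     // component mean
      println(g.sigma)  // full covariance matrix
    }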
ml.Pipeline without train step
Hi there, The Pipeline in the ml package is a really great feature and we use it in our everyday tasks. But we have some use cases where we need a Pipeline of Transformers only, and the problem is that there is no training phase in that case. For example, we have an image analytics pipeline with the following steps : ImageDecoder -> FeatureExtraction -> Machine Learning algo. Sometimes the machine learning algorithm is not part of the pipeline but we still need the ImageDecoder + FeatureExtraction steps. The FeatureExtraction step has no training phase (FFT, SIFT, pre-trained CNN, ...), and in this case calling the .fit method in order to get the pipeline model is not really meaningful. How can we handle this case correctly ? Cheers, Jao
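One workaround sketch, assuming ImageDecoder and FeatureExtraction are custom Transformers: either chain the transform calls directly, or still build a Pipeline and treat fit() as a pass-through, since for Transformer-only stages it does not train anything and simply returns a PipelineModel wrapping the same stages:

    import org.apache.spark.ml.Pipeline

    val decoder   = new ImageDecoder()       // assumed custom Transformer
    val extractor = new FeatureExtraction()  // assumed custom Transformer

    // Option 1: no fit() at all, just chain the transformers.
    val features1 = extractor.transform(decoder.transform(imagesDF))

    // Option 2: keep the Pipeline abstraction; fit() is effectively a no-op here.
    val model = new Pipeline().setStages(Array(decoder, extractor)).fit(imagesDF)
    val features2 = model.transform(imagesDF)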
Why transformer from ml.Pipeline transform only a DataFrame ?
Hi there, The current API of ml.Transformer only takes a DataFrame as input. I have a use case where I need to transform a single element, for example an element coming from spark-streaming. Is there a reason for this, or will ml.Transformer support transforming a single element later ? Cheers, Jao
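One workaround sketch while the API is DataFrame-only: wrap the single element in a one-row DataFrame and reuse the same Transformer (myTransformer, elem and the Single case class are assumptions):

    case class Single(feature: org.apache.spark.mllib.linalg.Vector)

    import sqlContext.implicits._
    // e.g. inside a streaming foreachRDD, for one incoming element `elem`
    val oneRowDF = sc.parallelize(Seq(Single(elem))).toDF()
    val result = myTransformer.transform(oneRowDF)

The per-element overhead is of course significant, so this only makes sense for low-throughput cases or for micro-batches rather than truly single elements.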
Re: Build k-NN graph for large dataset
Thank you all for these links. I'll check them. On Wed, Aug 26, 2015 at 5:05 PM, Charlie Hack charles.t.h...@gmail.com wrote: +1 to all of the above esp. Dimensionality reduction and locality sensitive hashing / min hashing. There's also an algorithm implemented in MLlib called DIMSUM which was developed at Twitter for this purpose. I've been meaning to try it and would be interested to hear about results you get. https://blog.twitter.com/2014/all-pairs-similarity-via-dimsum Charlie — Sent from Mailbox On Wednesday, Aug 26, 2015 at 09:57, Michael Malak michaelma...@yahoo.com.invalid, wrote: Yes. And a paper that describes using grids (actually varying grids) is http://research.microsoft.com/en-us/um/people/jingdw/pubs%5CCVPR12-GraphConstruction.pdf In the Spark GraphX In Action book that Robin East and I are writing, we implement a drastically simplified version of this in chapter 7, which should become available in the MEAP mid-September. http://www.manning.com/books/spark-graphx-in-action -- If you don't want to compute all N^2 similarities, you need to implement some kind of blocking first. For example, LSH (locally sensitive hashing). A quick search gave this link to a Spark implementation: http://stackoverflow.com/questions/2771/spark-implementation-for-locality-sensitive-hashing On Wed, Aug 26, 2015 at 7:35 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, I'm trying to find an efficient way to build a k-NN graph for a large dataset. Precisely, I have a large set of high dimensional vector (say d 1) and I want to build a graph where those high dimensional points are the vertices and each one is linked to the k-nearest neighbor based on some kind similarity defined on the vertex spaces. My problem is to implement an efficient algorithm to compute the weight matrix of the graph. I need to compute a N*N similarities and the only way I know is to use cartesian operation follow by map operation on RDD. But, this is very slow when the N is large. Is there a more cleaver way to do this for an arbitrary similarity function ? Cheers, Jao
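For reference, the DIMSUM implementation Charlie mentions is exposed in MLlib as columnSimilarities on RowMatrix, i.e. it computes cosine similarities between the columns of the matrix, so the N points have to be laid out as columns (rows = dimensions). A minimal sketch, assuming vectors: RDD[Vector]:

    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    val mat = new RowMatrix(vectors)
    val exactSims  = mat.columnSimilarities()     // brute force, all pairs
    val approxSims = mat.columnSimilarities(0.1)  // threshold > 0 enables DIMSUM sampling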
Build k-NN graph for large dataset
Dear all, I'm trying to find an efficient way to build a k-NN graph for a large dataset. Precisely, I have a large set of high dimensional vectors (say d >> 1) and I want to build a graph where those high dimensional points are the vertices, and each one is linked to its k nearest neighbors based on some kind of similarity defined on the vertex space. My problem is implementing an efficient algorithm to compute the weight matrix of the graph. I need to compute N*N similarities, and the only way I know is to use a cartesian operation followed by a map operation on an RDD. But this is very slow when N is large. Is there a cleverer way to do this for an arbitrary similarity function ? Cheers, Jao
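For reference, the brute-force cartesian approach described above looks roughly like the sketch below (sim is an assumed similarity function and points: RDD[(Long, Vector)] carries (id, vector)); it is O(N^2) in both computation and shuffle, which is why it does not scale:

    val k = 10
    val knn = points.cartesian(points)
      .filter { case ((i, _), (j, _)) => i != j }
      .map { case ((i, vi), (j, vj)) => (i, (j, sim(vi, vj))) }
      .groupByKey()
      .mapValues(_.toSeq.sortBy(-_._2).take(k))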
Re: SQL UserDefinedType can't be saved in parquet file when using assembly jar
In this example, every thing work expect save to parquet file. On Mon, May 11, 2015 at 4:39 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: MyDenseVectorUDT do exist in the assembly jar and in this example all the code is in a single file to make sure every thing is included. On Tue, Apr 21, 2015 at 1:17 AM, Xiangrui Meng men...@gmail.com wrote: You should check where MyDenseVectorUDT is defined and whether it was on the classpath (or in the assembly jar) at runtime. Make sure the full class name (with package name) is used. Btw, UDTs are not public yet, so please use it with caution. -Xiangrui On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Here is an example of code to reproduce the issue I mentioned in a previous mail about saving an UserDefinedType into a parquet file. The problem here is that the code works when I run it inside intellij idea but fails when I create the assembly jar and run it with spark-submit. I use the master version of Spark. @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT]) class MyDenseVector(val data: Array[Double]) extends Serializable { override def equals(other: Any): Boolean = other match { case v: MyDenseVector = java.util.Arrays.equals(this.data, v.data) case _ = false } } class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] { override def sqlType: DataType = ArrayType(DoubleType, containsNull = false) override def serialize(obj: Any): Seq[Double] = { obj match { case features: MyDenseVector = features.data.toSeq } } override def deserialize(datum: Any): MyDenseVector = { datum match { case data: Seq[_] = new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray) } } override def userClass: Class[MyDenseVector] = classOf[MyDenseVector] } case class Toto(imageAnnotation: MyDenseVector) object TestUserDefinedType { case class Params(input: String = null, partitions: Int = 12, outputDir: String = images.parquet) def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(ImportImageFolder).setMaster(local[4]) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val rawImages = sc.parallelize((1 to 5).map(x = Toto(new MyDenseVector(Array[Double](x.toDouble).toDF rawImages.printSchema() rawImages.show() rawImages.save(toto.parquet) // This fails with assembly jar sc.stop() } } My build.sbt is as follow : libraryDependencies ++= Seq( org.apache.spark %% spark-core % sparkVersion % provided, org.apache.spark %% spark-sql % sparkVersion, org.apache.spark %% spark-mllib % sparkVersion ) assemblyMergeStrategy in assembly := { case PathList(javax, servlet, xs @ _*) = MergeStrategy.first case PathList(org, apache, xs @ _*) = MergeStrategy.first case PathList(org, jboss, xs @ _*) = MergeStrategy.first // case PathList(ps @ _*) if ps.last endsWith .html = MergeStrategy.first // case application.conf= MergeStrategy.concat case m if m.startsWith(META-INF) = MergeStrategy.discard //case x = // val oldStrategy = (assemblyMergeStrategy in assembly).value // oldStrategy(x) case _ = MergeStrategy.first } As I said, this code works without problem when I execute it inside intellij idea. 
But when generate the assembly jar with sbt-assembly and use spark-submit I got the following error : 15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0 15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7) java.lang.IllegalArgumentException: Unsupported dataType: {type:struct,fields:[{name:imageAnnotation,type:{type:udt,class:MyDenseVectorUDT,pyClass:null,sqlType:{type:array,elementType:double,containsNull:false}},nullable:true,metadata:{}}]}, [1.1] failure: `TimestampType' expected but `{' found {type:struct,fields:[{name:imageAnnotation,type:{type:udt,class:MyDenseVectorUDT,pyClass:null,sqlType:{type:array,elementType:double,containsNull:false}},nullable:true,metadata:{}}]} ^ at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163) at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402) at scala.util.Try.getOrElse(Try.scala:77) at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402) at org.apache.spark.sql.parquet.RowWriteSupport.init
Re: SQL UserDefinedType can't be saved in parquet file when using assembly jar
MyDenseVectorUDT do exist in the assembly jar and in this example all the code is in a single file to make sure every thing is included. On Tue, Apr 21, 2015 at 1:17 AM, Xiangrui Meng men...@gmail.com wrote: You should check where MyDenseVectorUDT is defined and whether it was on the classpath (or in the assembly jar) at runtime. Make sure the full class name (with package name) is used. Btw, UDTs are not public yet, so please use it with caution. -Xiangrui On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Here is an example of code to reproduce the issue I mentioned in a previous mail about saving an UserDefinedType into a parquet file. The problem here is that the code works when I run it inside intellij idea but fails when I create the assembly jar and run it with spark-submit. I use the master version of Spark. @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT]) class MyDenseVector(val data: Array[Double]) extends Serializable { override def equals(other: Any): Boolean = other match { case v: MyDenseVector = java.util.Arrays.equals(this.data, v.data) case _ = false } } class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] { override def sqlType: DataType = ArrayType(DoubleType, containsNull = false) override def serialize(obj: Any): Seq[Double] = { obj match { case features: MyDenseVector = features.data.toSeq } } override def deserialize(datum: Any): MyDenseVector = { datum match { case data: Seq[_] = new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray) } } override def userClass: Class[MyDenseVector] = classOf[MyDenseVector] } case class Toto(imageAnnotation: MyDenseVector) object TestUserDefinedType { case class Params(input: String = null, partitions: Int = 12, outputDir: String = images.parquet) def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(ImportImageFolder).setMaster(local[4]) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val rawImages = sc.parallelize((1 to 5).map(x = Toto(new MyDenseVector(Array[Double](x.toDouble).toDF rawImages.printSchema() rawImages.show() rawImages.save(toto.parquet) // This fails with assembly jar sc.stop() } } My build.sbt is as follow : libraryDependencies ++= Seq( org.apache.spark %% spark-core % sparkVersion % provided, org.apache.spark %% spark-sql % sparkVersion, org.apache.spark %% spark-mllib % sparkVersion ) assemblyMergeStrategy in assembly := { case PathList(javax, servlet, xs @ _*) = MergeStrategy.first case PathList(org, apache, xs @ _*) = MergeStrategy.first case PathList(org, jboss, xs @ _*) = MergeStrategy.first // case PathList(ps @ _*) if ps.last endsWith .html = MergeStrategy.first // case application.conf= MergeStrategy.concat case m if m.startsWith(META-INF) = MergeStrategy.discard //case x = // val oldStrategy = (assemblyMergeStrategy in assembly).value // oldStrategy(x) case _ = MergeStrategy.first } As I said, this code works without problem when I execute it inside intellij idea. 
But when generate the assembly jar with sbt-assembly and use spark-submit I got the following error : 15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0 15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7) java.lang.IllegalArgumentException: Unsupported dataType: {type:struct,fields:[{name:imageAnnotation,type:{type:udt,class:MyDenseVectorUDT,pyClass:null,sqlType:{type:array,elementType:double,containsNull:false}},nullable:true,metadata:{}}]}, [1.1] failure: `TimestampType' expected but `{' found {type:struct,fields:[{name:imageAnnotation,type:{type:udt,class:MyDenseVectorUDT,pyClass:null,sqlType:{type:array,elementType:double,containsNull:false}},nullable:true,metadata:{}}]} ^ at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163) at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402) at scala.util.Try.getOrElse(Try.scala:77) at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402) at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278
Problem with Spark SQL UserDefinedType and sbt assembly
Dear all, Here is an issue that gets me mad. I wrote a UserDefineType in order to be able to store a custom type in a parquet file. In my code I just create a DataFrame with my custom data type and write in into a parquet file. When I run my code directly inside idea every thing works like a charm. But when I create the assembly jar with sbt assembly and run the same code with spark-submit I get the following error : *15/04/16 17:02:17 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)* *java.lang.IllegalArgumentException: Unsupported dataType: {type:struct,fields:[{name:metadata,type:{type:udt,class:org.apache.spark.vision.types.ImageMetadataUDT,pyClass:null,sqlType:{type:struct,fields:[{name:name,type:string,nullable:true,metadata:{}},{name:encoding,type:string,nullable:true,metadata:{}},{name:cameraId,type:string,nullable:true,metadata:{}},{name:timestamp,type:string,nullable:true,metadata:{}},{name:frameId,type:string,nullable:true,metadata:{}}]}},nullable:true,metadata:{}}]}, [1.1] failure: `TimestampType' expected but `{' found* *{type:struct,fields:[{name:metadata,type:{type:udt,class:org.apache.spark.vision.types.ImageMetadataUDT,pyClass:null,sqlType:{type:struct,fields:[{name:name,type:string,nullable:true,metadata:{}},{name:encoding,type:string,nullable:true,metadata:{}},{name:cameraId,type:string,nullable:true,metadata:{}},{name:timestamp,type:string,nullable:true,metadata:{}},{name:frameId,type:string,nullable:true,metadata:{}}]}},nullable:true,metadata:{}}]}* *^* *at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)* *at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)* *at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)* *at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)* *at scala.util.Try.getOrElse(Try.scala:77)* *at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)* *at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)* *at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)* *at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)* *at org.apache.spark.sql.parquet.ParquetRelation2.org http://org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:691)* *at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)* *at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)* *at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)* *at org.apache.spark.scheduler.Task.run(Task.scala:64)* *at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:210)* *at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)* *at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)* *at java.lang.Thread.run(Thread.java:745)*
Re: Problem with Spark SQL UserDefinedType and sbt assembly
Any ideas ? On Thu, Apr 16, 2015 at 5:04 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Here is an issue that gets me mad. I wrote a UserDefineType in order to be able to store a custom type in a parquet file. In my code I just create a DataFrame with my custom data type and write in into a parquet file. When I run my code directly inside idea every thing works like a charm. But when I create the assembly jar with sbt assembly and run the same code with spark-submit I get the following error : *15/04/16 17:02:17 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)* *java.lang.IllegalArgumentException: Unsupported dataType: {type:struct,fields:[{name:metadata,type:{type:udt,class:org.apache.spark.vision.types.ImageMetadataUDT,pyClass:null,sqlType:{type:struct,fields:[{name:name,type:string,nullable:true,metadata:{}},{name:encoding,type:string,nullable:true,metadata:{}},{name:cameraId,type:string,nullable:true,metadata:{}},{name:timestamp,type:string,nullable:true,metadata:{}},{name:frameId,type:string,nullable:true,metadata:{}}]}},nullable:true,metadata:{}}]}, [1.1] failure: `TimestampType' expected but `{' found* *{type:struct,fields:[{name:metadata,type:{type:udt,class:org.apache.spark.vision.types.ImageMetadataUDT,pyClass:null,sqlType:{type:struct,fields:[{name:name,type:string,nullable:true,metadata:{}},{name:encoding,type:string,nullable:true,metadata:{}},{name:cameraId,type:string,nullable:true,metadata:{}},{name:timestamp,type:string,nullable:true,metadata:{}},{name:frameId,type:string,nullable:true,metadata:{}}]}},nullable:true,metadata:{}}]}* *^* *at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)* *at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)* *at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)* *at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)* *at scala.util.Try.getOrElse(Try.scala:77)* *at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)* *at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)* *at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)* *at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)* *at org.apache.spark.sql.parquet.ParquetRelation2.org http://org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:691)* *at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)* *at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)* *at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)* *at org.apache.spark.scheduler.Task.run(Task.scala:64)* *at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:210)* *at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)* *at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)* *at java.lang.Thread.run(Thread.java:745)*
Re: Problem with Spark SQL UserDefinedType and sbt assembly
Here is the list of my dependencies : *libraryDependencies ++= Seq(* * org.apache.spark %% spark-core % sparkVersion % provided, org.apache.spark %% spark-sql % sparkVersion, org.apache.spark %% spark-mllib % sparkVersion, org.iq80.leveldb % leveldb % 0.7, com.github.fommil.netlib % all % 1.1.2 pomOnly(), com.github.scopt %% scopt % 3.2.0, org.bytedeco.javacpp-presets % opencv % 2.4.11-0.11-SNAPSHOT classifier linux-x86_64 classifier , org.bytedeco.javacpp-presets % caffe % master-0.11-SNAPSHOT classifier linux-x86_64 classifier , org.bytedeco % javacpp % 0.11-SNAPSHOT, org.scalatest % scalatest_2.10 % 2.2.0 % test)* On Thu, Apr 16, 2015 at 11:16 PM, Richard Marscher rmarsc...@localytics.com wrote: If it fails with sbt-assembly but not without it, then there's always the likelihood of a classpath issue. What dependencies are you rolling up into your assembly jar? On Thu, Apr 16, 2015 at 4:46 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Any ideas ? On Thu, Apr 16, 2015 at 5:04 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Here is an issue that gets me mad. I wrote a UserDefineType in order to be able to store a custom type in a parquet file. In my code I just create a DataFrame with my custom data type and write in into a parquet file. When I run my code directly inside idea every thing works like a charm. But when I create the assembly jar with sbt assembly and run the same code with spark-submit I get the following error : *15/04/16 17:02:17 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)* *java.lang.IllegalArgumentException: Unsupported dataType: {type:struct,fields:[{name:metadata,type:{type:udt,class:org.apache.spark.vision.types.ImageMetadataUDT,pyClass:null,sqlType:{type:struct,fields:[{name:name,type:string,nullable:true,metadata:{}},{name:encoding,type:string,nullable:true,metadata:{}},{name:cameraId,type:string,nullable:true,metadata:{}},{name:timestamp,type:string,nullable:true,metadata:{}},{name:frameId,type:string,nullable:true,metadata:{}}]}},nullable:true,metadata:{}}]}, [1.1] failure: `TimestampType' expected but `{' found* *{type:struct,fields:[{name:metadata,type:{type:udt,class:org.apache.spark.vision.types.ImageMetadataUDT,pyClass:null,sqlType:{type:struct,fields:[{name:name,type:string,nullable:true,metadata:{}},{name:encoding,type:string,nullable:true,metadata:{}},{name:cameraId,type:string,nullable:true,metadata:{}},{name:timestamp,type:string,nullable:true,metadata:{}},{name:frameId,type:string,nullable:true,metadata:{}}]}},nullable:true,metadata:{}}]}* *^* *at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)* *at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)* *at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)* *at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)* *at scala.util.Try.getOrElse(Try.scala:77)* *at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)* *at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)* *at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)* *at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)* *at org.apache.spark.sql.parquet.ParquetRelation2.org http://org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:691)* *at 
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)* *at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)* *at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)* *at org.apache.spark.scheduler.Task.run(Task.scala:64)* *at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:210)* *at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)* *at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)* *at java.lang.Thread.run(Thread.java:745)*
How to get a clean DataFrame schema merge
Hi all, If you follow the schema merging example in the Spark documentation http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging you obtain the following result when you load the merged data :

single  triple  double
1       3       null
2       6       null
4       12      null
3       9       null
5       15      null
1       null    2
2       null    4
4       null    8
3       null    6
5       null    10

How can I remove these null values and get something more logical like :

single  triple  double
1       3       2
2       6       4
4       12      8
3       9       6
5       15      10

Bests, Jao
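One way to get the second table, as a sketch: schema merging unions the rows of the two parts, so instead load the two parts separately and join them on the shared column. Paths and column names below follow the documentation example (swap the key=... paths if needed for your layout):

    val doubles = sqlContext.parquetFile("data/test_table/key=1").select("single", "double")
    val triples = sqlContext.parquetFile("data/test_table/key=2").select("single", "triple")
    val clean = triples.join(doubles, triples("single") === doubles("single"))
      .select(triples("single"), triples("triple"), doubles("double"))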
Re: How DataFrame schema migration works ?
I forgot to mention that the imageId field is a custom Scala object. Do I need to implement some special methods to make it work (equals, hashCode) ? On Tue, Apr 14, 2015 at 5:00 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, In the latest version of spark there's a feature called : automatic partition discovery and Schema migration for parquet. As far as I know, this gives the ability to split the DataFrame into several parquet files, and by just loading the parent directory one can get the global schema of the parent DataFrame. I'm trying to use this feature in the following problem but I get some troubles. I want to perfom a serie of feature of extraction for a set of images. At a first step, my DataFrame has just two columns : imageId, imageRawData. Then I transform the imageRowData column with different image feature extractors. The result can be of different types. For example on feature could be a mllib.Vector, and another one could be an Array[Byte]. Each feature extractor store its output as a parquet file with two columns : imageId, featureType. Then, at the end, I get the following files : - features/rawData.parquet - features/feature1.parquet - features/feature2.parquet When I load all the features with : sqlContext.load(features) It seems to works and I get with this example a DataFrame with 4 columns : imageId, imageRawData, feature1, feature2. But, when I try to read the values, for example with show, some columns have null fields and I just can't figure out what's going wrong. Any ideas ? Best, Jao
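On the equals/hashCode question, a minimal sketch: if imageId is used as a key (joins, groupBy), the class needs consistent equals and hashCode, and the simplest way is to make it a case class, which derives both. This alone does not make an arbitrary class storable in a DataFrame column, though; that still requires a standard SQL type or a UserDefinedType. Field names below are assumptions:

    // simplest: a case class gets equals/hashCode (and Serializable) for free
    case class ImageId(camera: String, frame: Long)

    // hand-rolled equivalent for a plain class
    class ImageIdRef(val camera: String, val frame: Long) extends Serializable {
      override def equals(other: Any): Boolean = other match {
        case o: ImageIdRef => camera == o.camera && frame == o.frame
        case _ => false
      }
      override def hashCode: Int = 31 * camera.hashCode + frame.hashCode
    }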
How DataFrame schema migration works ?
Dear all, In the latest version of Spark there's a feature called automatic partition discovery and schema migration for Parquet. As far as I know, this gives the ability to split a DataFrame into several parquet files, and by just loading the parent directory one can get the global schema of the parent DataFrame. I'm trying to use this feature for the following problem but I'm running into some trouble. I want to perform a series of feature extractions for a set of images. In a first step, my DataFrame has just two columns : imageId, imageRawData. Then I transform the imageRawData column with different image feature extractors. The results can be of different types: for example one feature could be a mllib.Vector, and another one could be an Array[Byte]. Each feature extractor stores its output as a parquet file with two columns : imageId, featureType. At the end, I get the following files : - features/rawData.parquet - features/feature1.parquet - features/feature2.parquet When I load all the features with : sqlContext.load("features") it seems to work, and with this example I get a DataFrame with 4 columns : imageId, imageRawData, feature1, feature2. But when I try to read the values, for example with show, some columns have null fields and I just can't figure out what's going wrong. Any ideas ? Best, Jao
Re: Unable to save dataframe with UDT created with sqlContext.createDataFrame
Good! Thank you. On Thu, Apr 2, 2015 at 9:05 AM, Xiangrui Meng men...@gmail.com wrote: I reproduced the bug on master and submitted a patch for it: https://github.com/apache/spark/pull/5329. It may get into Spark 1.3.1. Thanks for reporting the bug! -Xiangrui On Wed, Apr 1, 2015 at 12:57 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hmm, I got the same error with the master. Here is another test example that fails. Here, I explicitly create a Row RDD which corresponds to the use case I am in : object TestDataFrame { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(TestDataFrame).setMaster(local[4]) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val data = Seq(LabeledPoint(1, Vectors.zeros(10))) val dataDF = sc.parallelize(data).toDF dataDF.printSchema() dataDF.save(test1.parquet) // OK val dataRow = data.map {case LabeledPoint(l: Double, f: mllib.linalg.Vector)= Row(l,f) } val dataRowRDD = sc.parallelize(dataRow) val dataDF2 = sqlContext.createDataFrame(dataRowRDD, dataDF.schema) dataDF2.printSchema() dataDF2.saveAsParquetFile(test3.parquet) // FAIL !!! } } On Tue, Mar 31, 2015 at 11:18 PM, Xiangrui Meng men...@gmail.com wrote: I cannot reproduce this error on master, but I'm not aware of any recent bug fixes that are related. Could you build and try the current master? -Xiangrui On Tue, Mar 31, 2015 at 4:10 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, DataFrame with an user defined type (here mllib.Vector) created with sqlContex.createDataFrame can't be saved to parquet file and raise ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to org.apache.spark.sql.Row error. Here is an example of code to reproduce this error : object TestDataFrame { def main(args: Array[String]): Unit = { //System.loadLibrary(Core.NATIVE_LIBRARY_NAME) val conf = new SparkConf().setAppName(RankingEval).setMaster(local[8]) .set(spark.executor.memory, 6g) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val data = sc.parallelize(Seq(LabeledPoint(1, Vectors.zeros(10 val dataDF = data.toDF dataDF.save(test1.parquet) val dataDF2 = sqlContext.createDataFrame(dataDF.rdd, dataDF.schema) dataDF2.save(test2.parquet) } } Is this related to https://issues.apache.org/jira/browse/SPARK-5532 and how can it be solved ? Cheers, Jao
Re: Unable to save dataframe with UDT created with sqlContext.createDataFrame
Hmm, I got the same error with the master. Here is another test example that fails. Here, I explicitly create a Row RDD which corresponds to the use case I am in : *object TestDataFrame { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(TestDataFrame).setMaster(local[4]) val sc = new SparkContext(conf)val sqlContext = new SQLContext(sc)* *import sqlContext.implicits._* *val data = Seq(LabeledPoint(1, Vectors.zeros(10)))val dataDF = sc.parallelize(data).toDFdataDF.printSchema() dataDF.save(test1.parquet) // OKval dataRow = data.map {case LabeledPoint(l: Double, f: mllib.linalg.Vector)= Row(l,f)} val dataRowRDD = sc.parallelize(dataRow)val dataDF2 = sqlContext.createDataFrame(dataRowRDD, dataDF.schema) dataDF2.printSchema()dataDF2.saveAsParquetFile(test3.parquet) // FAIL !!! }}* On Tue, Mar 31, 2015 at 11:18 PM, Xiangrui Meng men...@gmail.com wrote: I cannot reproduce this error on master, but I'm not aware of any recent bug fixes that are related. Could you build and try the current master? -Xiangrui On Tue, Mar 31, 2015 at 4:10 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, DataFrame with an user defined type (here mllib.Vector) created with sqlContex.createDataFrame can't be saved to parquet file and raise ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to org.apache.spark.sql.Row error. Here is an example of code to reproduce this error : object TestDataFrame { def main(args: Array[String]): Unit = { //System.loadLibrary(Core.NATIVE_LIBRARY_NAME) val conf = new SparkConf().setAppName(RankingEval).setMaster(local[8]) .set(spark.executor.memory, 6g) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val data = sc.parallelize(Seq(LabeledPoint(1, Vectors.zeros(10 val dataDF = data.toDF dataDF.save(test1.parquet) val dataDF2 = sqlContext.createDataFrame(dataDF.rdd, dataDF.schema) dataDF2.save(test2.parquet) } } Is this related to https://issues.apache.org/jira/browse/SPARK-5532 and how can it be solved ? Cheers, Jao
Re: Some questions after playing a little with the new ml.Pipeline.
Following your suggestion, I end up with the following implementation : *override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = { val schema = transformSchema(dataSet.schema, paramMap, logging = true) val map = this.paramMap ++ paramMap* *val features = dataSet.select(map(inputCol)).mapPartitions { rows = Caffe.set_mode(Caffe.CPU)val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))val inputBlobs: FloatBlobVector = net.input_blobs()val N: Int = 1val K: Int = inputBlobs.get(0).channels()val H: Int = inputBlobs.get(0).height()val W: Int = inputBlobs.get(0).width() inputBlobs.get(0).Reshape(N, K, H, W)val dataBlob = new FloatPointer(N*K*W*H)* val inputCPUData = inputBlobs.get(0).mutable_cpu_data() val feat = rows.map { case Row(a: Iterable[Float])= dataBlob.put(a.toArray, 0, a.size) caffe_copy_float(N*K*W*H, dataBlob, inputCPUData) val resultBlobs: FloatBlobVector = net.ForwardPrefilled() * val resultDim = resultBlobs.get(0).channels() logInfo(sOutput dimension $resultDim) val resultBlobData = resultBlobs.get(0).cpu_data() val output = new Array[Float](resultDim) resultBlobData.get(output) Vectors.dense(output.map(_.toDouble))}//net.deallocate() feat } val newRowData = dataSet.rdd.zip(features).map { case (old, feat)=val oldSeq = old.toSeq Row.fromSeq(oldSeq :+ feat) } dataSet.sqlContext.createDataFrame(newRowData, schema)}* The idea is to mapPartitions of the underlying RDD of the DataFrame and create a new DataFrame by zipping the results. It seems to work but when I try to save the RDD I got the following error : org.apache.spark.mllib.linalg.DenseVector cannot be cast to org.apache.spark.sql.Row On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: One workaround could be to convert a DataFrame into a RDD inside the transform function and then use mapPartitions/broadcast to work with the JNI calls and then convert back to RDD. Thanks Shivaram On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, I'm still struggling to make a pre-trained caffe model transformer for dataframe works. The main problem is that creating a caffe model inside the UDF is very slow and consumes memories. Some of you suggest to broadcast the model. The problem with broadcasting is that I use a JNI interface to caffe C++ with javacpp-preset and it is not serializable. Another possible approach is to use a UDF that can handle a whole partitions instead of just a row in order to minimize the caffe model instantiation. Is there any ideas to solve one of these two issues ? Best, Jao On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley jos...@databricks.com wrote: I see. I think your best bet is to create the cnnModel on the master and then serialize it to send to the workers. If it's big (1M or so), then you can broadcast it and use the broadcast variable in the UDF. There is not a great way to do something equivalent to mapPartitions with UDFs right now. On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Here is my current implementation with current master version of spark *class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... { override def transformSchema(...) { ... 
}* *override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {* * transformSchema(dataSet.schema, paramMap, logging = true)* * val map = this.paramMap ++ paramMap val deepCNNFeature = udf((v: Vector)= {* * val cnnModel = new CaffeModel * * cnnModel.transform(v)* * } : Vector ) dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol* * }* *}* where CaffeModel is a java api to Caffe C++ model. The problem here is that for every row it will create a new instance of CaffeModel which is inefficient since creating a new model means loading a large model file. And it will transform only a single row at a time. Or a Caffe network can process a batch of rows efficiently. In other words, is it possible to create an UDF that can operatats on a partition in order to minimize the creation of a CaffeModel and to take advantage of the Caffe network batch processing ? On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley jos...@databricks.com wrote: I see, thanks for clarifying! I'd recommend following existing implementations in spark.ml transformers. You'll need to define a UDF which operates on a single Row to compute the value for the new column. You can then use the DataFrame DSL to create the new column; the DSL provides a nice syntax for what would otherwise be a SQL statement like
Re: Some questions after playing a little with the new ml.Pipeline.
In my transformSchema I do specify that the output column type is a VectorUDT : *override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { val map = this.paramMap ++ paramMap checkInputColumn(schema, map(inputCol), ArrayType(FloatType, false)) addOutputColumn(schema, map(outputCol), new VectorUDT)}* The output of printSchema is as follow : *|-- cnnFeature: vecto (nullable = false)* On Tue, Mar 31, 2015 at 9:55 AM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: My guess is that the `createDataFrame` call is failing here. Can you check if the schema being passed to it includes the column name and type for the newly being zipped `features` ? Joseph probably knows this better, but AFAIK the DenseVector here will need to be marked as a VectorUDT while creating a DataFrame column Thanks Shivaram On Tue, Mar 31, 2015 at 12:50 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Following your suggestion, I end up with the following implementation : *override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = { val schema = transformSchema(dataSet.schema, paramMap, logging = true) val map = this.paramMap ++ paramMap* *val features = dataSet.select(map(inputCol)).mapPartitions { rows = Caffe.set_mode(Caffe.CPU)val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))val inputBlobs: FloatBlobVector = net.input_blobs()val N: Int = 1val K: Int = inputBlobs.get(0).channels()val H: Int = inputBlobs.get(0).height() val W: Int = inputBlobs.get(0).width()inputBlobs.get(0).Reshape(N, K, H, W)val dataBlob = new FloatPointer(N*K*W*H)* val inputCPUData = inputBlobs.get(0).mutable_cpu_data() val feat = rows.map { case Row(a: Iterable[Float])= dataBlob.put(a.toArray, 0, a.size) caffe_copy_float(N*K*W*H, dataBlob, inputCPUData) val resultBlobs: FloatBlobVector = net.ForwardPrefilled() * val resultDim = resultBlobs.get(0).channels() logInfo(sOutput dimension $resultDim) val resultBlobData = resultBlobs.get(0).cpu_data() val output = new Array[Float](resultDim) resultBlobData.get(output) Vectors.dense(output.map(_.toDouble)) }//net.deallocate()feat } val newRowData = dataSet.rdd.zip(features).map { case (old, feat)=val oldSeq = old.toSeq Row.fromSeq(oldSeq :+ feat) } dataSet.sqlContext.createDataFrame(newRowData, schema)}* The idea is to mapPartitions of the underlying RDD of the DataFrame and create a new DataFrame by zipping the results. It seems to work but when I try to save the RDD I got the following error : org.apache.spark.mllib.linalg.DenseVector cannot be cast to org.apache.spark.sql.Row On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: One workaround could be to convert a DataFrame into a RDD inside the transform function and then use mapPartitions/broadcast to work with the JNI calls and then convert back to RDD. Thanks Shivaram On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, I'm still struggling to make a pre-trained caffe model transformer for dataframe works. The main problem is that creating a caffe model inside the UDF is very slow and consumes memories. Some of you suggest to broadcast the model. The problem with broadcasting is that I use a JNI interface to caffe C++ with javacpp-preset and it is not serializable. Another possible approach is to use a UDF that can handle a whole partitions instead of just a row in order to minimize the caffe model instantiation. Is there any ideas to solve one of these two issues ? 
Best, Jao On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley jos...@databricks.com wrote: I see. I think your best bet is to create the cnnModel on the master and then serialize it to send to the workers. If it's big (1M or so), then you can broadcast it and use the broadcast variable in the UDF. There is not a great way to do something equivalent to mapPartitions with UDFs right now. On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Here is my current implementation with current master version of spark *class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... { override def transformSchema(...) { ... }* *override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {* * transformSchema(dataSet.schema, paramMap, logging = true)* * val map = this.paramMap ++ paramMap val deepCNNFeature = udf((v: Vector)= {* * val cnnModel = new CaffeModel * * cnnModel.transform(v)* * } : Vector ) dataSet.withColumn(map(outputCol
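A sketch of Shivaram's point about the schema, assuming the implementation above: the schema handed to createDataFrame has to type the appended column as a VectorUDT (as the transformSchema already does), otherwise the DenseVector values cannot be written out. The later thread on createDataFrame suggests part of the remaining cast error was a Spark bug fixed upstream.

    import org.apache.spark.sql.types.{StructField, StructType}

    // input schema plus the new feature column, explicitly typed as a VectorUDT
    // (the same VectorUDT already used in transformSchema)
    val outputSchema = StructType(
      dataSet.schema.fields :+ StructField(map(outputCol), new VectorUDT, nullable = false))
    val result = dataSet.sqlContext.createDataFrame(newRowData, outputSchema)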
Unable to save dataframe with UDT created with sqlContext.createDataFrame
Hi all, A DataFrame with a user-defined type (here mllib.Vector) created with sqlContext.createDataFrame can't be saved to a parquet file and raises a "ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to org.apache.spark.sql.Row" error. Here is an example of code to reproduce this error :

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

object TestDataFrame {
  def main(args: Array[String]): Unit = {
    //System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
    val conf = new SparkConf().setAppName("RankingEval").setMaster("local[8]")
      .set("spark.executor.memory", "6g")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val data = sc.parallelize(Seq(LabeledPoint(1, Vectors.zeros(10))))
    val dataDF = data.toDF
    dataDF.save("test1.parquet")

    val dataDF2 = sqlContext.createDataFrame(dataDF.rdd, dataDF.schema)
    dataDF2.save("test2.parquet")
  }
}

Is this related to https://issues.apache.org/jira/browse/SPARK-5532 and how can it be solved ? Cheers, Jao
Re: Some questions after playing a little with the new ml.Pipeline.
Dear all, I'm still struggling to make a pre-trained caffe model transformer for dataframe works. The main problem is that creating a caffe model inside the UDF is very slow and consumes memories. Some of you suggest to broadcast the model. The problem with broadcasting is that I use a JNI interface to caffe C++ with javacpp-preset and it is not serializable. Another possible approach is to use a UDF that can handle a whole partitions instead of just a row in order to minimize the caffe model instantiation. Is there any ideas to solve one of these two issues ? Best, Jao On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley jos...@databricks.com wrote: I see. I think your best bet is to create the cnnModel on the master and then serialize it to send to the workers. If it's big (1M or so), then you can broadcast it and use the broadcast variable in the UDF. There is not a great way to do something equivalent to mapPartitions with UDFs right now. On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Here is my current implementation with current master version of spark *class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... { override def transformSchema(...) { ... }* *override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {* * transformSchema(dataSet.schema, paramMap, logging = true)* * val map = this.paramMap ++ paramMap val deepCNNFeature = udf((v: Vector)= {* * val cnnModel = new CaffeModel * * cnnModel.transform(v)* * } : Vector ) dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol* * }* *}* where CaffeModel is a java api to Caffe C++ model. The problem here is that for every row it will create a new instance of CaffeModel which is inefficient since creating a new model means loading a large model file. And it will transform only a single row at a time. Or a Caffe network can process a batch of rows efficiently. In other words, is it possible to create an UDF that can operatats on a partition in order to minimize the creation of a CaffeModel and to take advantage of the Caffe network batch processing ? On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley jos...@databricks.com wrote: I see, thanks for clarifying! I'd recommend following existing implementations in spark.ml transformers. You'll need to define a UDF which operates on a single Row to compute the value for the new column. You can then use the DataFrame DSL to create the new column; the DSL provides a nice syntax for what would otherwise be a SQL statement like select ... from I'm recommending looking at the existing implementation (rather than stating it here) because it changes between Spark 1.2 and 1.3. In 1.3, the DSL is much improved and makes it easier to create a new column. Joseph On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: class DeepCNNFeature extends Transformer ... { override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = { // How can I do a map partition on the underlying RDD and then add the column ? } } On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi Joseph, Thank your for the tips. I understand what should I do when my data are represented as a RDD. The thing that I can't figure out is how to do the same thing when the data is view as a DataFrame and I need to add the result of my pretrained model as a new column in the DataFrame. Preciselly, I want to implement the following transformer : class DeepCNNFeature extends Transformer ... 
{ } On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley jos...@databricks.com wrote: Hi Jao, You can use external tools and libraries if they can be called from your Spark program or script (with appropriate conversion of data types, etc.). The best way to apply a pre-trained model to a dataset would be to call the model from within a closure, e.g.: myRDD.map { myDatum = preTrainedModel.predict(myDatum) } If your data is distributed in an RDD (myRDD), then the above call will distribute the computation of prediction using the pre-trained model. It will require that all of your Spark workers be able to run the preTrainedModel; that may mean installing Caffe and dependencies on all nodes in the compute cluster. For the second question, I would modify the above call as follows: myRDD.mapPartitions { myDataOnPartition = val myModel = // instantiate neural network on this partition myDataOnPartition.map { myDatum = myModel.predict(myDatum) } } I hope this helps! Joseph On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, We mainly do large scale computer vision task (image
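A common workaround sketch for the non-serializable JNI handle (not specific to Caffe): hold the model in a lazy val inside an object referenced from mapPartitions. The object itself is never shipped; each executor JVM initialises it on first use, so the model is loaded at most once per JVM rather than once per row or per partition. CaffeUtils.floatTestNetwork and SparkFiles.get are taken from the implementation earlier in the thread; predict() is an assumed wrapper:

    object CaffeHolder {
      lazy val net = CaffeUtils.floatTestNetwork(
        SparkFiles.get("topology"), SparkFiles.get("weight"))
    }

    val features = df.rdd.mapPartitions { rows =>
      val net = CaffeHolder.net          // loaded once per executor JVM
      rows.map(row => predict(net, row)) // predict() is an assumed wrapper
    }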
Re: Solve least square problem of the form min norm(A x - b)^2^ + lambda * n * norm(x)^2 ?
It runs faster but there is some drawbacks. It seems to consume more memory. I get java.lang.OutOfMemoryError: Java heap space error if I don't have a sufficient partitions for a fixed amount of memory. With the older (ampcamp) implementation for the same data size I didn't get it. On Thu, Mar 12, 2015 at 11:36 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: On Thu, Mar 12, 2015 at 3:05 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: In fact, by activating netlib with native libraries it goes faster. Glad you got it work ! Better performance was one of the reasons we made the switch. Thanks On Tue, Mar 10, 2015 at 7:03 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: There are a couple of differences between the ml-matrix implementation and the one used in AMPCamp - I think the AMPCamp one uses JBLAS which tends to ship native BLAS libraries along with it. In ml-matrix we switched to using Breeze + Netlib BLAS which is faster but needs some setup [1] to pick up native libraries. If native libraries are not found it falls back to a JVM implementation, so that might explain the slow down. - The other difference if you are comparing the whole image pipeline is that I think the AMPCamp version used NormalEquations which is around 2-3x faster (just in terms of number of flops) compared to TSQR. [1] https://github.com/fommil/netlib-java#machine-optimised-system-libraries Thanks Shivaram On Tue, Mar 10, 2015 at 9:57 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: I'm trying to play with the implementation of least square solver (Ax = b) in mlmatrix.TSQR where A is a 5*1024 matrix and b a 5*10 matrix. It works but I notice that it's 8 times slower than the implementation given in the latest ampcamp : http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html . As far as I know these two implementations come from the same basis. What is the difference between these two codes ? On Tue, Mar 3, 2015 at 8:02 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: There are couple of solvers that I've written that is part of the AMPLab ml-matrix repo [1,2]. These aren't part of MLLib yet though and if you are interested in porting them I'd be happy to review it Thanks Shivaram [1] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/TSQR.scala [2] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/NormalEquations.scala On Tue, Mar 3, 2015 at 9:01 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Is there a least square solver based on DistributedMatrix that we can use out of the box in the current (or the master) version of spark ? It seems that the only least square solver available in spark is private to recommender package. Cheers, Jao
Re: Solve least square problem of the form min norm(A x - b)^2^ + lambda * n * norm(x)^2 ?
In fact, by activating netlib with native libraries it goes faster. Thanks On Tue, Mar 10, 2015 at 7:03 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: There are a couple of differences between the ml-matrix implementation and the one used in AMPCamp - I think the AMPCamp one uses JBLAS which tends to ship native BLAS libraries along with it. In ml-matrix we switched to using Breeze + Netlib BLAS which is faster but needs some setup [1] to pick up native libraries. If native libraries are not found it falls back to a JVM implementation, so that might explain the slow down. - The other difference if you are comparing the whole image pipeline is that I think the AMPCamp version used NormalEquations which is around 2-3x faster (just in terms of number of flops) compared to TSQR. [1] https://github.com/fommil/netlib-java#machine-optimised-system-libraries Thanks Shivaram On Tue, Mar 10, 2015 at 9:57 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: I'm trying to play with the implementation of least square solver (Ax = b) in mlmatrix.TSQR where A is a 5*1024 matrix and b a 5*10 matrix. It works but I notice that it's 8 times slower than the implementation given in the latest ampcamp : http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html . As far as I know these two implementations come from the same basis. What is the difference between these two codes ? On Tue, Mar 3, 2015 at 8:02 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: There are couple of solvers that I've written that is part of the AMPLab ml-matrix repo [1,2]. These aren't part of MLLib yet though and if you are interested in porting them I'd be happy to review it Thanks Shivaram [1] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/TSQR.scala [2] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/NormalEquations.scala On Tue, Mar 3, 2015 at 9:01 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Is there a least square solver based on DistributedMatrix that we can use out of the box in the current (or the master) version of spark ? It seems that the only least square solver available in spark is private to recommender package. Cheers, Jao
Re: Solve least square problem of the form min norm(A x - b)^2^ + lambda * n * norm(x)^2 ?
I'm trying to play with the implementation of least square solver (Ax = b) in mlmatrix.TSQR where A is a 5*1024 matrix and b a 5*10 matrix. It works but I notice that it's 8 times slower than the implementation given in the latest ampcamp : http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html . As far as I know these two implementations come from the same basis. What is the difference between these two codes ? On Tue, Mar 3, 2015 at 8:02 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: There are couple of solvers that I've written that is part of the AMPLab ml-matrix repo [1,2]. These aren't part of MLLib yet though and if you are interested in porting them I'd be happy to review it Thanks Shivaram [1] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/TSQR.scala [2] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/NormalEquations.scala On Tue, Mar 3, 2015 at 9:01 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Is there a least square solver based on DistributedMatrix that we can use out of the box in the current (or the master) version of spark ? It seems that the only least square solver available in spark is private to recommender package. Cheers, Jao
Re: Solve least square problem of the form min norm(A x - b)^2^ + lambda * n * norm(x)^2 ?
Do you have a reference paper to the implemented algorithm in TSQR.scala ? On Tue, Mar 3, 2015 at 8:02 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: There are couple of solvers that I've written that is part of the AMPLab ml-matrix repo [1,2]. These aren't part of MLLib yet though and if you are interested in porting them I'd be happy to review it Thanks Shivaram [1] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/TSQR.scala [2] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/NormalEquations.scala On Tue, Mar 3, 2015 at 9:01 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Is there a least square solver based on DistributedMatrix that we can use out of the box in the current (or the master) version of spark ? It seems that the only least square solver available in spark is private to recommender package. Cheers, Jao
Re: Data Frame types
Hi Cesar, Yes, you can define a UDT with the new DataFrame, the same way SchemaRDD did. Jaonary On Fri, Mar 6, 2015 at 4:22 PM, Cesar Flores ces...@gmail.com wrote: The SchemaRDD supports the storage of user defined classes. However, in order to do that, the user class needs to extend the UserDefinedType interface (see for example VectorUDT in org.apache.spark.mllib.linalg). My question is: Will the new DataFrame structure (to be released in Spark 1.3) be able to handle user defined classes too? Will user classes need to extend the same interface and follow the same approach? -- Cesar Flores
Re: Solve least square problem of the form min norm(A x - b)^2^ + lambda * n * norm(x)^2 ?
Hi Shivaram, Thank you for the link. I'm trying to figure out how I can port this to MLlib. Maybe you can help me understand how the pieces fit together. Currently, in MLlib there are different types of distributed matrices : BlockMatrix, CoordinateMatrix, IndexedRowMatrix and RowMatrix. Which one should correspond to RowPartitionedMatrix in ml-matrix ? On Tue, Mar 3, 2015 at 8:02 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: There are couple of solvers that I've written that is part of the AMPLab ml-matrix repo [1,2]. These aren't part of MLLib yet though and if you are interested in porting them I'd be happy to review it Thanks Shivaram [1] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/TSQR.scala [2] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/NormalEquations.scala On Tue, Mar 3, 2015 at 9:01 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Is there a least square solver based on DistributedMatrix that we can use out of the box in the current (or the master) version of spark ? It seems that the only least square solver available in spark is private to recommender package. Cheers, Jao
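For reference, the objective in the subject line has a closed-form solution via the normal equations, which is essentially what a NormalEquations-style solver computes after aggregating the d x d gram matrix A^T A and the vector A^T b across partitions. A purely local sketch with Breeze, assuming A (n x d) and b fit in memory:

    import breeze.linalg.{DenseMatrix, DenseVector}

    // Solves  min ||A x - b||^2 + lambda * n * ||x||^2
    // via the normal equations  (A^T A + lambda * n * I) x = A^T b.
    def ridge(A: DenseMatrix[Double], b: DenseVector[Double], lambda: Double): DenseVector[Double] = {
      val n = A.rows.toDouble
      val d = A.cols
      (A.t * A + DenseMatrix.eye[Double](d) * (lambda * n)) \ (A.t * b)
    }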
Re: Some questions after playing a little with the new ml.Pipeline.
Here is my current implementation with the current master version of spark:

class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

  override def transformSchema(...) { ... }

  override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
    transformSchema(dataSet.schema, paramMap, logging = true)
    val map = this.paramMap ++ paramMap
    val deepCNNFeature = udf((v: Vector) => {
      val cnnModel = new CaffeModel
      cnnModel.transform(v)
    }: Vector)
    dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
  }
}

where CaffeModel is a Java API to a Caffe C++ model. The problem here is that for every row it will create a new instance of CaffeModel, which is inefficient since creating a new model means loading a large model file, and it will transform only a single row at a time, whereas a Caffe network can process a batch of rows efficiently. In other words, is it possible to create a UDF that operates on a partition, in order to minimize the creation of CaffeModel instances and to take advantage of the Caffe network's batch processing? On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley jos...@databricks.com wrote: I see, thanks for clarifying! I'd recommend following existing implementations in spark.ml transformers. You'll need to define a UDF which operates on a single Row to compute the value for the new column. You can then use the DataFrame DSL to create the new column; the DSL provides a nice syntax for what would otherwise be a SQL statement like select ... from. I'm recommending looking at the existing implementation (rather than stating it here) because it changes between Spark 1.2 and 1.3. In 1.3, the DSL is much improved and makes it easier to create a new column. Joseph On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: class DeepCNNFeature extends Transformer ... { override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = { // How can I do a map partition on the underlying RDD and then add the column ? } } On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi Joseph, Thank you for the tips. I understand what I should do when my data are represented as an RDD. The thing that I can't figure out is how to do the same thing when the data is viewed as a DataFrame and I need to add the result of my pretrained model as a new column in the DataFrame. Precisely, I want to implement the following transformer: class DeepCNNFeature extends Transformer ... { } On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley jos...@databricks.com wrote: Hi Jao, You can use external tools and libraries if they can be called from your Spark program or script (with appropriate conversion of data types, etc.). The best way to apply a pre-trained model to a dataset would be to call the model from within a closure, e.g.: myRDD.map { myDatum => preTrainedModel.predict(myDatum) } If your data is distributed in an RDD (myRDD), then the above call will distribute the computation of prediction using the pre-trained model. It will require that all of your Spark workers be able to run the preTrainedModel; that may mean installing Caffe and dependencies on all nodes in the compute cluster. For the second question, I would modify the above call as follows: myRDD.mapPartitions { myDataOnPartition => val myModel = // instantiate neural network on this partition myDataOnPartition.map { myDatum => myModel.predict(myDatum) } } I hope this helps!
Joseph On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, We mainly do large scale computer vision task (image classification, retrieval, ...). The pipeline is really great stuff for that. We're trying to reproduce the tutorial given on that topic during the latest spark summit ( http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html ) using the master version of spark pipeline and dataframe. The tutorial shows different examples of feature extraction stages before running machine learning algorithms. Even the tutorial is straightforward to reproduce with this new API, we still have some questions : - Can one use external tools (e.g via pipe) as a pipeline stage ? An example of use case is to extract feature learned with convolutional neural network. In our case, this corresponds to a pre-trained neural network with Caffe library (http://caffe.berkeleyvision.org/) . - The second question is about the performance of the pipeline. Library such as Caffe processes the data in batch and instancing one Caffe network can be time consuming when this network is very deep. So, we can gain performance if we minimize the number of Caffe network creation
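A minimal sketch of the per-partition idea discussed in this thread, for 1.3-style DataFrames. CaffeModel, its constructor argument and transformBatch are hypothetical stand-ins for the real Caffe JNI wrapper, and the batch size of 32 is arbitrary; the point is only to show one model instance per partition, batched calls, and rebuilding the DataFrame with the extra column:

import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.mllib.linalg.Vector

// Hypothetical wrapper around the Caffe C++ model; constructing it is expensive.
class CaffeModel(modelPath: String) {
  def transformBatch(batch: Seq[Vector]): Seq[Vector] = batch // placeholder
}

def addDeepFeatures(sqlContext: SQLContext, df: DataFrame,
                    inputCol: String, outputCol: String, modelPath: String): DataFrame = {
  val inputIdx = df.schema.fieldNames.indexOf(inputCol)
  // Reuse the input column's data type (a Vector UDT) for the new output column.
  val outSchema = StructType(
    df.schema.fields :+ StructField(outputCol, df.schema.fields(inputIdx).dataType, nullable = false))
  val rows = df.rdd.mapPartitions { it =>
    val model = new CaffeModel(modelPath)          // one model instance per partition
    it.grouped(32).flatMap { batch =>              // feed the network batch by batch
      val feats = model.transformBatch(batch.map(_.getAs[Vector](inputIdx)))
      batch.zip(feats).map { case (row, f) => Row.fromSeq(row.toSeq :+ f) }
    }
  }
  sqlContext.createDataFrame(rows, outSchema)
}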
Solve least square problem of the form min norm(A x - b)^2 + lambda * n * norm(x)^2 ?
Dear all, Is there a least square solver based on DistributedMatrix that we can use out of the box in the current (or the master) version of spark ? It seems that the only least square solver available in spark is private to recommender package. Cheers, Jao
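For reference, when the number of features d is small, the problem in the subject can also be solved without ml-matrix by accumulating the normal equations over an RDD of rows and solving the small d x d system locally. The sketch below is only an illustration of that idea (it is not MLlib or ml-matrix code); rows is assumed to be an RDD of (featureArray, label) pairs and Breeze is used for the local solve:

import breeze.linalg.{DenseMatrix, DenseVector}
import org.apache.spark.rdd.RDD

// Solve min ||A x - b||^2 + lambda * n * ||x||^2 via (A^T A + lambda * n * I) x = A^T b.
def ridgeSolve(rows: RDD[(Array[Double], Double)], lambda: Double): DenseVector[Double] = {
  val d = rows.first()._1.length
  val n = rows.count().toDouble
  val zero = (DenseMatrix.zeros[Double](d, d), DenseVector.zeros[Double](d))
  val (ata, atb) = rows.treeAggregate(zero)(
    (acc, row) => {
      val a = DenseVector(row._1)
      (acc._1 + a * a.t, acc._2 + a * row._2)   // accumulate A^T A and A^T b
    },
    (x, y) => (x._1 + y._1, x._2 + y._2))
  (ata + DenseMatrix.eye[Double](d) * (lambda * n)) \ atb   // local d x d solve
}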
Re: Some questions after playing a little with the new ml.Pipeline.
class DeepCNNFeature extends Transformer ... { override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = { // How can I do a map partition on the underlying RDD and then add the column ? } } On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi Joseph, Thank your for the tips. I understand what should I do when my data are represented as a RDD. The thing that I can't figure out is how to do the same thing when the data is view as a DataFrame and I need to add the result of my pretrained model as a new column in the DataFrame. Preciselly, I want to implement the following transformer : class DeepCNNFeature extends Transformer ... { } On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley jos...@databricks.com wrote: Hi Jao, You can use external tools and libraries if they can be called from your Spark program or script (with appropriate conversion of data types, etc.). The best way to apply a pre-trained model to a dataset would be to call the model from within a closure, e.g.: myRDD.map { myDatum = preTrainedModel.predict(myDatum) } If your data is distributed in an RDD (myRDD), then the above call will distribute the computation of prediction using the pre-trained model. It will require that all of your Spark workers be able to run the preTrainedModel; that may mean installing Caffe and dependencies on all nodes in the compute cluster. For the second question, I would modify the above call as follows: myRDD.mapPartitions { myDataOnPartition = val myModel = // instantiate neural network on this partition myDataOnPartition.map { myDatum = myModel.predict(myDatum) } } I hope this helps! Joseph On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, We mainly do large scale computer vision task (image classification, retrieval, ...). The pipeline is really great stuff for that. We're trying to reproduce the tutorial given on that topic during the latest spark summit ( http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html ) using the master version of spark pipeline and dataframe. The tutorial shows different examples of feature extraction stages before running machine learning algorithms. Even the tutorial is straightforward to reproduce with this new API, we still have some questions : - Can one use external tools (e.g via pipe) as a pipeline stage ? An example of use case is to extract feature learned with convolutional neural network. In our case, this corresponds to a pre-trained neural network with Caffe library (http://caffe.berkeleyvision.org/) . - The second question is about the performance of the pipeline. Library such as Caffe processes the data in batch and instancing one Caffe network can be time consuming when this network is very deep. So, we can gain performance if we minimize the number of Caffe network creation and give data in batch to the network. In the pipeline, this corresponds to run transformers that work on a partition basis and give the whole partition to a single caffe network. How can we create such a transformer ? Best, Jao
Re: Some questions after playing a little with the new ml.Pipeline.
Hi Joseph, Thank you for the tips. I understand what I should do when my data are represented as an RDD. The thing that I can't figure out is how to do the same thing when the data is viewed as a DataFrame and I need to add the result of my pretrained model as a new column in the DataFrame. Precisely, I want to implement the following transformer: class DeepCNNFeature extends Transformer ... { } On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley jos...@databricks.com wrote: Hi Jao, You can use external tools and libraries if they can be called from your Spark program or script (with appropriate conversion of data types, etc.). The best way to apply a pre-trained model to a dataset would be to call the model from within a closure, e.g.: myRDD.map { myDatum => preTrainedModel.predict(myDatum) } If your data is distributed in an RDD (myRDD), then the above call will distribute the computation of prediction using the pre-trained model. It will require that all of your Spark workers be able to run the preTrainedModel; that may mean installing Caffe and dependencies on all nodes in the compute cluster. For the second question, I would modify the above call as follows: myRDD.mapPartitions { myDataOnPartition => val myModel = // instantiate neural network on this partition myDataOnPartition.map { myDatum => myModel.predict(myDatum) } } I hope this helps! Joseph On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, We mainly do large scale computer vision task (image classification, retrieval, ...). The pipeline is really great stuff for that. We're trying to reproduce the tutorial given on that topic during the latest spark summit ( http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html ) using the master version of spark pipeline and dataframe. The tutorial shows different examples of feature extraction stages before running machine learning algorithms. Even the tutorial is straightforward to reproduce with this new API, we still have some questions : - Can one use external tools (e.g via pipe) as a pipeline stage ? An example of use case is to extract feature learned with convolutional neural network. In our case, this corresponds to a pre-trained neural network with Caffe library (http://caffe.berkeleyvision.org/) . - The second question is about the performance of the pipeline. Library such as Caffe processes the data in batch and instancing one Caffe network can be time consuming when this network is very deep. So, we can gain performance if we minimize the number of Caffe network creation and give data in batch to the network. In the pipeline, this corresponds to run transformers that work on a partition basis and give the whole partition to a single caffe network. How can we create such a transformer ? Best, Jao
Some questions after playing a little with the new ml.Pipeline.
Dear all, We mainly do large scale computer vision tasks (image classification, retrieval, ...). The pipeline is really great stuff for that. We're trying to reproduce the tutorial given on that topic during the latest spark summit ( http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html ) using the master version of spark pipeline and dataframe. The tutorial shows different examples of feature extraction stages before running machine learning algorithms. Even though the tutorial is straightforward to reproduce with this new API, we still have some questions : - Can one use external tools (e.g. via pipe) as a pipeline stage ? An example use case is extracting features learned with a convolutional neural network. In our case, this corresponds to a neural network pre-trained with the Caffe library (http://caffe.berkeleyvision.org/) . - The second question is about the performance of the pipeline. Libraries such as Caffe process the data in batches, and instantiating one Caffe network can be time consuming when the network is very deep. So, we can gain performance if we minimize the number of Caffe network creations and feed data to the network in batches. In the pipeline, this corresponds to running transformers that work on a partition basis and give the whole partition to a single Caffe network. How can we create such a transformer ? Best, Jao
Re: [ML][SQL] Select UserDefinedType attribute in a DataFrame
By doing so, I got the following error: Exception in thread "main" org.apache.spark.sql.AnalysisException: GetField is not valid on fields. It seems that it doesn't like the image.data expression. On Wed, Feb 25, 2015 at 12:37 AM, Xiangrui Meng men...@gmail.com wrote: Btw, the correct syntax for alias should be `df.select($"image.data".as("features"))`. On Tue, Feb 24, 2015 at 3:35 PM, Xiangrui Meng men...@gmail.com wrote: If you make `Image` a case class, then select("image.data") should work. On Tue, Feb 24, 2015 at 3:06 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I have a DataFrame that contains a user defined type. The type is an image with the following attributes: class Image(w: Int, h: Int, data: Vector) In my DataFrame, images are stored in a column named image that corresponds to the following case class: case class LabeledImage(label: Int, image: Image) How can I select the image.data attribute of my image object and view it as a column of a DataFrame ? I'd like to do something like val featureDF = imagesDF.select("image.data").as("features") Cheers, Jao
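If the nested field reference keeps failing because image is a UDT rather than a struct, one hedged workaround (only a sketch, assuming Image is a case class, or at least exposes data as a val, as in the original message) is to pull the attribute out with a UDF instead of a field expression:

import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.mllib.linalg.Vector

// Extract the data attribute of the user defined Image type with a UDF.
val extractData = udf((img: Image) => img.data)
val featureDF = imagesDF.select(extractData(col("image")).as("features"))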
[ML][SQL] Select UserDefinedType attribute in a DataFrame
Hi all, I have a DataFrame that contains a user defined type. The type is an image with the following attributes: class Image(w: Int, h: Int, data: Vector) In my DataFrame, images are stored in a column named image that corresponds to the following case class: case class LabeledImage(label: Int, image: Image) How can I select the image.data attribute of my image object and view it as a column of a DataFrame ? I'd like to do something like val featureDF = imagesDF.select("image.data").as("features") Cheers, Jao
Re: Need some help to create user defined type for ML pipeline
Hi Joseph, Thank you for you feedback. I've managed to define an image type by following VectorUDT implementation. I have another question about the definition of a user defined transformer. The unary tranfromer is private to spark ml. Do you plan to give a developer api for transformers ? On Sun, Jan 25, 2015 at 2:26 AM, Joseph Bradley jos...@databricks.com wrote: Hi Jao, You're right that defining serialize and deserialize is the main task in implementing a UDT. They are basically translating between your native representation (ByteImage) and SQL DataTypes. The sqlType you defined looks correct, and you're correct to use a row of length 4. Other than that, it should just require copying data to and from SQL Rows. There are quite a few examples of that in the codebase; I'd recommend searching based on the particular DataTypes you're using. Are there particular issues you're running into? Joseph On Mon, Jan 19, 2015 at 12:59 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to implement a pipeline for computer vision based on the latest ML package in spark. The first step of my pipeline is to decode image (jpeg for instance) stored in a parquet file. For this, I begin to create a UserDefinedType that represents a decoded image stored in a array of byte. Here is my first attempt : *@SQLUserDefinedType(udt = classOf[ByteImageUDT])class ByteImage(channels: Int, width: Int, height: Int, data: Array[Byte])private[spark] class ByteImageUDT extends UserDefinedType[ByteImage] { override def sqlType: StructType = {// type: 0 = sparse, 1 = dense// We only use values for dense vectors, and size, indices, and values for sparse// vectors. The values field is nullable because we might want to add binary vectors later,// which uses size and indices, but not values. StructType(Seq( StructField(channels, IntegerType, nullable = false), StructField(width, IntegerType, nullable = false), StructField(height, IntegerType, nullable = false), StructField(data, BinaryType, nullable = false) } override def serialize(obj: Any): Row = {val row = new GenericMutableRow(4)val img = obj.asInstanceOf[ByteImage]* *... } override def deserialize(datum: Any): Vector = { * ** *} } override def pyUDT: String = pyspark.mllib.linalg.VectorUDT override def userClass: Class[Vector] = classOf[Vector]}* I take the VectorUDT as a starting point but there's a lot of thing that I don't really understand. So any help on defining serialize and deserialize methods will be appreciated. Best Regards, Jao
Unable to run spark-shell after build
Hi all, I'm trying to run the master version of spark in order to test some alpha components in ml package. I follow the build spark documentation and build it with : $ mvn clean package The build is successful but when I try to run spark-shell I got the following errror : *Exception in thread main java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse* *at org.apache.spark.HttpServer.org http://org.apache.spark.HttpServer.org$apache$spark$HttpServer$$doStart(HttpServer.scala:74)* *at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:61)* *at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:61)* *at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1732)* *at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)* *at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1723)* *at org.apache.spark.HttpServer.start(HttpServer.scala:61)* *at org.apache.spark.repl.SparkIMain.init(SparkIMain.scala:130)* *at org.apache.spark.repl.SparkILoop$SparkILoopInterpreter.init(SparkILoop.scala:185)* *at org.apache.spark.repl.SparkILoop.createInterpreter(SparkILoop.scala:214)* *at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:946)* *at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:942)* *at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:942)* *at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)* *at org.apache.spark.repl.SparkILoop.org http://org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:942)* *at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1039)* *at org.apache.spark.repl.Main$.main(Main.scala:31)* *at org.apache.spark.repl.Main.main(Main.scala)* *at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)* *at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)* *at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)* *at java.lang.reflect.Method.invoke(Method.java:606)* *at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:403)* *at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:77)* *at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)* *Caused by: java.lang.ClassNotFoundException: javax.servlet.http.HttpServletResponse* *at java.net.URLClassLoader$1.run(URLClassLoader.java:366)* *at java.net.URLClassLoader$1.run(URLClassLoader.java:355)* *at java.security.AccessController.doPrivileged(Native Method)* *at java.net.URLClassLoader.findClass(URLClassLoader.java:354)* *at java.lang.ClassLoader.loadClass(ClassLoader.java:425)* *at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)* *at java.lang.ClassLoader.loadClass(ClassLoader.java:358)* *... 25 more* What's going wrong ? Jao
Re: Can't find spark-parent when using snapshot build
That's what I did. On Mon, Feb 2, 2015 at 11:28 PM, Sean Owen so...@cloudera.com wrote: Snapshot builds are not published. Unless you build and install snapshots locally (like with mvn install) they wont be found. On Feb 2, 2015 10:58 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to use the master version of spark. I build and install it with $ mvn clean clean install I manage to use it with the following configuration in my build.sbt : *libraryDependencies ++= Seq( org.apache.spark %% spark-core % 1.3.0-SNAPSHOT % provided, org.apache.spark %% spark-sql % 1.3.0-SNAPSHOT % provided , org.apache.spark %% spark-mllib % 1.3.0-SNAPSHOT % provided)* But After my last update I got the following error : *unresolved dependency: org.apache.spark#spark-mllib_2.10;1.3.0-SNAPSHOT: Maven2 Local: no ivy file nor artifact found for org.apache.spark#spark-parent;1.3.0-SNAPSHOT* Any ideas ? Cheers, Jao
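If the snapshots are installed in the local Maven repository but sbt still cannot resolve spark-parent, a sketch of the usual workaround is to point sbt at ~/.m2 explicitly in build.sbt (this assumes the default local repository location):

resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"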
Can't find spark-parent when using snapshot build
Hi all, I'm trying to use the master version of spark. I build and install it with $ mvn clean install and I manage to use it with the following configuration in my build.sbt:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0-SNAPSHOT" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.3.0-SNAPSHOT" % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.3.0-SNAPSHOT" % "provided")

But after my last update I got the following error: unresolved dependency: org.apache.spark#spark-mllib_2.10;1.3.0-SNAPSHOT: Maven2 Local: no ivy file nor artifact found for org.apache.spark#spark-parent;1.3.0-SNAPSHOT Any ideas ? Cheers, Jao
Need some help to create user defined type for ML pipeline
Hi all, I'm trying to implement a pipeline for computer vision based on the latest ML package in spark. The first step of my pipeline is to decode images (jpeg for instance) stored in a parquet file. For this, I begin by creating a UserDefinedType that represents a decoded image stored in an array of bytes. Here is my first attempt:

@SQLUserDefinedType(udt = classOf[ByteImageUDT])
class ByteImage(channels: Int, width: Int, height: Int, data: Array[Byte])

private[spark] class ByteImageUDT extends UserDefinedType[ByteImage] {

  override def sqlType: StructType = {
    StructType(Seq(
      StructField("channels", IntegerType, nullable = false),
      StructField("width", IntegerType, nullable = false),
      StructField("height", IntegerType, nullable = false),
      StructField("data", BinaryType, nullable = false)))
  }

  override def serialize(obj: Any): Row = {
    val row = new GenericMutableRow(4)
    val img = obj.asInstanceOf[ByteImage]
    ...
  }

  override def deserialize(datum: Any): ByteImage = {
    ...
  }

  override def userClass: Class[ByteImage] = classOf[ByteImage]
}

I took the VectorUDT as a starting point but there are a lot of things that I don't really understand. So any help on defining the serialize and deserialize methods will be appreciated. Best Regards, Jao
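A hedged sketch of what the two missing methods could look like, following the general pattern VectorUDT used at the time (copy the fields into the Row in sqlType order and read them back); it assumes the ByteImage constructor parameters are exposed as vals (or that ByteImage is a case class), and GenericMutableRow details may differ between Spark versions:

override def serialize(obj: Any): Row = {
  val img = obj.asInstanceOf[ByteImage]
  val row = new GenericMutableRow(4)
  row.setInt(0, img.channels)
  row.setInt(1, img.width)
  row.setInt(2, img.height)
  row.update(3, img.data)
  row
}

override def deserialize(datum: Any): ByteImage = datum match {
  case row: Row =>
    new ByteImage(row.getInt(0), row.getInt(1), row.getInt(2), row.getAs[Array[Byte]](3))
}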
Re: DeepLearning and Spark ?
Can someone explain what is the difference between parameter server and spark ? There's already an issue on this topic https://issues.apache.org/jira/browse/SPARK-4590 Another example of DL in Spark essentially based on downpour SDG http://deepdist.com On Sat, Jan 10, 2015 at 2:27 AM, Peng Cheng rhw...@gmail.com wrote: Not if broadcast can only be used between stages. To enable this you have to at least make broadcast asynchronous non-blocking. On 9 January 2015 at 18:02, Krishna Sankar ksanka...@gmail.com wrote: I am also looking at this domain. We could potentially use the broadcast capability in Spark to distribute the parameters. Haven't thought thru yet. Cheers k/ On Fri, Jan 9, 2015 at 2:56 PM, Andrei faithlessfri...@gmail.com wrote: Does it makes sense to use Spark's actor system (e.g. via SparkContext.env.actorSystem) to create parameter server? On Fri, Jan 9, 2015 at 10:09 PM, Peng Cheng rhw...@gmail.com wrote: You are not the first :) probably not the fifth to have the question. parameter server is not included in spark framework and I've seen all kinds of hacking to improvise it: REST api, HDFS, tachyon, etc. Not sure if an 'official' benchmark implementation will be released soon On 9 January 2015 at 10:59, Marco Shaw marco.s...@gmail.com wrote: Pretty vague on details: http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199 On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, DeepLearning algorithms are popular and achieve many state of the art performance in several real world machine learning problems. Currently there are no DL implementation in spark and I wonder if there is an ongoing work on this topics. We can do DL in spark Sparkling water and H2O but this adds an additional software stack. Deeplearning4j seems to implements a distributed version of many popural DL algorithm. Porting DL4j in Spark can be interesting. Google describes an implementation of a large scale DL in this paper http://research.google.com/archive/large_deep_networks_nips2012.html. Based on model parallelism and data parallelism. So, I'm trying to imaging what should be a good design for DL algorithm in Spark ? Spark already have RDD (for data parallelism). Can GraphX be used for the model parallelism (as DNN are generally designed as DAG) ? And what about using GPUs to do local parallelism (mecanism to push partition into GPU memory ) ? What do you think about this ? Cheers, Jao
DeepLearning and Spark ?
Hi all, DeepLearning algorithms are popular and achieve state of the art performance in several real world machine learning problems. Currently there is no DL implementation in spark and I wonder if there is ongoing work on this topic. We can do DL in spark with Sparkling Water and H2O, but this adds an additional software stack. Deeplearning4j seems to implement a distributed version of many popular DL algorithms; porting DL4j to Spark could be interesting. Google describes an implementation of large scale DL in this paper http://research.google.com/archive/large_deep_networks_nips2012.html, based on model parallelism and data parallelism. So, I'm trying to imagine what a good design for DL algorithms in Spark would be. Spark already has RDDs (for data parallelism). Can GraphX be used for the model parallelism (as DNNs are generally designed as DAGs)? And what about using GPUs for local parallelism (a mechanism to push a partition into GPU memory)? What do you think about this ? Cheers, Jao
Re: MLLib: Saving and loading a model
Hi, There's ongoing work on model export https://www.github.com/apache/spark/pull/3062 For now, since the linear regression model is serializable you can save it as an object file: sc.parallelize(Seq(model)).saveAsObjectFile(path) then val model = sc.objectFile[LinearRegressionModel](path).first model.predict(...) On Mon, Dec 15, 2014 at 11:21 PM, Sameer Tilak ssti...@live.com wrote: Hi All, Resending this: I am using LinearRegressionWithSGD and then I save the model weights and intercept. The file that contains the weights has this format: 1.20455 0.1356 0.000456 .. Intercept is 0 since I am using train without setting the intercept, so it can be ignored for the moment. I would now like to initialize a new model object using these saved weights from the above file. We are using CDH 5.1 Something along these lines: val weights = sc.textFile("linear-weights"); val model = new LinearRegressionWithSGD(weights); then use it as: val valuesAndPreds = testData.map { point => val prediction = model.predict(point.features) (point.label, prediction) } Any pointers on how do I do that?
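For the original question (re-creating a model from a saved weights file), a small sketch of one way to do it; it assumes a plain text file of numbers, an intercept of 0.0 as stated above, and uses the public LinearRegressionModel constructor rather than LinearRegressionWithSGD (testData is the RDD of LabeledPoint from the message above):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LinearRegressionModel

// Read the saved weights back into a dense vector and rebuild the model.
val weights = Vectors.dense(
  sc.textFile("linear-weights")
    .flatMap(_.trim.split("\\s+"))
    .filter(_.nonEmpty)
    .map(_.toDouble)
    .collect())
val model = new LinearRegressionModel(weights, 0.0)
val valuesAndPreds = testData.map(point => (point.label, model.predict(point.features)))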
Re: Why KMeans with mllib is so slow ?
I've tried some additional experiments with kmeans and I finally got it worked as I expected. In fact, the number of partition is critical. I had a data set of 24x784 with 12 partitions. In this case the kmeans algorithm took a very long time (about hours to converge). When I change the partition into 32, the same kmeans ( runs = 10, k = 10, iterations = 300, init = kmeans|| ) converges in 4 min with 8 cores As a comparison, the same problem solve with python scikit-learn takes 21 min on a single core. So spark wins :) As conclusion, setting the number of partition correctly is essential. Is there a rule of thumb for that ? On Mon, Dec 15, 2014 at 8:55 PM, Xiangrui Meng men...@gmail.com wrote: Please check the number of partitions after sc.textFile. Use sc.textFile('...', 8) to have at least 8 partitions. -Xiangrui On Tue, Dec 9, 2014 at 4:58 AM, DB Tsai dbt...@dbtsai.com wrote: You just need to use the latest master code without any configuration to get performance improvement from my PR. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Mon, Dec 8, 2014 at 7:53 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: After some investigation, I learned that I can't compare kmeans in mllib with another kmeans implementation directly. The kmeans|| initialization step takes more time than the algorithm implemented in julia for example. There is also the ability to run multiple runs of kmeans algorithm in mllib even by default the number of runs is 1. DB Tsai can you please tell me the configuration you took for the improvement you mention in your pull request. I'd like to run the same benchmark on mnist8m on my computer. Cheers; On Fri, Dec 5, 2014 at 10:34 PM, DB Tsai dbt...@dbtsai.com wrote: Also, are you using the latest master in this experiment? A PR merged into the master couple days ago will spend up the k-means three times. See https://github.com/apache/spark/commit/7fc49ed91168999d24ae7b4cc46fbb4ec87febc1 Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 5, 2014 at 9:36 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: The code is really simple : object TestKMeans { def main(args: Array[String]) { val conf = new SparkConf() .setAppName(Test KMeans) .setMaster(local[8]) .set(spark.executor.memory, 8g) val sc = new SparkContext(conf) val numClusters = 500; val numIterations = 2; val data = sc.textFile(sample.csv).map(x = Vectors.dense(x.split(',').map(_.toDouble))) data.cache() val clusters = KMeans.train(data, numClusters, numIterations) println(clusters.clusterCenters.size) val wssse = clusters.computeCost(data) println(serror : $wssse) } } For the testing purpose, I was generating a sample random data with julia and store it in a csv file delimited by comma. The dimensions is 248000 x 384. In the target application, I will have more than 248k data to cluster. On Fri, Dec 5, 2014 at 6:03 PM, Davies Liu dav...@databricks.com wrote: Could you post you script to reproduce the results (also how to generate the dataset)? That will help us to investigate it. On Fri, Dec 5, 2014 at 8:40 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hmm, here I use spark on local mode on my laptop with 8 cores. The data is on my local filesystem. Event thought, there an overhead due to the distributed computation, I found the difference between the runtime of the two implementations really, really huge. Is there a benchmark on how well the algorithm implemented in mllib performs ? 
On Fri, Dec 5, 2014 at 4:56 PM, Sean Owen so...@cloudera.com wrote: Spark has much more overhead, since it's set up to distribute the computation. Julia isn't distributed, and so has no such overhead in a completely in-core implementation. You generally use Spark when you have a problem large enough to warrant distributing, or, your data already lives in a distributed store like HDFS. But it's also possible you're not configuring the implementations the same way, yes. There's not enough info here really to say. On Fri, Dec 5, 2014 at 9:50 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to a run clustering with kmeans algorithm. The size of my data set is about 240k vectors of dimension 384. Solving the problem with the kmeans available in julia (kmean++) http://clusteringjl.readthedocs.org/en/latest/kmeans.html take about 8
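As a rough illustration of the rule of thumb asked about above: a common guideline is 2-4 partitions per core, so roughly 32 partitions for 8 cores, which matches the setting that worked in this thread. The exact numbers below are only an example:

import org.apache.spark.mllib.linalg.Vectors

// Ask for at least 32 input partitions (the second argument is a minimum).
val data = sc.textFile("sample.csv", 32)
  .map(line => Vectors.dense(line.split(',').map(_.toDouble)))
  .cache()
println(data.partitions.size)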
DIMSUM and ColumnSimilarity use case ?
Dear all, I'm trying to understand what the correct use case of ColumnSimilarity implemented in RowMatrix is. As far as I know, this function computes the similarities between the columns of a given matrix. The DIMSUM paper says that it's efficient for large m (rows) and small n (columns). In this case the output will be an n by n matrix. Now, suppose I want to compute the similarity of several users, say m = billions. Each user is described by a high dimensional feature vector, say n = 1. In my dataset, one row represents one user. So in that case computing the column similarities of my matrix is not the same as computing the similarity of all users. Then, what does computing the similarity of the columns of my matrix mean in this case ? Best regards, Jao
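To make the distinction concrete: columnSimilarities compares the n columns of the matrix, so with one user per row it yields feature-feature similarities, not user-user similarities. A small sketch (the vectors and the 0.1 threshold are made up for illustration):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val userFeatures = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 2.0),   // one row per user
  Vectors.dense(0.0, 3.0, 1.0)))
val mat = new RowMatrix(userFeatures)
// n x n similarities between the columns (features), not between the users.
val featureSims = mat.columnSimilarities(0.1)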
Re: Mllib native netlib-java/OpenBLAS
+1 with 1.3-SNAPSHOT. On Mon, Dec 1, 2014 at 5:49 PM, agg212 alexander_galaka...@brown.edu wrote: Thanks for your reply, but I'm still running into issues installing/configuring the native libraries for MLlib. Here are the steps I've taken, please let me know if anything is incorrect. - Download Spark source - unzip and compile using `mvn -Pnetlib-lgpl -DskipTests clean package ` - Run `sbt/sbt publish-local` The last step fails with the following error (full stack trace is attached here: error.txt http://apache-spark-user-list.1001560.n3.nabble.com/file/n20110/error.txt ): [error] (sql/compile:compile) java.lang.AssertionError: assertion failed: List(object package$DebugNode, object package$DebugNode) Do I still have to install OPENBLAS/anything else if I build Spark from the source using the -Pnetlib-lgpl flag? Also, do I change the Spark version (from 1.1.0 to 1.2.0-SNAPSHOT) in the .sbt file for my app? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Mllib-native-netlib-java-OpenBLAS-tp19662p20110.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Why my default partition size is set to 52 ?
Hi all, I'm trying to run some spark job with spark-shell. What I want to do is just to count the number of lines in a file. I start the spark-shell with the default argument i.e just with ./bin/spark-shell. Load the text file with sc.textFile(path) and then call count on my data. When I do this, my data is always split in 52 partitions. I don't understand why since I run it on a local machine with 8 cores and the sc.defaultParallelism gives me 8. Even, if I load the file with sc.textFile(path,8), I always get data.partitions.size = 52 I use spark 1.1.1. Any ideas ? Cheers, Jao
Re: Why my default partition size is set to 52 ?
Ok, I misunderstood the meaning of the partitions. In fact, my file is 1.7G big, and with a smaller file I get a different number of partitions. Thanks for this clarification. On Fri, Dec 5, 2014 at 4:15 PM, Sean Owen so...@cloudera.com wrote: How big is your file? It's probably of a size that the Hadoop InputFormat would make 52 splits for it. Data drives partitions, not processing resource. Really, 8 splits is the minimum parallelism you want. Several times your # of cores is better. On Fri, Dec 5, 2014 at 8:51 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to run some spark job with spark-shell. What I want to do is just to count the number of lines in a file. I start the spark-shell with the default arguments, i.e. just with ./bin/spark-shell, load the text file with sc.textFile(path) and then call count on my data. When I do this, my data is always split into 52 partitions. I don't understand why, since I run it on a local machine with 8 cores and sc.defaultParallelism gives me 8. Even if I load the file with sc.textFile(path, 8), I always get data.partitions.size = 52. I use spark 1.1.1. Any ideas ? Cheers, Jao
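A short illustration of what Sean describes: the 52 partitions come from the input splits of the ~1.7 GB file (around 32 MB each), the second argument of textFile is only a minimum, and coalesce can reduce the count afterwards if really needed. The numbers are approximate:

val data = sc.textFile("path", 8)   // "at least 8" partitions; a ~1.7 GB file may still give ~52
println(data.partitions.size)
val fewer = data.coalesce(8)        // explicitly reduce the partition count if desired
println(fewer.partitions.size)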
Why KMeans with mllib is so slow ?
Hi all, I'm trying to run a clustering with the kmeans algorithm. The size of my data set is about 240k vectors of dimension 384. Solving the problem with the kmeans available in julia (kmeans++) http://clusteringjl.readthedocs.org/en/latest/kmeans.html takes about 8 minutes on a single core. Solving the same problem with spark kmeans|| takes more than 1.5 hours with 8 cores. Either they don't implement the same algorithm or I don't understand how the kmeans in spark works. Is my data not big enough to take full advantage of spark ? At least, I expected the same runtime. Cheers, Jao
Re: Why KMeans with mllib is so slow ?
The code is really simple :

object TestKMeans {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("Test KMeans")
      .setMaster("local[8]")
      .set("spark.executor.memory", "8g")
    val sc = new SparkContext(conf)

    val numClusters = 500
    val numIterations = 2

    val data = sc.textFile("sample.csv").map(x => Vectors.dense(x.split(',').map(_.toDouble)))
    data.cache()

    val clusters = KMeans.train(data, numClusters, numIterations)
    println(clusters.clusterCenters.size)

    val wssse = clusters.computeCost(data)
    println(s"error : $wssse")
  }
}

For testing purposes, I generated sample random data with julia and stored it in a comma-delimited csv file. The dimensions are 248000 x 384. In the target application, I will have more than 248k data points to cluster. On Fri, Dec 5, 2014 at 6:03 PM, Davies Liu dav...@databricks.com wrote: Could you post your script to reproduce the results (also how to generate the dataset)? That will help us to investigate it. On Fri, Dec 5, 2014 at 8:40 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hmm, here I use spark in local mode on my laptop with 8 cores. The data is on my local filesystem. Even though there's an overhead due to the distributed computation, I found the difference between the runtimes of the two implementations really, really huge. Is there a benchmark on how well the algorithm implemented in mllib performs ? On Fri, Dec 5, 2014 at 4:56 PM, Sean Owen so...@cloudera.com wrote: Spark has much more overhead, since it's set up to distribute the computation. Julia isn't distributed, and so has no such overhead in a completely in-core implementation. You generally use Spark when you have a problem large enough to warrant distributing, or, your data already lives in a distributed store like HDFS. But it's also possible you're not configuring the implementations the same way, yes. There's not enough info here really to say. On Fri, Dec 5, 2014 at 9:50 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to run a clustering with the kmeans algorithm. The size of my data set is about 240k vectors of dimension 384. Solving the problem with the kmeans available in julia (kmeans++) http://clusteringjl.readthedocs.org/en/latest/kmeans.html takes about 8 minutes on a single core. Solving the same problem with spark kmeans|| takes more than 1.5 hours with 8 cores. Either they don't implement the same algorithm or I don't understand how the kmeans in spark works. Is my data not big enough to take full advantage of spark ? At least, I expected the same runtime. Cheers, Jao
Store kmeans model
Dear all, How can one save a kmeans model after training ? Best, Jao
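Before the model import/export API lands, one hedged option (the same trick as in the linear model thread above) is to write the fitted KMeansModel to an object file and read it back; data is assumed to be an RDD[Vector] and the k and iteration values are placeholders:

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}

val model = KMeans.train(data, 500, 20)
sc.parallelize(Seq(model), 1).saveAsObjectFile("kmeans-model")
val restored = sc.objectFile[KMeansModel]("kmeans-model").first()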
Got java.lang.SecurityException: class javax.servlet.FilterRegistration's when running job from intellij Idea
Hi all, I have a spark job that I build with sbt and I can run without any problem with sbt run. But when I run it inside IntelliJ Idea I got the following error : *Exception encountered when invoking run on a nested suite - class javax.servlet.FilterRegistration's signer information does not match signer information of other classes in the same package* *java.lang.SecurityException: class javax.servlet.FilterRegistration's signer information does not match signer information of other classes in the same package* * at java.lang.ClassLoader.checkCerts(ClassLoader.java:952)* * at java.lang.ClassLoader.preDefineClass(ClassLoader.java:666)* * at java.lang.ClassLoader.defineClass(ClassLoader.java:794)* * at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)* * at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)* * at java.net.URLClassLoader.access$100(URLClassLoader.java:71)* * at java.net.URLClassLoader$1.run(URLClassLoader.java:361)* * at java.net.URLClassLoader$1.run(URLClassLoader.java:355)* * at java.security.AccessController.doPrivileged(Native Method)* * at java.net.URLClassLoader.findClass(URLClassLoader.java:354)* * at java.lang.ClassLoader.loadClass(ClassLoader.java:425)* * at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)* * at java.lang.ClassLoader.loadClass(ClassLoader.java:358)* * at org.eclipse.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:136)* * at org.eclipse.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:129)* * at org.eclipse.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:98)* * at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:98)* * at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:89)* * at org.apache.spark.ui.WebUI.attachPage(WebUI.scala:67)* * at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:60)* * at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:60)* * at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)* * at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)* * at org.apache.spark.ui.WebUI.attachTab(WebUI.scala:60)* * at org.apache.spark.ui.SparkUI.initialize(SparkUI.scala:66)* * at org.apache.spark.ui.SparkUI.init(SparkUI.scala:60)* * at org.apache.spark.ui.SparkUI.init(SparkUI.scala:42)* * at org.apache.spark.SparkContext.init(SparkContext.scala:223)* * at org.apache.spark.SparkContext.init(SparkContext.scala:98)* How can I solve this ? Cheers, Jao
Re: unable to make a custom class as a key in a pairrdd
In the documentation it's said that we need to override the hashCode and equals methods. It doesn't work without overriding them either. I get this error both in the REPL and in a standalone application. On Fri, Oct 24, 2014 at 3:29 AM, Prashant Sharma scrapco...@gmail.com wrote: Are you doing this in REPL ? Then there is a bug filed for this, I just can't recall the bug ID at the moment. Prashant Sharma On Fri, Oct 24, 2014 at 4:07 AM, Niklas Wilcke 1wil...@informatik.uni-hamburg.de wrote: Hi Jao, I don't really know why this doesn't work but I have two hints. You don't need to override hashCode and equals. The modifier case is doing that for you. Writing case class PersonID(id: String) would be enough to get the class you want I think. If I change the type of the id param to Int it works for me but I don't know why. case class PersonID(id: Int) Looks like a strange behavior to me. Have a try. Good luck, Niklas On 23.10.2014 21:52, Jaonary Rabarisoa wrote: Hi all, I have the following case class that I want to use as a key in a key-value rdd. I defined the equals and hashCode methods but it's not working. What am I doing wrong ?

case class PersonID(id: String) {
  override def hashCode = id.hashCode
  override def equals(other: Any) = other match {
    case that: PersonID => this.id == that.id && this.getClass == that.getClass
    case _ => false
  }
}

val p = sc.parallelize((1 until 10).map(x => (PersonID("1"), x)))
p.groupByKey.collect foreach println

(PersonID(1),CompactBuffer(5))
(PersonID(1),CompactBuffer(6))
(PersonID(1),CompactBuffer(7))
(PersonID(1),CompactBuffer(8, 9))
(PersonID(1),CompactBuffer(1))
(PersonID(1),CompactBuffer(2))
(PersonID(1),CompactBuffer(3))
(PersonID(1),CompactBuffer(4))

Best, Jao
key class requirement for PairedRDD ?
Dear all, Is it possible to use any kind of object as a key in a PairedRDD ? When I use a case class key, the groupByKey operation doesn't behave as I expect. I want to use a case class to avoid using a large tuple, as it is easier to manipulate. Cheers, Jaonary
Re: key class requirement for PairedRDD ?
Here's what I'm trying to do. My case class is the following : case class PersonID(id: String, group: String, name: String) I want to use PersonID as a key in a PairedRDD. But I think the default equals function doesn't fit my need, because two PersonID(a,a,a) are not the same. When I use a tuple (String, String, String) as a key it works. On Fri, Oct 17, 2014 at 9:03 AM, Sonal Goyal sonalgoy...@gmail.com wrote: We use our custom classes which are Serializable and have well defined hashcode and equals methods through the Java API. Whats the issue you are getting? Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Fri, Oct 17, 2014 at 12:28 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Is it possible to use any kind of object as key in a PairedRDD. When I use a case class key, the groupByKey operation don't behave as I expected. I want to use a case class to avoid using a large tuple as it is easier to manipulate. Cheers, Jaonary
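A small sketch of the tuple workaround mentioned above (the field values are made up), keeping a helper on the case class so the rest of the code stays readable:

case class PersonID(id: String, group: String, name: String) {
  def asKey: (String, String, String) = (id, group, name)
}

val pairs = sc.parallelize(1 until 10).map(x => (PersonID("1", "g1", "jao").asKey, x))
val grouped = pairs.groupByKey().collect()   // one entry per distinct key, as expected
grouped.foreach(println)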
Optimizing pairwise similarity computation or how to avoid RDD.cartesian operation ?
Hi all, I need to compute a similarity between elements of two large sets of high dimensional feature vectors. Naively, I create all possible pairs of vectors with features1.cartesian(features2) and then map the resulting paired rdd with my similarity function. The problem is that the cartesian operation takes a lot of time, more time than computing the similarity itself. If I save each of my feature vectors to disk, form a list of file name pairs and compute the similarity by reading the files, it runs significantly faster. Any ideas will be helpful, Cheers, Jao
Re: Optimizing pairwise similarity computation or how to avoid RDD.cartesian operation ?
Hi Reza, Thank you for the suggestion. The number of points is not that large, about 1000 in each set, so I have 1000x1000 pairs. But my similarity is obtained using metric learning to rank, and from spark it is viewed as a black box. So my idea was just to distribute the computation of the 1000x1000 similarities. After some investigation, I managed to make it run faster. My feature vectors are obtained after a join operation and I didn't cache the result of this operation before the cartesian operation. Caching the result of the join operation makes my code run amazingly faster. So I think the real problem I have is a lack of good practice in spark programming. Best Jao On Fri, Oct 17, 2014 at 11:08 PM, Reza Zadeh r...@databricks.com wrote: Hi Jaonary, What are the numbers, i.e. number of points you're trying to do all-pairs on, and the dimension of each? Have you tried the new implementation of columnSimilarities in RowMatrix? Setting the threshold high enough (potentially above 1.0) might solve your problem, here is an example https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala . This implements the DIMSUM sampling scheme, recently merged into master https://github.com/apache/spark/pull/1778. Best, Reza On Fri, Oct 17, 2014 at 3:43 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I need to compute a similiarity between elements of two large sets of high dimensional feature vector. Naively, I create all possible pair of vectors with features1.cartesian(features2) and then map the produced paired rdd with my similarity function. The problem is that the cartesian operation takes a lot times, more time that computing the similarity itself. If I save each of my feature vector into disk, form a list of file name pair and compute the similarity by reading the files it runs significantly much faster. Any ideas will be helpful, Cheers, Jao
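A sketch of the fix described above, with made-up data and a placeholder similarity (the real metric-learning model is a black box here): cache the joined feature RDDs before the cartesian product so the join is not recomputed for every pair.

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Placeholder similarity; the real learned metric would go here.
def similarity(a: Vector, b: Vector): Double =
  a.toArray.zip(b.toArray).map(p => p._1 * p._2).sum

val meta1 = sc.parallelize(Seq(("a", 1), ("b", 2)))
val raw1 = sc.parallelize(Seq(("a", Vectors.dense(1.0, 0.0)), ("b", Vectors.dense(0.0, 1.0))))
val features1 = meta1.join(raw1).map { case (_, (_, f)) => f }.cache()   // cache after the join
val features2 = features1                                                // second set built the same way
val sims = features1.cartesian(features2).map { case (a, b) => similarity(a, b) }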
Re: Interactive interface tool for spark
And what about Hue http://gethue.com ? On Sun, Oct 12, 2014 at 1:26 PM, andy petrella andy.petre...@gmail.com wrote: Dear Sparkers, As promised, I've just updated the repo with a new name (for the sake of clarity), default branch but specially with a dedicated README containing: * explanations on how to launch and use it * an intro on each feature like Spark, Classpaths, SQL, Dynamic update, ... * pictures showing results There is a notebook for each feature, so it's easier to try out! Here is the repo: https://github.com/andypetrella/spark-notebook/ HTH and PRs are more than welcome ;-). aℕdy ℙetrella about.me/noootsab [image: aℕdy ℙetrella on about.me] http://about.me/noootsab On Wed, Oct 8, 2014 at 4:57 PM, Michael Allman mich...@videoamp.com wrote: Hi Andy, This sounds awesome. Please keep us posted. Meanwhile, can you share a link to your project? I wasn't able to find it. Cheers, Michael On Oct 8, 2014, at 3:38 AM, andy petrella andy.petre...@gmail.com wrote: Heya You can check Zeppellin or my fork of the Scala notebook. I'm going this week end to push some efforts on the doc, because it supports for realtime graphing, Scala, SQL, dynamic loading of dependencies and I started this morning a widget to track the progress of the jobs. I'm quite happy with it so far, I used it with graphx, mllib, ADAM and the Cassandra connector so far. However, its major drawback is that it is a one man (best) effort ftm! :-S Le 8 oct. 2014 11:16, Dai, Kevin yun...@ebay.com a écrit : Hi, All We need an interactive interface tool for spark in which we can run spark job and plot graph to explorer the data interactively. Ipython notebook is good, but it only support python (we want one supporting scala)… BR, Kevin.
java.lang.OutOfMemoryError: Java heap space when running job via spark-submit
Dear all, I have a spark job with the following configuration

val conf = new SparkConf()
  .setAppName("My Job")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "value.serializer.Registrator")
  .setMaster("local[4]")
  .set("spark.executor.memory", "4g")

that I can run manually with sbt run without any problem. But when I try to run the same job with spark-submit

./spark-1.1.0-bin-hadoop2.4/bin/spark-submit \
  --class value.jobs.MyJob \
  --master local[4] \
  --conf spark.executor.memory=4g \
  --conf spark.driver.memory=2g \
  target/scala-2.10/my-job_2.10-1.0.jar

I get the following error :

Exception in thread "stdin writer for List(patch_matching_similarity)" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:420)
at com.esotericsoftware.kryo.io.Output.writeString(Output.java:326)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119)
at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1047)
at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1056)
at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:93)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:745)
at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:625)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:167)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.CartesianRDD$$anonfun$compute$1.apply(CartesianRDD.scala:75)
at org.apache.spark.rdd.CartesianRDD$$anonfun$compute$1.apply(CartesianRDD.scala:74)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

I don't understand why, since I set the same amount of memory in the two cases. Any ideas will be helpful. I use spark 1.1.0. Cheers, Jao
Re: java.lang.OutOfMemoryError: Java heap space when running job via spark-submit
in fact with --driver-memory 2G I can get it working On Thu, Oct 9, 2014 at 6:20 PM, Xiangrui Meng men...@gmail.com wrote: Please use --driver-memory 2g instead of --conf spark.driver.memory=2g. I'm not sure whether this is a bug. -Xiangrui On Thu, Oct 9, 2014 at 9:00 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, I have a spark job with the following configuration val conf = new SparkConf() .setAppName(My Job) .set(spark.serializer, org.apache.spark.serializer.KryoSerializer) .set(spark.kryo.registrator, value.serializer.Registrator) .setMaster(local[4]) .set(spark.executor.memory, 4g) that I can run manually with sbt run without any problem. But, I try to run the same job with spark-submit ./spark-1.1.0-bin-hadoop2.4/bin/spark-submit \ --class value.jobs.MyJob \ --master local[4] \ --conf spark.executor.memory=4g \ --conf spark.driver.memory=2g \ target/scala-2.10/my-job_2.10-1.0.jar I get the following error : Exception in thread stdin writer for List(patch_matching_similarity) java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:2271) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) at com.esotericsoftware.krput.writeString_slow(Output.java:420) at com.esotericsoftware.kryo.io.Output.writeString(Output.java:326) at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153) at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146) at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1047) at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1056) at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:93) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:745) at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:625) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:167) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.CartesianRDD$$anonfun$compute$1.apply(CartesianRDD.scala:75) at org.apache.spark.rdd.CartesianRDD$$anonfun$compute$1.apply(CartesianRDD.scala:74) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)yo.io.Output.require(Output.java:135) at com.esotericsoftware.kryo.io.Out I don't understand why since I set the same amount of memory in the two cases. 
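For reference, the invocation that works per the reply above; the only change from the original command is passing the driver heap with --driver-memory instead of --conf spark.driver.memory:

./spark-1.1.0-bin-hadoop2.4/bin/spark-submit \
  --class value.jobs.MyJob \
  --master local[4] \
  --driver-memory 2g \
  --conf spark.executor.memory=4g \
  target/scala-2.10/my-job_2.10-1.0.jar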
Any ideas will be helpfull. I use spark 1.1.0. Cheers, Jao
Build error when using spark with breeze
Hi all, I'm using some functions from Breeze in a spark job but I get the following build error : *Error:scalac: bad symbolic reference. A signature in RandBasis.class refers to term math3* *in package org.apache.commons which is not available.* *It may be completely missing from the current classpath, or the version on* *the classpath might be incompatible with the version used when compiling RandBasis.class.* In my case, I just declare a new Gaussian distribution *val g = new Gaussian(0d,1d)* I'm using spark 1.1 Any ideas to fix this ? Best regards, Jao
Re: Build error when using spark with breeze
Thank Ted. Can you tell me how to adjust the scope ? On Fri, Sep 26, 2014 at 5:47 PM, Ted Yu yuzhih...@gmail.com wrote: spark-core's dependency on commons-math3 is @ test scope (core/pom.xml): dependency groupIdorg.apache.commons/groupId artifactIdcommons-math3/artifactId version3.3/version scopetest/scope /dependency Adjusting the scope should solve the problem below. On Fri, Sep 26, 2014 at 8:42 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm using some functions from Breeze in a spark job but I get the following build error : *Error:scalac: bad symbolic reference. A signature in RandBasis.class refers to term math3* *in package org.apache.commons which is not available.* *It may be completely missing from the current classpath, or the version on* *the classpath might be incompatible with the version used when compiling RandBasis.class.* In my case, I just declare a new Gaussian distribution *val g = new Gaussian(0d,1d)* I'm using spark 1.1 Any ideas to fix this ? Best regards, Jao
Re: Build error when using spark with breeze
I solve the problem by including the commons-math3 package in my sbt dependencies as Sean suggested. Thanks. On Fri, Sep 26, 2014 at 6:05 PM, Ted Yu yuzhih...@gmail.com wrote: You can use scope of runtime. See http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Scope Cheers On Fri, Sep 26, 2014 at 8:57 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Thank Ted. Can you tell me how to adjust the scope ? On Fri, Sep 26, 2014 at 5:47 PM, Ted Yu yuzhih...@gmail.com wrote: spark-core's dependency on commons-math3 is @ test scope (core/pom.xml): dependency groupIdorg.apache.commons/groupId artifactIdcommons-math3/artifactId version3.3/version scopetest/scope /dependency Adjusting the scope should solve the problem below. On Fri, Sep 26, 2014 at 8:42 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm using some functions from Breeze in a spark job but I get the following build error : *Error:scalac: bad symbolic reference. A signature in RandBasis.class refers to term math3* *in package org.apache.commons which is not available.* *It may be completely missing from the current classpath, or the version on* *the classpath might be incompatible with the version used when compiling RandBasis.class.* In my case, I just declare a new Gaussian distribution *val g = new Gaussian(0d,1d)* I'm using spark 1.1 Any ideas to fix this ? Best regards, Jao
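For reference, the fix Jao describes amounts to declaring commons-math3 explicitly in the application's build, since spark-core only pulls it in at test scope while breeze's RandBasis references org.apache.commons.math3 at compile time. A minimal sbt sketch; the version numbers here are illustrative:

  // build.sbt
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
    "org.scalanlp" %% "breeze" % "0.9",
    "org.apache.commons" % "commons-math3" % "3.3"  // 3.3 is the version quoted from core/pom.xml above
  )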
Better way to process large image data set ?
Hi all, I'm trying to process a large image data set and need some way to optimize my implementation since it's very slow for now. In my current implementation I store my images in an object file with the following fields: case class Image(groupId: String, imageId: String, buffer: String) Images belong to groups and have an id; the buffer is the image file (jpg, png) encoded as a base64 string. Before running an image processing algorithm on the image buffer, I have a lot of jobs that filter, group, and join the images in my data set based on groupId or imageId, and these steps are relatively slow. I suspect that spark moves my image buffers around even when it's not necessary for these specific jobs, so a lot of time is wasted in communication. Is there a better way to optimize my implementation ? Regards, Jaonary
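One common way to keep the heavy buffers out of the filter/group/join steps is to do all the relational work on a lightweight metadata RDD keyed by imageId, and only join the buffers back at the very end. A rough sketch, reusing the Image case class from the message (the filter predicate and paths are just examples):

  import org.apache.spark.rdd.RDD

  case class Image(groupId: String, imageId: String, buffer: String)

  val images: RDD[Image] = sc.objectFile("images")   // hypothetical object file

  // Heavy payload, keyed by imageId, touched only once at the end.
  val buffers = images.map(img => (img.imageId, img.buffer))

  // Lightweight metadata used for all the filtering / grouping / joining.
  val meta = images.map(img => (img.imageId, img.groupId))

  // ... do the relational work on `meta` only ...
  val selected = meta.filter { case (_, groupId) => groupId.startsWith("group-A") }

  // Re-attach the buffers only for the images that survived: (imageId, (groupId, buffer)).
  val result = selected.join(buffers)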
Why I get java.lang.OutOfMemoryError: Java heap space with join ?
Dear all, I'm facing the following problem and I can't figure out how to solve it. I need to join 2 RDDs in order to find their intersection. The first RDD represents an image encoded as a base64 string associated with an image id. The second RDD represents a set of geometric primitives (rectangles) associated with an image id. My goal is to draw these primitives on the corresponding image. So my first attempt is to join images and primitives by image id and then do the drawing. But, when I do *primitives.join(images) * I get the following error : *java.lang.OutOfMemoryError: Java heap space* * at java.util.Arrays.copyOf(Arrays.java:2367)* * at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)* * at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)* * at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535)* * at java.lang.StringBuilder.append(StringBuilder.java:204)* * at java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3143)* * at java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3051)* * at java.io.ObjectInputStream$BlockDataInputStream.readLongUTF(ObjectInputStream.java:3034)* * at java.io.ObjectInputStream.readString(ObjectInputStream.java:1642)* * at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1341)* * at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)* * at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)* * at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)* * at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)* * at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)* * at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)* * at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)* * at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)* * at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)* * at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)* * at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)* * at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)* * at org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)* * at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)* * at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)* * at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)* * at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)* * at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)* * at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)* * at scala.collection.Iterator$class.foreach(Iterator.scala:727)* * at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)* * at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)* I notice that sometimes, if I change the partitioning of the images RDD with coalesce, I can get it working. What am I doing wrong ? Cheers, Jaonary
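One way to keep the join from materialising the big base64 strings many times over is to group the (small) primitives by image id first, so each image buffer is paired with at most one record. A rough sketch, with hypothetical types and paths:

  import org.apache.spark.rdd.RDD

  case class Rect(x: Int, y: Int, w: Int, h: Int)   // hypothetical primitive type

  val images: RDD[(String, String)]   = sc.objectFile("images")      // (imageId, base64 buffer)
  val primitives: RDD[(String, Rect)] = sc.objectFile("primitives")  // (imageId, rectangle)

  // Collapse the many small records first, so the join emits one row per image
  // instead of one copy of the image buffer per primitive.
  val rectsByImage = primitives.groupByKey()

  // (imageId, (base64 buffer, all rectangles for that image)) -- draw from here.
  val toDraw = images.join(rectsByImage)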
Re: Spark SQL : how to find element where a field is in a given set
OK, but what if I have a long list? Do I need to hard code every element of my list like this, or is there a function that translates a list into a tuple ? On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust mich...@databricks.com wrote: You don't need the Seq, as in is a variadic function. personTable.where('name in ("foo", "bar")) On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, What is the expression that I should use with spark sql DSL if I need to retrieve data with a field in a given set ? For example : I have the following schema case class Person(name: String, age: Int) And I need to do something like : personTable.where('name in Seq("foo", "bar")) ? Cheers. Jaonary
Re: Spark SQL : how to find element where a field is in a given set
Still not working for me. I got a compilation error : *value in is not a member of Symbol.* Any ideas ? On Fri, Aug 29, 2014 at 9:46 AM, Michael Armbrust mich...@databricks.com wrote: To pass a list to a variadic function you can use the type ascription :_* For example: val longList = Seq[Expression](a, b, ...) table(src).where('key in (longList: _*)) Also, note that I had to explicitly specify Expression as the type parameter of Seq to ensure that the compiler converts a and b into Spark SQL expressions. On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: ok, but what if I have a long list do I need to hard code like this every element of my list of is there a function that translate a list into a tuple ? On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust mich...@databricks.com wrote: You don't need the Seq, as in is a variadic function. personTable.where('name in (foo, bar)) On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, What is the expression that I should use with spark sql DSL if I need to retreive data with a field in a given set. For example : I have the following schema case class Person(name: String, age: Int) And I need to do something like : personTable.where('name in Seq(foo, bar)) ? Cheers. Jaonary
Re: Spark SQL : how to find element where a field is in a given set
1.0.2 On Friday, August 29, 2014, Michael Armbrust mich...@databricks.com wrote: What version are you using? On Fri, Aug 29, 2014 at 2:22 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Still not working for me. I got a compilation error : *value in is not a member of Symbol.* Any ideas ? On Fri, Aug 29, 2014 at 9:46 AM, Michael Armbrust mich...@databricks.com wrote: To pass a list to a variadic function you can use the type ascription :_* For example: val longList = Seq[Expression](a, b, ...) table(src).where('key in (longList: _*)) Also, note that I had to explicitly specify Expression as the type parameter of Seq to ensure that the compiler converts a and b into Spark SQL expressions. On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: ok, but what if I have a long list do I need to hard code like this every element of my list of is there a function that translate a list into a tuple ? On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust mich...@databricks.com wrote: You don't need the Seq, as in is a variadic function. personTable.where('name in (foo, bar)) On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, What is the expression that I should use with spark sql DSL if I need to retreive data with a field in a given set. For example : I have the following schema case class Person(name: String, age: Int) And I need to do something like : personTable.where('name in Seq(foo, bar)) ? Cheers. Jaonary
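For completeness, a consolidated sketch of the pattern Michael describes, written against the 1.1-era DSL (on 1.0.2 the in operator may simply not be available on the DSL expressions, which would explain the compile error; the exact imports can differ between versions, so treat this as untested):

  import org.apache.spark.sql.catalyst.expressions.Expression
  import sqlContext._   // brings the symbol -> expression implicits into scope

  // Build the list as Expressions so the string literals are lifted to Spark SQL literals.
  val names: Seq[Expression] = Seq("foo", "bar", "baz")

  // Expand the list into the variadic `in` with the :_* type ascription.
  val result = personTable.where('name in (names: _*))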
Spark SQL : how to find element where a field is in a given set
Hi all, What is the expression that I should use with spark sql DSL if I need to retrieve data with a field in a given set ? For example : I have the following schema case class Person(name: String, age: Int) And I need to do something like : personTable.where('name in Seq("foo", "bar")) ? Cheers. Jaonary
External dependencies management with spark
Dear all, I'm looking for an efficient way to manage external dependencies. I know that one can add .jar or .py dependencies easily, but how can I handle other types of dependencies ? Specifically, I have some data processing algorithms implemented in other languages (ruby, octave, matlab, c++) and I need to put all of these library components (sets of .rb, .m, .so files) on my workers in order to be able to call them as external processes. What is the best way to achieve this ? Cheers, Jaonary
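For non-jar files, the addFile / SparkFiles pair covers most of this: addFile ships an arbitrary file to every worker, and SparkFiles.get resolves the local copy on the worker side. A rough sketch with hypothetical file names; the body of mapPartitions is only a placeholder for launching the external tool:

  import org.apache.spark.SparkFiles

  // Ship non-jar dependencies (scripts, models, native libs) to every worker.
  sc.addFile("/opt/algos/process.rb")     // hypothetical ruby script
  sc.addFile("/opt/algos/libfeature.so")  // hypothetical native library

  val results = data.mapPartitions { rows =>
    // SparkFiles.get, called on the worker, resolves the local copy of a shipped file.
    val scriptPath = SparkFiles.get("process.rb")
    val libDir     = SparkFiles.getRootDirectory()  // directory holding all shipped files
    // ... start the external process once for this partition, pointing it at scriptPath / libDir ...
    rows  // placeholder: this sketch returns the partition unchanged
  }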
Re: spark and matlab
Thank you Matei. I found a solution using pipe and the matlab engine (an executable that can call matlab behind the scenes and uses stdin and stdout to communicate). I just need to fix two other issues : - how can I handle my dependencies ? My matlab script needs other matlab files that must be present on each worker's matlab path. So I need a way to push them to each worker and tell matlab where to find them with addpath. I know how to call addpath but I don't know what the path should be. - does the pipe() operator work at the partition level, so that the external process runs once per partition rather than once per element ? Initializing my external process costs a lot, so it is not good to call it several times. On Mon, Aug 25, 2014 at 9:03 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Have you tried the pipe() operator? It should work if you can launch your script from the command line. Just watch out for any environment variables needed (you can pass them to pipe() as an optional argument if there are some). On August 25, 2014 at 12:41:29 AM, Jaonary Rabarisoa (jaon...@gmail.com) wrote: Hi all, Is there someone that tried to pipe an RDD into a matlab script ? I'm trying to do something similar, if one of you could point me to some hints. Best regards, Jao
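On the two open points, a rough sketch of how this is usually wired up (the wrapper script name is hypothetical, and the exact location of addFile downloads can vary by deployment mode, so verify on your cluster): files shipped with sc.addFile are fetched into a single per-executor directory, and pipe() starts the external command once per partition, streaming every element of that partition to its stdin.

  // Ship every .m file the script needs; on the executors these are fetched into the
  // executor's working directory (what SparkFiles.getRootDirectory() returns there),
  // which is also the working directory the piped process inherits.
  Seq("helper1.m", "helper2.m", "main_algo.m").foreach(f => sc.addFile(s"/opt/matlab-scripts/$f"))

  // One matlab engine per partition, not per element: the wrapper reads records from
  // stdin and writes results to stdout, after doing addpath(pwd) for the shipped files.
  val results = data.pipe("./run_matlab_engine.sh")  // hypothetical wrapper executable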
spark and matlab
Hi all, Is there someone who has tried to pipe an RDD into a matlab script ? I'm trying to do something similar, if one of you could point me to some hints. Best regards, Jao
RDD pipe partitionwise
Dear all, Is there any example of mapPartitions that forks an external process, or of how to make RDD.pipe work on all the data of a partition at once ? Cheers, Jaonary
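A rough sketch of the mapPartitions variant: start the external program once per partition, write the partition's records to its stdin from a separate thread, and return its stdout lines as the partition's output. The executable name is hypothetical, and real code would also want to drain stderr and check the exit code:

  import java.io.PrintWriter
  import scala.io.Source

  val out = data.mapPartitions { records =>
    // One process per partition.
    val proc = new ProcessBuilder("./my_external_tool").start()

    // Feed the whole partition to the process's stdin in a separate thread,
    // so that reading stdout below cannot deadlock on a full pipe buffer.
    new Thread(new Runnable {
      def run(): Unit = {
        val writer = new PrintWriter(proc.getOutputStream)
        records.foreach(r => writer.println(r))
        writer.close()
      }
    }).start()

    // Stream the process's stdout back as the partition's output.
    Source.fromInputStream(proc.getInputStream).getLines()
  }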
Re: Ambiguous references to id : what does it mean ?
My query is just a simple query that uses the spark sql dsl : tagCollection.join(selectedVideos).where('videoId === 'id) On Tue, Jul 15, 2014 at 6:03 PM, Yin Huai huaiyin@gmail.com wrote: Hi Jao, Seems the SQL analyzer cannot resolve the references in the Join condition. What is your query? Did you use the Hive Parser (your query was submitted through hql(...)) or the basic SQL Parser (your query was submitted through sql(...)). Thanks, Yin On Tue, Jul 15, 2014 at 8:52 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, When running a join operation with Spark SQL I got the following error : Exception in thread main org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Ambiguous references to id: (id#303,List()),(id#0,List()), tree: Filter ('videoId = 'id) Join Inner, None ParquetRelation data/tags.parquet Filter (name#1 = P1/cam1) ParquetRelation data/videos.parquet What does it mean ? Cheers, jao
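The error usually means both sides of the join expose a column named id, so the 'id in the condition is ambiguous. One way around it with the same DSL is to rename the column on one side before joining; the as alias below is part of the catalyst DSL in the 1.0-era API, but treat this as an untested sketch:

  // Give the video id a distinct name before joining, so the condition is unambiguous.
  val videosRenamed = selectedVideos.select('id as 'vid, 'name)

  val joined = tagCollection
    .join(videosRenamed)
    .where('videoId === 'vid)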
Need advice to create an objectfile of set of images from Spark
Hi all, I need to run a spark job that needs a set of images as input. I need something that loads these images as an RDD, but I just don't know how to do that. Do some of you have any ideas ? Cheers, Jao
Re: Need advice to create an objectfile of set of images from Spark
The idea is to run a job that uses images as input, so that each worker will process a subset of the images On Wed, Jul 9, 2014 at 2:30 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: RDD can only keep objects. How do you plan to encode these images so that they can be loaded. Keeping the whole image as a single object in 1 rdd would perhaps not be super optimized. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Jul 9, 2014 at 12:17 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I need to run a spark job that needs a set of images as input. I need something that loads these images as an RDD, but I just don't know how to do that. Do some of you have any ideas ? Cheers, Jao
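If the image files live on a filesystem visible from every worker (NFS mount, or HDFS exposed locally), one simple sketch is to parallelize the list of paths, read the bytes inside a map, and save the result once as an object file for later jobs. Paths and partition counts below are hypothetical:

  import java.io.File
  import java.nio.file.{Files, Paths}

  // Paths enumerated on the driver; the workers must be able to read them too
  // (shared filesystem), otherwise the bytes have to be read on the driver first.
  val dir = new File("/shared/images")
  val paths = dir.listFiles.map(_.getAbsolutePath).toSeq

  // Each element becomes (fileName, rawBytes); tune the number of partitions to taste.
  val images = sc.parallelize(paths, numSlices = 16).map { p =>
    (new File(p).getName, Files.readAllBytes(Paths.get(p)))
  }

  images.saveAsObjectFile("/shared/images_objectfile")  // reload later with sc.objectFile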
Configure and run external process with RDD.pipe
Hi all, I need to run a complex external process with a lot of dependencies from spark. The pipe and addFile functions seem to be my friends, but there are just some issues that I need to solve. Precisely, the processes I want to run are C++ executables that may depend on some libraries and additional parameter files. I bundle everything in one tar file, so I may have the following structure : myalgo: -- run.exe -- libdepend_run.so -- parameter_file For example, my algo may be a support vector machine with its trained model file. Now I need a way to deploy my bundled algo on every node and pipe the executable on my RDD. My question is : is it possible to deploy my tar files and extract them on every worker so that I can invoke my executable ? Any ideas would be helpful, Cheers,
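One workable sketch: ship the tar with addFile and make the piped command a small shell one-liner that unpacks the bundle and then runs the executable. The layout mirrors the myalgo structure described above, but the paths are otherwise hypothetical, the extraction should really be guarded against concurrent tasks on the same executor, and the exact download location of addFile files can vary by deployment mode:

  // Ship the bundle once; on each executor it is fetched next to the running tasks.
  sc.addFile("/opt/bundles/myalgo.tar")

  // The piped command unpacks the bundle and runs the executable, with the bundled
  // .so made visible through LD_LIBRARY_PATH (assumes the tar contains the myalgo/ dir).
  val cmd = Seq("bash", "-c",
    "mkdir -p unpacked && tar -xf myalgo.tar -C unpacked && " +
    "LD_LIBRARY_PATH=unpacked/myalgo ./unpacked/myalgo/run.exe unpacked/myalgo/parameter_file")

  val output = data.pipe(cmd)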
Re: Using Spark as web app backend
Hi all, Thank you for the reply. Is there any example of spark running in client mode with spray ? I think, I will choose this approach. On Tue, Jun 24, 2014 at 4:55 PM, Koert Kuipers ko...@tresata.com wrote: run your spark app in client mode together with a spray rest service, that the front end can talk to On Tue, Jun 24, 2014 at 3:12 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, So far, I run my spark jobs with spark-shell or spark-submit command. I'd like to go further and I wonder how to use spark as a backend of a web application. Specificaly, I want a frontend application ( build with nodejs ) to communicate with spark on the backend, so that every query from the frontend is rooted to spark. And the result from Spark are sent back to the frontend. Does some of you already experiment this kind of architecture ? Cheers, Jaonary
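A very rough sketch of the client-mode + spray idea: the web service owns one long-lived SparkContext, and each HTTP request runs a job against cached data. spray-routing's SimpleRoutingApp is assumed here and the dataset and route are hypothetical; an akka-http or other HTTP layer would look much the same:

  import akka.actor.ActorSystem
  import org.apache.spark.{SparkConf, SparkContext}
  import spray.routing.SimpleRoutingApp

  object WebBackend extends App with SimpleRoutingApp {
    implicit val system = ActorSystem("spark-web-backend")

    // One long-lived SparkContext for the whole service (client mode).
    val sc = new SparkContext(new SparkConf().setAppName("web-backend"))
    val data = sc.textFile("/data/events").cache()   // hypothetical dataset

    startServer(interface = "0.0.0.0", port = 8080) {
      path("count" / Segment) { keyword =>           // e.g. GET /count/foo
        get {
          complete {
            // Each request runs a (blocking) Spark job against the cached data.
            data.filter(_.contains(keyword)).count().toString
          }
        }
      }
    }
  }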
Need help to make spark sql works in stand alone application
Hi all, I'm trying to use spark sql to store data in a parquet file. I create the file and insert data into it with the following code : *val conf = new SparkConf().setAppName("MCT").setMaster("local[2]") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext._ val personParquet = createParquetFile[Person]("people_1.pqt") personParquet.registerAsTable("people") val data = sc.parallelize(Seq(Person("Toto", 10), Person("foo", 101))) data.insertInto("people") personParquet.collect foreach(println) data.insertInto("people") val personParquet2 = parquetFile("people_1.pqt") personParquet2.collect foreach(println)* It works as I expect when I run it in spark-shell. But with a standalone application, I get a build error : *MCT.scala:18: not found: value createParquetFile* If I skip this creation step and save the rdd as a parquet file directly, it works. But then, when I insert new data, nothing happens. What am I doing wrong ? Best regards, Jaonary
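For what it is worth, the variant below avoids createParquetFile entirely and only uses saveAsParquetFile / parquetFile, which behaves the same in the shell and in a standalone app. A sketch against the 1.0/1.1-era API; note the case class is defined at the top level rather than inside main, which matters for standalone applications:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  case class Person(name: String, age: Int)

  object MCT {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("MCT").setMaster("local[2]"))
      val sqlContext = new SQLContext(sc)
      import sqlContext._   // createSchemaRDD implicit: RDD[Person] -> SchemaRDD

      val people = sc.parallelize(Seq(Person("Toto", 10), Person("foo", 101)))

      // Write once as parquet, then reload the file and register the reloaded table.
      people.saveAsParquetFile("people_1.pqt")
      val reloaded = parquetFile("people_1.pqt")
      reloaded.registerAsTable("people")

      sql("SELECT name, age FROM people").collect().foreach(println)
    }
  }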
wholeTextFiles like for binary files ?
Is there an equivalent of wholeTextFiles for binary files for example a set of images ? Cheers, Jaonary
Using Spark as web app backend
Hi all, So far, I run my spark jobs with spark-shell or spark-submit command. I'd like to go further and I wonder how to use spark as a backend of a web application. Specificaly, I want a frontend application ( build with nodejs ) to communicate with spark on the backend, so that every query from the frontend is rooted to spark. And the result from Spark are sent back to the frontend. Does some of you already experiment this kind of architecture ? Cheers, Jaonary
Hybrid GPU CPU computation
Hi all, I'm just wondering if hybrid GPU/CPU computation is something that is feasible with spark ? And what should be the best way to do it. Cheers, Jaonary
Re: Hybrid GPU CPU computation
In fact the idea is to run some part of the code on GPU, as Patrick described, and extend the RDD structure so that it can also be distributed on GPUs. The following article http://www.wired.com/2013/06/andrew_ng/ describes a hybrid CPU/GPU implementation (with MPI) that outperforms a 16,000-core cluster. On Fri, Apr 11, 2014 at 3:53 PM, Patrick Grinaway pgrina...@gmail.com wrote: I've actually done it using PySpark and python libraries which call cuda code, though I've never done it from scala directly. The only major challenge I've hit is assigning tasks to gpus on multiple gpu machines. Sent from my iPhone On Apr 11, 2014, at 8:38 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm just wondering if hybrid GPU/CPU computation is something that is feasible with spark ? And what should be the best way to do it. Cheers, Jaonary
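In practice, the Spark-level pattern is usually to keep the RDD machinery on the CPU side and hand each partition to the GPU in one batch through a native binding, so the device initialisation and host-to-device transfer are paid once per partition. A shape-only sketch with a hypothetical JNI/JavaCPP wrapper around a CUDA kernel:

  import org.apache.spark.rdd.RDD

  // Hypothetical native binding: processes a whole batch of points on the GPU.
  object GpuKernel {
    def process(batch: Array[Array[Double]]): Array[Array[Double]] =
      batch  // placeholder for the actual native call
  }

  def runOnGpu(features: RDD[Array[Double]]): RDD[Array[Double]] =
    features.mapPartitions { rows =>
      // Materialise the partition into one contiguous batch for the device,
      // so the kernel launch happens once per partition, not once per element.
      GpuKernel.process(rows.toArray).iterator
    }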
Re: Strange behavior of RDD.cartesian
You can find here a gist that illustrates this issue https://gist.github.com/jrabary/9953562 I got this with spark from master branch. On Sat, Mar 29, 2014 at 7:12 PM, Andrew Ash and...@andrewash.com wrote: Is this spark 0.9.0? Try setting spark.shuffle.spill=false There was a hash collision bug that's fixed in 0.9.1 that might cause you to have too few results in that join. Sent from my mobile phone On Mar 28, 2014 8:04 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Weird, how exactly are you pulling out the sample? Do you have a small program that reproduces this? Matei On Mar 28, 2014, at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: I forgot to mention that I don't really use all of my data. Instead I use a sample extracted with randomSample. On Fri, Mar 28, 2014 at 10:58 AM, Jaonary Rabarisoa jaon...@gmail.comwrote: Hi all, I notice that RDD.cartesian has a strange behavior with cached and uncached data. More precisely, I have a set of data that I load with objectFile *val data: RDD[(Int,String,Array[Double])] = sc.objectFile(data)* Then I split it in two set depending on some criteria *val part1 = data.filter(_._2 matches view1)* *val part2 = data.filter(_._2 matches view2)* Finally, I compute the cartesian product of part1 and part2 *val pair = part1.cartesian(part2)* If every thing goes well I should have *pair.count == part1.count * part2.count* But this is not the case if I don't cache part1 and part2. What I was missing ? Does caching data mandatory in Spark ? Cheers, Jaonary
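For readers hitting the same thing, one defensive sketch, independent of the root cause discussed in the thread: take the sample once with an explicit seed and pin both filtered subsets with cache(), so the same data is used by every downstream evaluation of the cartesian product.

  import org.apache.spark.rdd.RDD

  val data: RDD[(Int, String, Array[Double])] = sc.objectFile("data")

  // Sample once, with a fixed seed, and keep it in memory.
  val sampled = data.sample(withReplacement = false, fraction = 0.1, seed = 42L).cache()

  val part1 = sampled.filter(_._2 matches "view1").cache()
  val part2 = sampled.filter(_._2 matches "view2").cache()

  val pairs = part1.cartesian(part2)
  assert(pairs.count() == part1.count() * part2.count())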
Use combineByKey and StatCount
Hi all, Can someone give me some tips to compute the mean of an RDD by key, maybe with combineByKey and StatCounter ? Cheers, Jaonary
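A sketch of the combineByKey approach: accumulate (sum, count) per key and divide at the end. org.apache.spark.util.StatCounter could be used as the combined value in the same way if more statistics than the mean are needed; the toy data below is only for illustration.

  // Toy data: (key, value) pairs.
  val byKey = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 10.0)))

  val means = byKey
    .combineByKey(
      (v: Double) => (v, 1L),                                            // createCombiner
      (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1L),     // mergeValue
      (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners
    )
    .mapValues { case (sum, count) => sum / count }

  means.collect().foreach(println)  // (a,2.0), (b,10.0)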
Re: Strange behavior of RDD.cartesian
I forgot to mention that I don't really use all of my data. Instead I use a sample extracted with randomSample. On Fri, Mar 28, 2014 at 10:58 AM, Jaonary Rabarisoa jaon...@gmail.comwrote: Hi all, I notice that RDD.cartesian has a strange behavior with cached and uncached data. More precisely, I have a set of data that I load with objectFile *val data: RDD[(Int,String,Array[Double])] = sc.objectFile(data)* Then I split it in two set depending on some criteria *val part1 = data.filter(_._2 matches view1)* *val part2 = data.filter(_._2 matches view2)* Finally, I compute the cartesian product of part1 and part2 *val pair = part1.cartesian(part2)* If every thing goes well I should have *pair.count == part1.count * part2.count* But this is not the case if I don't cache part1 and part2. What I was missing ? Does caching data mandatory in Spark ? Cheers, Jaonary
Re: java.lang.ClassNotFoundException
it seems to be an old problem : http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3c7f6aa9e820f55d4a96946a87e086ef4a4bcdf...@eagh-erfpmbx41.erf.thomson.com%3E https://groups.google.com/forum/#!topic/spark-users/Q66UOeA2u-I Does anyone got the solution ? On Wed, Mar 26, 2014 at 5:50 PM, Yana Kadiyska yana.kadiy...@gmail.comwrote: I might be way off here but are you looking at the logs on the worker machines? I am running an older version (0.8) and when I look at the error log for the executor process I see the exact location where the executor process tries to load the jar from...with a line like this: 14/03/26 13:57:11 INFO executor.Executor: Adding file:/dirs/dirs/spark/work/app-20140326135710-0029/0/./spark-test.jar to class loader You said The jar file is present in each node, do you see any information on the executor indicating that it's trying to load the jar or where it's loading it from? I can't tell for sure by looking at your logs but they seem to be logs from the master and driver, not from the executor itself? On Wed, Mar 26, 2014 at 11:46 AM, Ognen Duzlevski og...@plainvanillagames.com wrote: Have you looked through the logs fully? I have seen this (in my limited experience) pop up as a result of previous exceptions/errors, also as a result of being unable to serialize objects etc. Ognen On 3/26/14, 10:39 AM, Jaonary Rabarisoa wrote: I notice that I get this error when I'm trying to load an objectFile with val viperReloaded = context.objectFile[ReIdDataSetEntry](data) On Wed, Mar 26, 2014 at 3:58 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Here the output that I get : [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 4 times (most recent failure: Exception failure in TID 6 on host 172.166.86.36: java.lang.ClassNotFoundException: value.models.ReIdDataSetEntry) org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 4 times (most recent failure: Exception failure in TID 6 on host 172.166.86.36: java.lang.ClassNotFoundException: value.models.ReIdDataSetEntry) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1011) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1009) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1009) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:596) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:596) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:596) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:146) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Spark says that the jar is added : 14/03/26 15:49:18 INFO SparkContext: Added JAR target/scala-2.10/value-spark_2.10-1.0.jar On Wed, Mar 26, 2014 at 3:34 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: Have you looked at the individual nodes logs? Can you post a bit more of the exception's output? On 3/26/14, 8:42 AM, Jaonary Rabarisoa wrote: Hi all, I got java.lang.ClassNotFoundException even with addJar called. The jar file is present in each node. I use the version of spark from github master. Any ideas ? Jaonary
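For later readers: the first thing to rule out with this kind of executor-side ClassNotFoundException is whether the application jar is actually on the executors' classpath, either via SparkConf.setJars or spark-submit --jars, rather than only via addJar at runtime. Whether that is enough for objectFile deserialisation depends on the classloader issue linked above, so treat this as a first step rather than a guaranteed fix; the jar path below is the one from the log line above.

  val conf = new org.apache.spark.SparkConf()
    .setAppName("value-spark")
    .setJars(Seq("target/scala-2.10/value-spark_2.10-1.0.jar"))  // same jar the log shows being added

  val sc = new org.apache.spark.SparkContext(conf)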
mapPartitions use case
Dear all, Sorry for asking such a basic question, but can someone explain when one should use mapPartitions instead of map ? Thanks, Jaonary
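A small illustration of the difference: map pays any per-element setup cost for every record, while mapPartitions lets expensive setup (a parser, a database connection, a model load) happen once per partition and be reused. The input path below is hypothetical.

  val lines = sc.textFile("/data/dates.txt")  // hypothetical input, one date per line

  // Per-element: a new formatter is built for every record.
  val slow = lines.map { line =>
    val parser = new java.text.SimpleDateFormat("yyyy-MM-dd")
    parser.parse(line).getTime
  }

  // Per-partition: one formatter per partition, reused for every record in it.
  val fast = lines.mapPartitions { iter =>
    val parser = new java.text.SimpleDateFormat("yyyy-MM-dd")
    iter.map(line => parser.parse(line).getTime)
  }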
Yet another question on saving RDD into files
Dear all, As a Spark newbie, I need some help to understand how RDD save-to-file behaves. After reading the post on saving single files efficiently http://apache-spark-user-list.1001560.n3.nabble.com/How-to-save-as-a-single-file-efficiently-td3014.html I understand that each partition of the RDD is saved into a separate file, isn't it ? And in order to get one single file, one should call coalesce(1, shuffle=true), right ? The other use case that I have is : appending an RDD to an existing file. Is it possible with spark ? Precisely, I have a map transformation whose results vary over time, like a big time series : I need to store the results for further analysis, but if I store the RDD in a different file each time I run the computation, I may end up with many little files. A pseudo code of my process is as follows : every timestamp do RDD[Array[Double]].map -> RDD[(timestamp,Double)].save to the same file What would be the best solution to that ? Best
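Since saveAsTextFile (like other HDFS-style outputs) cannot append to an existing file, one common sketch is to write each run into a timestamped subdirectory and rely on a glob when reading the series back; coalesce keeps the number of part-files per run small. Paths below are hypothetical.

  import org.apache.spark.rdd.RDD

  def saveRun(result: RDD[(Long, Double)], timestamp: Long): Unit = {
    // One directory per run; coalesce(1) keeps each run to a single part-file.
    result.coalesce(1).saveAsTextFile(s"/data/series/run-$timestamp")
  }

  // Later: read the whole series back as one RDD using a wildcard.
  val series = sc.textFile("/data/series/run-*")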
Does RDD.saveAsObjectFile appends or create a new file ?
Dear all, I need to run a series of transformations that map an RDD into another RDD. The computation changes over time and so does the resulting RDD. Each result is then saved to disk in order to do further analysis (for example, the variation of the result over time). The question is : if I save the RDDs in the same file, is it appended to the existing file or not ? And if I write into different files each time I want to save the result, I may end up with many little files, and I read everywhere that hadoop doesn't like many little files. Is spark OK with that ? Cheers, Jaonary
N-Fold validation and RDD partitions
Hi, I need to partition my data, represented as an RDD, into n folds, run metrics computation on each fold, and finally compute the mean of my metrics over all the folds. Can spark do this data partitioning out of the box, or do I need to implement it myself ? I know that RDD has a partitions method and mapPartitions, but I really don't understand the purpose and the meaning of a partition here. Cheers, Jaonary
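A sketch of one way to build the folds without touching RDD partitions at all: randomSplit (available in more recent Spark versions; MLlib also grew an MLUtils.kFold helper around the same time) gives k disjoint random subsets, and for fold i the test set is subset i while the training set is the union of the others. The data and evaluation function below are stand-ins.

  import org.apache.spark.rdd.RDD

  val data = sc.parallelize(1 to 1000)  // stand-in for the real dataset

  // Hypothetical metric: here just the test-set size, standing in for a real evaluation.
  def evaluate(train: RDD[Int], test: RDD[Int]): Double = test.count().toDouble

  val k = 5
  val folds = data.randomSplit(Array.fill(k)(1.0 / k), seed = 42L)

  val metrics = (0 until k).map { i =>
    val test  = folds(i)
    val train = sc.union(folds.zipWithIndex.collect { case (f, j) if j != i => f })
    evaluate(train, test)
  }

  val meanMetric = metrics.sum / k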
Re: Hadoop streaming like feature for Spark
Thank you Ewen. RDD.pipe is what I need and it works like a charm. On the other hand, RDD.mapPartitions seems interesting but I can't figure out how to make it work. Jaonary On Thu, Mar 20, 2014 at 4:54 PM, Ewen Cheslack-Postava m...@ewencp.org wrote: Take a look at RDD.pipe(). You could also accomplish the same thing using RDD.mapPartitions, which you pass a function that processes the iterator for each partition rather than processing each element individually. This lets you only start up as many processes as there are partitions, pipe the contents of each iterator to them, then collect the output. This might be useful if, e.g., your external process doesn't use line-oriented input/output. -Ewen Jaonary Rabarisoa jaon...@gmail.com March 20, 2014 at 1:04 AM Dear all, Does Spark have a kind of Hadoop streaming feature to run an external process that manipulates data from an RDD sent through stdin and stdout ? Best, Jaonary
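For the archives, the basic shape of the pipe() call: the external program is started once per partition, receives one record per line on stdin, and emits one line per result on stdout. The script name is hypothetical; ship it first with sc.addFile if it is not already present on every worker.

  val transformed = data.pipe("./my_streaming_tool.sh")  // RDD[String] of the tool's stdout lines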
How to distribute external executable (script) with Spark ?
Hi all, I'm trying to build an evaluation platform based on Spark. The idea is to run a black-box executable (built with C/C++ or some scripting language). This black box takes a set of data as input and outputs some metrics. Since I have a huge amount of data, I need to distribute the computation and use tools like mapreduce. The question is : how do I send these black-box executables to each node automatically so they can be called ? I need something similar to addJar but for any kind of file. Cheers,
Feed KMeans algorithm with a row major matrix
Dear All, I'm trying to cluster data coming from native library code with Spark KMeans||. In my native library the data are represented as a matrix (rows = number of data points, cols = dimension). For efficiency reasons, they are copied into a one-dimensional scala Array, row-major wise, so after the computation I have an RDD[Array[Double]], but each array holds a set of data points instead of a single point. I need to transform these arrays into Array[Array[Double]] before running the KMeans|| algorithm. How can I do this efficiently ? Best regards,
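If each packed array holds several points laid out row-major with a known dimension d, a flatMap with grouped(d) turns it into one record per point. A sketch against the MLlib 1.0+ API, where each point additionally has to be wrapped in a Vector (the dimension and cluster count in the commented usage line are illustrative):

  import org.apache.spark.rdd.RDD
  import org.apache.spark.mllib.clustering.KMeans
  import org.apache.spark.mllib.linalg.{Vector, Vectors}

  // Split each packed row-major array into chunks of length d: one chunk = one point.
  def toPoints(packed: RDD[Array[Double]], d: Int): RDD[Vector] =
    packed.flatMap(_.grouped(d).map(chunk => Vectors.dense(chunk)))

  // e.g. with 128-dimensional points, 10 clusters, 20 iterations:
  // val model = KMeans.train(toPoints(packed, 128), 10, 20)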
Reading sequencefile
Hi all, I'm trying to read a sequenceFile that represents a set of jpeg images generated using this tool : http://stuartsierra.com/2008/04/24/a-million-little-files . According to the documentation : "Each key is the name of a file (a Hadoop “Text”), the value is the binary contents of the file (a BytesWritable)". How do I load the generated file inside spark ? Cheers, Jaonary
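A sketch of reading such a file back: the keys are Hadoop Text and the values BytesWritable, and since BytesWritable's backing array can be padded (and the objects are reused by Hadoop), only the first getLength bytes should be copied out. The path is hypothetical.

  import org.apache.hadoop.io.{BytesWritable, Text}

  val images = sc.sequenceFile("/data/images.seq", classOf[Text], classOf[BytesWritable])
    .map { case (name, bytes) =>
      // Copy exactly getLength bytes; getBytes alone may include trailing padding.
      (name.toString, java.util.Arrays.copyOfRange(bytes.getBytes, 0, bytes.getLength))
    }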