In my transformSchema I do specify that the output column type is a VectorUDT:

    override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
      val map = this.paramMap ++ paramMap
      checkInputColumn(schema, map(inputCol), ArrayType(FloatType, false))
      addOutputColumn(schema, map(outputCol), new VectorUDT)
    }

The output of printSchema is as follows:

    |-- cnnFeature: vecto (nullable = false)

On Tue, Mar 31, 2015 at 9:55 AM, Shivaram Venkataraman <shiva...@eecs.berkeley.edu> wrote:

My guess is that the `createDataFrame` call is failing here. Can you check whether the schema being passed to it includes the column name and type for the newly zipped `features` column?

Joseph probably knows this better, but AFAIK the DenseVector here will need to be marked as a VectorUDT while creating a DataFrame column.

Thanks
Shivaram
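For reference, a minimal sketch of the point above: an explicit schema that marks the vector column as a VectorUDT before calling createDataFrame. The column names are hypothetical, and it assumes `sc`/`sqlContext` handles and that VectorUDT is visible in your build, as in the transformSchema above.

    import org.apache.spark.mllib.linalg.{Vectors, VectorUDT}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // The schema must carry the UDT explicitly; otherwise createDataFrame
    // cannot know that the second column holds MLlib vectors.
    val schema = StructType(Seq(
      StructField("id", StringType, nullable = false),
      StructField("cnnFeature", new VectorUDT, nullable = false)))

    val rowRDD = sc.parallelize(Seq(Row("img-0", Vectors.dense(0.1, 0.2, 0.3))))
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.printSchema() // cnnFeature should now be reported with the vector type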
On Tue, Mar 31, 2015 at 12:50 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

Following your suggestion, I ended up with the following implementation:

    override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
      val schema = transformSchema(dataSet.schema, paramMap, logging = true)
      val map = this.paramMap ++ paramMap

      val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
        Caffe.set_mode(Caffe.CPU)
        val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
        val inputBlobs: FloatBlobVector = net.input_blobs()
        val N: Int = 1
        val K: Int = inputBlobs.get(0).channels()
        val H: Int = inputBlobs.get(0).height()
        val W: Int = inputBlobs.get(0).width()
        inputBlobs.get(0).Reshape(N, K, H, W)
        val dataBlob = new FloatPointer(N * K * W * H)
        val inputCPUData = inputBlobs.get(0).mutable_cpu_data()

        val feat = rows.map { case Row(a: Iterable[Float]) =>
          dataBlob.put(a.toArray, 0, a.size)
          caffe_copy_float(N * K * W * H, dataBlob, inputCPUData)
          val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
          val resultDim = resultBlobs.get(0).channels()
          logInfo(s"Output dimension $resultDim")
          val resultBlobData = resultBlobs.get(0).cpu_data()
          val output = new Array[Float](resultDim)
          resultBlobData.get(output)
          Vectors.dense(output.map(_.toDouble))
        }
        //net.deallocate()
        feat
      }

      val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
        val oldSeq = old.toSeq
        Row.fromSeq(oldSeq :+ feat)
      }
      dataSet.sqlContext.createDataFrame(newRowData, schema)
    }

The idea is to mapPartitions over the underlying RDD of the DataFrame and create a new DataFrame by zipping the results back in. It seems to work, but when I try to save the RDD I get the following error:

    org.apache.spark.mllib.linalg.DenseVector cannot be cast to org.apache.spark.sql.Row

On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman <shiva...@eecs.berkeley.edu> wrote:

One workaround could be to convert the DataFrame into an RDD inside the transform function, use mapPartitions/broadcast to work with the JNI calls, and then convert the result back to a DataFrame.

Thanks
Shivaram
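Distilled to a skeleton, that workaround looks roughly like the sketch below. It is not the implementation above: `loadNativeModel` and `predict` are hypothetical stand-ins for the JNI calls, and the point is only that the model is created once per partition on the worker rather than serialized from the driver.

    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.ml.param.ParamMap

    override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
      val schema = transformSchema(dataSet.schema, paramMap, logging = true)
      // Drop down to the RDD so the expensive native model can be built
      // once per partition instead of once per row.
      val newRows = dataSet.rdd.mapPartitions { rows =>
        val model = loadNativeModel() // hypothetical JNI factory, runs on the worker
        rows.map(row => Row.fromSeq(row.toSeq :+ model.predict(row)))
      }
      // Convert back to a DataFrame with the schema from transformSchema.
      dataSet.sqlContext.createDataFrame(newRows, schema)
    }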
On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

Dear all,

I'm still struggling to make a pre-trained Caffe model transformer for DataFrames work. The main problem is that creating a Caffe model inside the UDF is very slow and consumes a lot of memory.

Some of you suggested broadcasting the model. The problem with broadcasting is that I use a JNI interface to the Caffe C++ library through javacpp-presets, and it is not serializable.

Another possible approach is to use a UDF that can handle a whole partition instead of just a row, in order to minimize the number of Caffe model instantiations.

Are there any ideas for solving either of these two issues?

Best,

Jao

On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley <jos...@databricks.com> wrote:

I see. I think your best bet is to create the cnnModel on the master and then serialize it to send to the workers. If it's big (1M or so), then you can broadcast it and use the broadcast variable in the UDF. There is not a great way to do something equivalent to mapPartitions with UDFs right now.
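When the model object is serializable, the broadcast approach described above might look roughly like this sketch. `SerializableCNNModel`, `loadModelOnDriver`, and the column names are hypothetical placeholders; as noted in the thread, the javacpp-presets JNI wrapper is not serializable, so this pattern only applies to models that can be shipped as bytes.

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.sql.functions.{col, udf}

    // Build the (serializable) model once on the driver and broadcast it.
    val model: SerializableCNNModel = loadModelOnDriver() // hypothetical
    val bcModel = dataSet.sqlContext.sparkContext.broadcast(model)

    // The UDF closes over the broadcast handle, so each executor fetches and
    // deserializes the model once instead of loading it for every row.
    val deepCNNFeature = udf((v: Vector) => bcModel.value.transform(v))
    val result = dataSet.withColumn("cnnFeature", deepCNNFeature(col("inputFeature")))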
On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

Here is my current implementation with the current master version of Spark:

    class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

      override def transformSchema(...) { ... }

      override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
        transformSchema(dataSet.schema, paramMap, logging = true)
        val map = this.paramMap ++ paramMap
        val deepCNNFeature = udf((v: Vector) => {
          val cnnModel = new CaffeModel
          cnnModel.transform(v)
        } : Vector)
        dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
      }
    }

where CaffeModel is a Java API to the Caffe C++ model.

The problem here is that for every row it will create a new instance of CaffeModel, which is inefficient since creating a new model means loading a large model file. It also transforms only a single row at a time, whereas a Caffe network can process a batch of rows efficiently. In other words, is it possible to create a UDF that operates on a whole partition, in order to minimize the number of CaffeModel creations and take advantage of the Caffe network's batch processing?

On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <jos...@databricks.com> wrote:

I see, thanks for clarifying!

I'd recommend following existing implementations in spark.ml transformers. You'll need to define a UDF which operates on a single Row to compute the value for the new column. You can then use the DataFrame DSL to create the new column; the DSL provides a nice syntax for what would otherwise be a SQL statement like "select ... from ...". I'm recommending looking at the existing implementations (rather than stating them here) because they change between Spark 1.2 and 1.3. In 1.3, the DSL is much improved and makes it easier to create a new column.

Joseph

On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

    class DeepCNNFeature extends Transformer ... {

      override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
        // How can I do a map partition on the underlying RDD and then add the column ?
      }
    }

On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

Hi Joseph,

Thank you for the tips. I understand what I should do when my data are represented as an RDD. The thing that I can't figure out is how to do the same thing when the data are viewed as a DataFrame and I need to add the result of my pre-trained model as a new column in the DataFrame. Precisely, I want to implement the following transformer:

    class DeepCNNFeature extends Transformer ... {

    }

On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <jos...@databricks.com> wrote:

Hi Jao,

You can use external tools and libraries if they can be called from your Spark program or script (with appropriate conversion of data types, etc.). The best way to apply a pre-trained model to a dataset would be to call the model from within a closure, e.g.:

    myRDD.map { myDatum => preTrainedModel.predict(myDatum) }

If your data is distributed in an RDD (myRDD), then the above call will distribute the computation of prediction using the pre-trained model. It will require that all of your Spark workers be able to run the preTrainedModel; that may mean installing Caffe and dependencies on all nodes in the compute cluster.

For the second question, I would modify the above call as follows:

    myRDD.mapPartitions { myDataOnPartition =>
      val myModel = // instantiate neural network on this partition
      myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
    }

I hope this helps!
Joseph
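Since a Caffe network can score a whole batch in one forward pass, the mapPartitions pattern above can also group rows into fixed-size batches before each native call. A rough sketch, where `loadNativeModel`, `predictBatch`, and the batch size are all hypothetical:

    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.rdd.RDD

    def extractFeatures(data: RDD[Array[Float]], batchSize: Int = 32): RDD[Vector] =
      data.mapPartitions { rows =>
        val model = loadNativeModel() // hypothetical: one JNI instantiation per partition
        rows.grouped(batchSize).flatMap { batch =>
          // predictBatch is a hypothetical JNI call that runs one forward
          // pass over the whole batch and returns one feature array per input.
          model.predictBatch(batch.toArray).map(o => Vectors.dense(o.map(_.toDouble)))
        }
      }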
On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

Dear all,

We mainly do large-scale computer vision tasks (image classification, retrieval, ...). The pipeline API is really great stuff for that. We're trying to reproduce the tutorial given on that topic during the latest Spark Summit (http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html) using the master version of the Spark pipeline and DataFrame APIs. The tutorial shows different examples of feature extraction stages before running machine learning algorithms. Even though the tutorial is straightforward to reproduce with this new API, we still have some questions:

- Can one use external tools (e.g. via pipe) as a pipeline stage? An example use case is extracting features learned with a convolutional neural network. In our case, this corresponds to a pre-trained neural network built with the Caffe library (http://caffe.berkeleyvision.org/).

- The second question is about the performance of the pipeline. Libraries such as Caffe process data in batches, and instantiating a Caffe network can be time-consuming when the network is very deep. So we can gain performance if we minimize the number of Caffe network creations and feed data to the network in batches. In the pipeline, this corresponds to running transformers that work on a per-partition basis and give the whole partition to a single Caffe network. How can we create such a transformer?

Best,

Jao
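On the first question, one way to call an external tool from a stage is RDD.pipe, which streams each element of an RDD through a shell command line by line. A minimal sketch; the script name and input path are hypothetical:

    import org.apache.spark.rdd.RDD

    // Each line of the input RDD is written to the external process's stdin,
    // and each line the process prints to stdout becomes an output element.
    val imagePaths: RDD[String] = sc.textFile("hdfs:///images/list.txt") // hypothetical path
    val features: RDD[String] = imagePaths.pipe("./extract_features.sh") // hypothetical script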