My guess is that the `createDataFrame` call is failing here. Can you check whether the schema passed to it includes the column name and type for the newly zipped `features` column?
Joseph probably knows this better, but AFAIK the DenseVector here will need
to be marked as a VectorUDT while creating a DataFrame column.

Thanks
Shivaram

On Tue, Mar 31, 2015 at 12:50 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

> Following your suggestion, I ended up with the following implementation:
>
> override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>   val schema = transformSchema(dataSet.schema, paramMap, logging = true)
>   val map = this.paramMap ++ paramMap
>
>   val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
>     Caffe.set_mode(Caffe.CPU)
>     val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
>     val inputBlobs: FloatBlobVector = net.input_blobs()
>     val N: Int = 1
>     val K: Int = inputBlobs.get(0).channels()
>     val H: Int = inputBlobs.get(0).height()
>     val W: Int = inputBlobs.get(0).width()
>     inputBlobs.get(0).Reshape(N, K, H, W)
>     val dataBlob = new FloatPointer(N*K*W*H)
>     val inputCPUData = inputBlobs.get(0).mutable_cpu_data()
>
>     val feat = rows.map { case Row(a: Iterable[Float]) =>
>       dataBlob.put(a.toArray, 0, a.size)
>       caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
>       val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
>
>       val resultDim = resultBlobs.get(0).channels()
>       logInfo(s"Output dimension $resultDim")
>       val resultBlobData = resultBlobs.get(0).cpu_data()
>       val output = new Array[Float](resultDim)
>       resultBlobData.get(output)
>       Vectors.dense(output.map(_.toDouble))
>     }
>     //net.deallocate()
>     feat
>   }
>
>   val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
>     val oldSeq = old.toSeq
>     Row.fromSeq(oldSeq :+ feat)
>   }
>
>   dataSet.sqlContext.createDataFrame(newRowData, schema)
> }
>
> The idea is to call mapPartitions on the underlying RDD of the DataFrame
> and create a new DataFrame by zipping the results.
> It seems to work, but when I try to save the RDD I get the following error:
>
> org.apache.spark.mllib.linalg.DenseVector cannot be cast to org.apache.spark.sql.Row
>
> On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman <shiva...@eecs.berkeley.edu> wrote:
>
>> One workaround could be to convert the DataFrame into an RDD inside the
>> transform function, use mapPartitions/broadcast to work with the JNI
>> calls, and then convert back to a DataFrame.
>>
>> Thanks
>> Shivaram
>>
>> On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
>>
>>> Dear all,
>>>
>>> I'm still struggling to make a pre-trained Caffe model transformer for
>>> DataFrames work. The main problem is that creating a Caffe model inside
>>> the UDF is very slow and consumes a lot of memory.
>>>
>>> Some of you suggested broadcasting the model. The problem with
>>> broadcasting is that I use a JNI interface to Caffe C++ with
>>> javacpp-preset, and it is not serializable.
>>>
>>> Another possible approach is to use a UDF that can handle a whole
>>> partition instead of just a row, in order to minimize the number of
>>> Caffe model instantiations.
>>>
>>> Are there any ideas for solving either of these two issues?
>>>
>>> Best,
>>>
>>> Jao
>>>
>>> On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley <jos...@databricks.com> wrote:
>>>
>>>> I see. I think your best bet is to create the cnnModel on the master
>>>> and then serialize it to send to the workers. If it's big (1M or so),
>>>> then you can broadcast it and use the broadcast variable in the UDF.
>>>> There is not a great way to do something equivalent to mapPartitions
>>>> with UDFs right now.
>>>>
>>>> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
>>>>
>>>>> Here is my current implementation with the current master version of Spark:
>>>>>
>>>>> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>>>>>
>>>>>   override def transformSchema(...) { ...
>>>>>   }
>>>>>
>>>>>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>>>>
>>>>>     transformSchema(dataSet.schema, paramMap, logging = true)
>>>>>
>>>>>     val map = this.paramMap ++ paramMap
>>>>>     val deepCNNFeature = udf((v: Vector) => {
>>>>>       val cnnModel = new CaffeModel
>>>>>       cnnModel.transform(v)
>>>>>     } : Vector )
>>>>>     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
>>>>>   }
>>>>> }
>>>>>
>>>>> where CaffeModel is a Java API to the Caffe C++ model.
>>>>>
>>>>> The problem here is that it creates a new instance of CaffeModel for
>>>>> every row, which is inefficient since creating a new model means
>>>>> loading a large model file. It also transforms only a single row at a
>>>>> time, whereas a Caffe network can process a batch of rows efficiently.
>>>>> In other words, is it possible to create a UDF that operates on a
>>>>> partition, in order to minimize the creation of CaffeModel instances
>>>>> and to take advantage of the Caffe network's batch processing?
>>>>>
>>>>> On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <jos...@databricks.com> wrote:
>>>>>
>>>>>> I see, thanks for clarifying!
>>>>>>
>>>>>> I'd recommend following existing implementations in spark.ml
>>>>>> transformers. You'll need to define a UDF which operates on a single
>>>>>> Row to compute the value for the new column. You can then use the
>>>>>> DataFrame DSL to create the new column; the DSL provides a nice
>>>>>> syntax for what would otherwise be a SQL statement like
>>>>>> "select ... from ...". I'm recommending looking at the existing
>>>>>> implementation (rather than stating it here) because it changes
>>>>>> between Spark 1.2 and 1.3. In 1.3, the DSL is much improved and
>>>>>> makes it easier to create a new column.
>>>>>>
>>>>>> Joseph
>>>>>>
>>>>>> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
>>>>>>
>>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>>
>>>>>>>   override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
>>>>>>>
>>>>>>>     // How can I do a map partition on the underlying RDD and then add the column ?
>>>>>>>
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Joseph,
>>>>>>>>
>>>>>>>> Thank you for the tips. I understand what I should do when my data
>>>>>>>> are represented as an RDD. What I can't figure out is how to do
>>>>>>>> the same thing when the data is viewed as a DataFrame and I need
>>>>>>>> to add the result of my pre-trained model as a new column in the
>>>>>>>> DataFrame. Precisely, I want to implement the following transformer:
>>>>>>>>
>>>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <jos...@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Jao,
>>>>>>>>>
>>>>>>>>> You can use external tools and libraries if they can be called
>>>>>>>>> from your Spark program or script (with appropriate conversion of
>>>>>>>>> data types, etc.). The best way to apply a pre-trained model to a
>>>>>>>>> dataset would be to call the model from within a closure, e.g.:
>>>>>>>>>
>>>>>>>>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>>>>>>>>
>>>>>>>>> If your data is distributed in an RDD (myRDD), then the above call
>>>>>>>>> will distribute the computation of prediction using the
>>>>>>>>> pre-trained model. It will require that all of your Spark workers
>>>>>>>>> be able to run the preTrainedModel; that may mean installing Caffe
>>>>>>>>> and its dependencies on all nodes in the compute cluster.
>>>>>>>>>
>>>>>>>>> For the second question, I would modify the above call as follows:
>>>>>>>>>
>>>>>>>>> myRDD.mapPartitions { myDataOnPartition =>
>>>>>>>>>   val myModel = // instantiate neural network on this partition
>>>>>>>>>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> I hope this helps!
>>>>>>>>> Joseph
>>>>>>>>>
>>>>>>>>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Dear all,
>>>>>>>>>>
>>>>>>>>>> We mainly do large-scale computer vision tasks (image
>>>>>>>>>> classification, retrieval, ...). The pipeline is really great
>>>>>>>>>> stuff for that. We're trying to reproduce the tutorial given on
>>>>>>>>>> that topic during the latest Spark Summit (
>>>>>>>>>> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>>>>>>>>>> ) using the master version of the Spark pipeline and DataFrame
>>>>>>>>>> APIs. The tutorial shows different examples of feature extraction
>>>>>>>>>> stages before running machine learning algorithms. Even though
>>>>>>>>>> the tutorial is straightforward to reproduce with this new API,
>>>>>>>>>> we still have some questions:
>>>>>>>>>>
>>>>>>>>>> - Can one use external tools (e.g. via pipe) as a pipeline stage?
>>>>>>>>>>   An example use case is extracting features learned with a
>>>>>>>>>>   convolutional neural network. In our case, this corresponds to
>>>>>>>>>>   a pre-trained neural network with the Caffe library (
>>>>>>>>>>   http://caffe.berkeleyvision.org/).
>>>>>>>>>>
>>>>>>>>>> - The second question is about the performance of the pipeline.
>>>>>>>>>>   Libraries such as Caffe process data in batches, and
>>>>>>>>>>   instantiating one Caffe network can be time consuming when this
>>>>>>>>>>   network is very deep.
>>>>>>>>>>   So, we can gain performance if we minimize the number of Caffe
>>>>>>>>>>   network creations and give data in batches to the network. In
>>>>>>>>>>   the pipeline, this corresponds to running transformers that
>>>>>>>>>>   work on a per-partition basis and give the whole partition to a
>>>>>>>>>>   single Caffe network. How can we create such a transformer?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> Jao
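
For illustration only, here is a minimal plain-Scala sketch of the per-partition pattern discussed in this thread. It is not the code from any of the messages and does not use Spark or Caffe: `ExpensiveModel`, `PerPartitionSketch` and `transformPartition` are hypothetical names, and ordinary Scala iterators stand in for what `mapPartitions` hands to its closure. The sketch builds the model once per partition and returns each row with its old columns plus the new feature appended.

```scala
// Hypothetical stand-in for a costly native model (e.g. a Caffe network).
// The counter records how many times the model is constructed.
object ExpensiveModel {
  var instances = 0
}

class ExpensiveModel {
  ExpensiveModel.instances += 1
  // Placeholder "feature extraction": doubles every input value.
  def predict(input: Seq[Float]): Seq[Double] = input.map(_.toDouble * 2)
}

object PerPartitionSketch {
  // Same shape as the function passed to mapPartitions:
  // one iterator of rows in, one iterator of rows out.
  def transformPartition(rows: Iterator[Seq[Any]], inputIdx: Int): Iterator[Seq[Any]] = {
    val model = new ExpensiveModel // built once per partition, not once per row
    rows.map { row =>
      val input = row(inputIdx).asInstanceOf[Seq[Float]]
      row :+ model.predict(input) // keep the old columns, append the feature
    }
  }

  def main(args: Array[String]): Unit = {
    // Two "partitions" holding three rows in total (assumed toy data).
    val partitions: Seq[Seq[Seq[Any]]] = Seq(
      Seq(Seq("a", Seq(1f, 2f)), Seq("b", Seq(3f))),
      Seq(Seq("c", Seq(4f, 5f)))
    )
    val out = partitions.map(p => transformPartition(p.iterator, 1).toList)
    out.flatten.foreach(println)
    // Expected count: 2 (one per partition), not 3 (one per row).
    println(s"models created: ${ExpensiveModel.instances}")
  }
}
```

In Spark, a function of this shape could presumably be passed to `mapPartitions` on the DataFrame's underlying RDD. Emitting the old columns and the new feature in a single pass would also avoid relying on `RDD.zip`, which assumes both RDDs have the same number of partitions and the same number of elements in each partition. The schema handling for the resulting vector column is a separate question and is not covered by this sketch.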