My guess is that the `createDataFrame` call is failing here.  Can you check
whether the schema passed to it includes the column name and type for the
newly zipped `features` column?

Joseph probably knows this better, but AFAIK the DenseVector here will need
to be marked as a VectorUDT when creating the DataFrame column.
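
For illustration, here is a rough sketch of what I mean, not a definitive
fix. `appendVectorColumn` is a hypothetical helper name, and the sketch assumes
a Spark version where `VectorUDT` can be instantiated from user code (I believe
it is `private[spark]` in some releases, in which case the helper would have to
live under an `org.apache.spark` package):

```scala
import org.apache.spark.mllib.linalg.VectorUDT
import org.apache.spark.sql.types.{StructField, StructType}

// Hypothetical helper (not part of Spark): returns a copy of `schema` with one
// extra column whose declared type is the MLlib vector UDT.
def appendVectorColumn(schema: StructType, outputColName: String): StructType = {
  // Fail early if the output column would clash with an existing one.
  require(!schema.fieldNames.contains(outputColName),
    s"Column $outputColName already exists.")
  // Assumption: VectorUDT is accessible here; see the note above.
  StructType(schema.fields :+ StructField(outputColName, new VectorUDT, nullable = false))
}
```

`transformSchema` could then return `appendVectorColumn(schema, map(outputCol))`,
so that the schema handed to `createDataFrame` declares the zipped `features`
column with the vector type.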

Thanks
Shivaram

On Tue, Mar 31, 2015 at 12:50 AM, Jaonary Rabarisoa <jaon...@gmail.com>
wrote:

> Following your suggestion, I ended up with the following implementation:
>
> override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>   val schema = transformSchema(dataSet.schema, paramMap, logging = true)
>   val map = this.paramMap ++ paramMap
>
>   val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
>     Caffe.set_mode(Caffe.CPU)
>     val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
>     val inputBlobs: FloatBlobVector = net.input_blobs()
>     val N: Int = 1
>     val K: Int = inputBlobs.get(0).channels()
>     val H: Int = inputBlobs.get(0).height()
>     val W: Int = inputBlobs.get(0).width()
>     inputBlobs.get(0).Reshape(N, K, H, W)
>     val dataBlob = new FloatPointer(N*K*W*H)
>     val inputCPUData = inputBlobs.get(0).mutable_cpu_data()
>
>     val feat = rows.map { case Row(a: Iterable[Float]) =>
>       dataBlob.put(a.toArray, 0, a.size)
>       caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
>       val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
>
>       val resultDim = resultBlobs.get(0).channels()
>       logInfo(s"Output dimension $resultDim")
>       val resultBlobData = resultBlobs.get(0).cpu_data()
>       val output = new Array[Float](resultDim)
>       resultBlobData.get(output)
>       Vectors.dense(output.map(_.toDouble))
>     }
>     //net.deallocate()
>     feat
>   }
>
>   val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
>     val oldSeq = old.toSeq
>     Row.fromSeq(oldSeq :+ feat)
>   }
>   dataSet.sqlContext.createDataFrame(newRowData, schema)
> }
>
>
> The idea is to call mapPartitions on the underlying RDD of the DataFrame and
> create a new DataFrame by zipping the results. It seems to work, but when I
> try to save the RDD I get the following error:
>
> org.apache.spark.mllib.linalg.DenseVector cannot be cast to
> org.apache.spark.sql.Row
>
>
> On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman <
> shiva...@eecs.berkeley.edu> wrote:
>
>> One workaround could be to convert the DataFrame into an RDD inside the
>> transform function, use mapPartitions/broadcast to work with the
>> JNI calls, and then convert the result back to a DataFrame.
>>
>> Thanks
>> Shivaram
>>
>> On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa <jaon...@gmail.com>
>> wrote:
>>
>>> Dear all,
>>>
>>> I'm still struggling to get a pre-trained Caffe model transformer for
>>> DataFrames to work. The main problem is that creating a Caffe model inside
>>> the UDF is very slow and consumes a lot of memory.
>>>
>>> Some of you suggested broadcasting the model. The problem with
>>> broadcasting is that I use a JNI interface to Caffe C++ through
>>> javacpp-preset, and it is not serializable.
>>>
>>> Another possible approach is to use a UDF that can handle a whole
>>> partition instead of just one row, in order to minimize Caffe model
>>> instantiation.
>>>
>>> Are there any ideas for solving either of these two issues?
>>>
>>>
>>>
>>> Best,
>>>
>>> Jao
>>>
>>> On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley <jos...@databricks.com>
>>> wrote:
>>>
>>>> I see.  I think your best bet is to create the cnnModel on the master
>>>> and then serialize it to send to the workers.  If it's big (1M or so), then
>>>> you can broadcast it and use the broadcast variable in the UDF.  There is
>>>> not a great way to do something equivalent to mapPartitions with UDFs right
>>>> now.
>>>>
>>>> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa <jaon...@gmail.com>
>>>> wrote:
>>>>
>>>>> Here is my current implementation, using the current master version of Spark:
>>>>>
>>>>> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>>>>>
>>>>>   override def transformSchema(...) { ... }
>>>>>
>>>>>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>>>>     transformSchema(dataSet.schema, paramMap, logging = true)
>>>>>     val map = this.paramMap ++ paramMap
>>>>>     val deepCNNFeature = udf((v: Vector) => {
>>>>>       val cnnModel = new CaffeModel
>>>>>       cnnModel.transform(v)
>>>>>     } : Vector)
>>>>>     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
>>>>>   }
>>>>> }
>>>>>
>>>>> where CaffeModel is a Java API to the Caffe C++ model.
>>>>>
>>>>> The problem here is that a new instance of CaffeModel is created for
>>>>> every row, which is inefficient since creating a new model means
>>>>> loading a large model file. It also transforms only a single row at a
>>>>> time, whereas a Caffe network can process a batch of rows efficiently.
>>>>> In other words, is it possible to create a UDF that operates on a
>>>>> partition, in order to minimize the creation of CaffeModel instances and
>>>>> to take advantage of the Caffe network's batch processing?
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <jos...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> I see, thanks for clarifying!
>>>>>>
>>>>>> I'd recommend following existing implementations in spark.ml
>>>>>> transformers.  You'll need to define a UDF which operates on a single Row
>>>>>> to compute the value for the new column.  You can then use the DataFrame
>>>>>> DSL to create the new column; the DSL provides a nice syntax for what
>>>>>> would otherwise be a SQL statement like "select ... from ...".  I'm
>>>>>> recommending looking at the existing implementation (rather than stating
>>>>>> it here) because it changes between Spark 1.2 and 1.3.  In 1.3, the DSL
>>>>>> is much improved and makes it easier to create a new column.
>>>>>>
>>>>>> Joseph
>>>>>>
>>>>>> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <jaon...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>>
>>>>>>>   override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
>>>>>>>     // How can I do a map partition on the underlying RDD and then add the column?
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <
>>>>>>> jaon...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Joseph,
>>>>>>>>
>>>>>>>> Thank you for the tips. I understand what I should do when my data
>>>>>>>> is represented as an RDD. What I can't figure out is how to do
>>>>>>>> the same thing when the data is viewed as a DataFrame and I need to add
>>>>>>>> the result of my pre-trained model as a new column in the DataFrame.
>>>>>>>> Precisely, I want to implement the following transformer:
>>>>>>>>
>>>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <
>>>>>>>> jos...@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Jao,
>>>>>>>>>
>>>>>>>>> You can use external tools and libraries if they can be called
>>>>>>>>> from your Spark program or script (with appropriate conversion of data
>>>>>>>>> types, etc.).  The best way to apply a pre-trained model to a dataset
>>>>>>>>> would be to call the model from within a closure, e.g.:
>>>>>>>>>
>>>>>>>>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>>>>>>>>
>>>>>>>>> If your data is distributed in an RDD (myRDD), then the above call
>>>>>>>>> will distribute the computation of prediction using the pre-trained
>>>>>>>>> model.  It will require that all of your Spark workers be able to run
>>>>>>>>> the preTrainedModel; that may mean installing Caffe and dependencies
>>>>>>>>> on all nodes in the compute cluster.
>>>>>>>>>
>>>>>>>>> For the second question, I would modify the above call as follows:
>>>>>>>>>
>>>>>>>>> myRDD.mapPartitions { myDataOnPartition =>
>>>>>>>>>   val myModel = // instantiate neural network on this partition
>>>>>>>>>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> I hope this helps!
>>>>>>>>> Joseph
>>>>>>>>>
>>>>>>>>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <
>>>>>>>>> jaon...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Dear all,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> We mainly work on large-scale computer vision tasks (image
>>>>>>>>>> classification, retrieval, ...). The pipeline is really great for
>>>>>>>>>> that. We're trying to reproduce the tutorial given on that topic
>>>>>>>>>> during the latest Spark Summit (
>>>>>>>>>> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>>>>>>>>>> ) using the master version of the Spark pipeline and DataFrame APIs.
>>>>>>>>>> The tutorial shows different examples of feature extraction stages
>>>>>>>>>> before running machine learning algorithms. Even though the tutorial
>>>>>>>>>> is straightforward to reproduce with this new API, we still have some
>>>>>>>>>> questions:
>>>>>>>>>>
>>>>>>>>>>    - Can one use external tools (e.g. via pipe) as a pipeline
>>>>>>>>>>    stage? An example use case is extracting features learned with a
>>>>>>>>>>    convolutional neural network. In our case, this corresponds to a
>>>>>>>>>>    pre-trained neural network built with the Caffe library (
>>>>>>>>>>    http://caffe.berkeleyvision.org/).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>    - The second question is about the performance of the
>>>>>>>>>>    pipeline. Libraries such as Caffe process data in batches, and
>>>>>>>>>>    instantiating one Caffe network can be time-consuming when the
>>>>>>>>>>    network is very deep. So we can gain performance if we minimize the
>>>>>>>>>>    number of Caffe networks created and feed data to the network in
>>>>>>>>>>    batches. In the pipeline, this corresponds to running transformers
>>>>>>>>>>    that work on a per-partition basis and give the whole partition to
>>>>>>>>>>    a single Caffe network. How can we create such a transformer?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> Jao
>>>>>>>>>>
