In my transformSchema I do specify that the output column type is a VectorUDT:

override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
  val map = this.paramMap ++ paramMap
  checkInputColumn(schema, map(inputCol), ArrayType(FloatType, false))
  addOutputColumn(schema, map(outputCol), new VectorUDT)
}


The output of printSchema is as follows:

|-- cnnFeature: vecto (nullable = false)



On Tue, Mar 31, 2015 at 9:55 AM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

> My guess is that the `createDataFrame` call is failing here.  Can you
> check if the schema being passed to it includes the column name and type
> for the newly zipped `features`?
>
> Joseph probably knows this better, but AFAIK the DenseVector here will
> need to be marked as a VectorUDT while creating a DataFrame column
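>
> Something along these lines (a rough sketch only; `oldSchema` stands for the
> schema of the original DataFrame, and VectorUDT may be private[spark]
> depending on the Spark version you build against):
>
>   import org.apache.spark.mllib.linalg.VectorUDT
>   import org.apache.spark.sql.types.{StructField, StructType}
>
>   // append the name and type of the new column to the existing schema
>   val newSchema = StructType(oldSchema.fields :+
>     StructField("features", new VectorUDT, nullable = false))
>   // pass newSchema to createDataFrame together with the zipped rows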
>
> Thanks
> Shivaram
>
> On Tue, Mar 31, 2015 at 12:50 AM, Jaonary Rabarisoa <jaon...@gmail.com>
> wrote:
>
>> Following your suggestion, I ended up with the following implementation:
>>
>> override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>   val schema = transformSchema(dataSet.schema, paramMap, logging = true)
>>   val map = this.paramMap ++ paramMap
>>
>>   val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
>>     Caffe.set_mode(Caffe.CPU)
>>     val net = CaffeUtils.floatTestNetwork(
>>       SparkFiles.get(topology), SparkFiles.get(weight))
>>     val inputBlobs: FloatBlobVector = net.input_blobs()
>>     val N: Int = 1
>>     val K: Int = inputBlobs.get(0).channels()
>>     val H: Int = inputBlobs.get(0).height()
>>     val W: Int = inputBlobs.get(0).width()
>>     inputBlobs.get(0).Reshape(N, K, H, W)
>>     val dataBlob = new FloatPointer(N*K*W*H)
>>     val inputCPUData = inputBlobs.get(0).mutable_cpu_data()
>>
>>     val feat = rows.map { case Row(a: Iterable[Float]) =>
>>       dataBlob.put(a.toArray, 0, a.size)
>>       caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
>>       val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
>>       val resultDim = resultBlobs.get(0).channels()
>>       logInfo(s"Output dimension $resultDim")
>>       val resultBlobData = resultBlobs.get(0).cpu_data()
>>       val output = new Array[Float](resultDim)
>>       resultBlobData.get(output)
>>       Vectors.dense(output.map(_.toDouble))
>>     }
>>     //net.deallocate()
>>     feat
>>   }
>>
>>   val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
>>     val oldSeq = old.toSeq
>>     Row.fromSeq(oldSeq :+ feat)
>>   }
>>   dataSet.sqlContext.createDataFrame(newRowData, schema)
>> }
>>
>>
>> The idea is to mapPartitions over the underlying RDD of the DataFrame and
>> create a new DataFrame by zipping the results. It seems to work, but when I
>> try to save the RDD I get the following error:
>>
>> org.apache.spark.mllib.linalg.DenseVector cannot be cast to
>> org.apache.spark.sql.Row
>>
>>
>> On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman <
>> shiva...@eecs.berkeley.edu> wrote:
>>
>>> One workaround could be to convert the DataFrame into an RDD inside the
>>> transform function, use mapPartitions/broadcast to work with the JNI
>>> calls, and then convert back to a DataFrame.
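>>>
>>> Roughly (just a sketch of the shape; `createNet` and `forward` are
>>> placeholders for your JNI calls, set up once per partition):
>>>
>>>   val features = dataSet.select(map(inputCol)).rdd.mapPartitions { rows =>
>>>     val net = createNet()              // placeholder: instantiate the JNI network once per partition
>>>     rows.map(row => net.forward(row))  // placeholder: run the network on each row
>>>   }
>>>   // then zip `features` with dataSet.rdd and rebuild a DataFrame via
>>>   // createDataFrame, using the schema from transformSchema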
>>>
>>> Thanks
>>> Shivaram
>>>
>>> On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa <jaon...@gmail.com>
>>> wrote:
>>>
>>>> Dear all,
>>>>
>>>> I'm still struggling to make a pre-trained Caffe model transformer for
>>>> DataFrames work. The main problem is that creating a Caffe model inside the
>>>> UDF is very slow and consumes a lot of memory.
>>>>
>>>> Some of you suggested broadcasting the model. The problem with
>>>> broadcasting is that I use a JNI interface to the Caffe C++ library with
>>>> javacpp-presets, and it is not serializable.
>>>>
>>>> Another possible approach is to use a UDF that can handle a whole
>>>> partition instead of just a row, in order to minimize Caffe model
>>>> instantiation.
>>>>
>>>> Are there any ideas for solving either of these two issues?
>>>>
>>>>
>>>>
>>>> Best,
>>>>
>>>> Jao
>>>>
>>>> On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley <jos...@databricks.com>
>>>> wrote:
>>>>
>>>>> I see.  I think your best bet is to create the cnnModel on the master
>>>>> and then serialize it to send to the workers.  If it's big (1M or so),
>>>>> then you can broadcast it and use the broadcast variable in the UDF.
>>>>> There is not a great way to do something equivalent to mapPartitions
>>>>> with UDFs right now.
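>>>>>
>>>>> Roughly (a sketch only, assuming cnnModel can be serialized and reusing
>>>>> the cnnModel.transform(v) call from your snippet):
>>>>>
>>>>>   // broadcast the model once, then reference it inside the UDF
>>>>>   val modelBc = dataSet.sqlContext.sparkContext.broadcast(cnnModel)
>>>>>   val deepCNNFeature = udf((v: Vector) => modelBc.value.transform(v) : Vector)
>>>>>   dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))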
>>>>>
>>>>> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa <jaon...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Here is my current implementation with the current master version of
>>>>>> Spark:
>>>>>>
>>>>>> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>>>>>>
>>>>>>   override def transformSchema(...) { ... }
>>>>>>
>>>>>>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>>>>>     transformSchema(dataSet.schema, paramMap, logging = true)
>>>>>>     val map = this.paramMap ++ paramMap
>>>>>>     val deepCNNFeature = udf((v: Vector) => {
>>>>>>       val cnnModel = new CaffeModel
>>>>>>       cnnModel.transform(v)
>>>>>>     } : Vector)
>>>>>>     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> where CaffeModel is a Java API to the Caffe C++ model.
>>>>>>
>>>>>> The problem here is that for every row it will create a new instance
>>>>>> of CaffeModel, which is inefficient since creating a new model means
>>>>>> loading a large model file, and it will transform only a single row at
>>>>>> a time, whereas a Caffe network can process a batch of rows efficiently.
>>>>>> In other words, is it possible to create a UDF that operates on a
>>>>>> partition, in order to minimize the creation of CaffeModel instances
>>>>>> and to take advantage of the Caffe network's batch processing?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <jos...@databricks.com
>>>>>> > wrote:
>>>>>>
>>>>>>> I see, thanks for clarifying!
>>>>>>>
>>>>>>> I'd recommend following existing implementations in spark.ml
>>>>>>> transformers.  You'll need to define a UDF which operates on a single
>>>>>>> Row to compute the value for the new column.  You can then use the
>>>>>>> DataFrame DSL to create the new column; the DSL provides a nice syntax
>>>>>>> for what would otherwise be a SQL statement like "select ... from ...".
>>>>>>> I'm recommending looking at the existing implementation (rather than
>>>>>>> stating it here) because it changes between Spark 1.2 and 1.3.  In 1.3,
>>>>>>> the DSL is much improved and makes it easier to create a new column.
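>>>>>>>
>>>>>>> In 1.3 the general shape is roughly the following (an illustrative
>>>>>>> sketch only; computeFeature, df, and the column names are placeholders):
>>>>>>>
>>>>>>>   import org.apache.spark.sql.functions.{col, udf}
>>>>>>>
>>>>>>>   // wrap the per-row computation in a UDF and add it as a new column
>>>>>>>   val extractFeature = udf((v: Vector) => computeFeature(v))
>>>>>>>   df.withColumn("cnnFeature", extractFeature(col("image")))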
>>>>>>>
>>>>>>> Joseph
>>>>>>>
>>>>>>> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <jaon...@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>>>
>>>>>>>>   override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
>>>>>>>>     // How can I do a map partition on the underlying RDD and then add the column?
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <
>>>>>>>> jaon...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Joseph,
>>>>>>>>>
>>>>>>>>> Thank you for the tips. I understand what I should do when my data
>>>>>>>>> are represented as an RDD. The thing that I can't figure out is how
>>>>>>>>> to do the same thing when the data are viewed as a DataFrame and I
>>>>>>>>> need to add the result of my pre-trained model as a new column in the
>>>>>>>>> DataFrame. Precisely, I want to implement the following transformer:
>>>>>>>>>
>>>>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>>>>
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <
>>>>>>>>> jos...@databricks.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jao,
>>>>>>>>>>
>>>>>>>>>> You can use external tools and libraries if they can be called
>>>>>>>>>> from your Spark program or script (with appropriate conversion of 
>>>>>>>>>> data
>>>>>>>>>> types, etc.).  The best way to apply a pre-trained model to a 
>>>>>>>>>> dataset would
>>>>>>>>>> be to call the model from within a closure, e.g.:
>>>>>>>>>>
>>>>>>>>>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>>>>>>>>>
>>>>>>>>>> If your data is distributed in an RDD (myRDD), then the above
>>>>>>>>>> call will distribute the computation of prediction using the 
>>>>>>>>>> pre-trained
>>>>>>>>>> model.  It will require that all of your Spark workers be able to 
>>>>>>>>>> run the
>>>>>>>>>> preTrainedModel; that may mean installing Caffe and dependencies on 
>>>>>>>>>> all
>>>>>>>>>> nodes in the compute cluster.
>>>>>>>>>>
>>>>>>>>>> For the second question, I would modify the above call as follows:
>>>>>>>>>>
>>>>>>>>>> myRDD.mapPartitions { myDataOnPartition =>
>>>>>>>>>>   val myModel = // instantiate neural network on this partition
>>>>>>>>>>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> I hope this helps!
>>>>>>>>>> Joseph
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <
>>>>>>>>>> jaon...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Dear all,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> We mainly do large-scale computer vision tasks (image
>>>>>>>>>>> classification, retrieval, ...). The pipeline API is a really great
>>>>>>>>>>> fit for that. We're trying to reproduce the tutorial given on that
>>>>>>>>>>> topic during the latest Spark Summit (
>>>>>>>>>>> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>>>>>>>>>>>  )
>>>>>>>>>>> using the master version of the Spark pipeline and DataFrame APIs.
>>>>>>>>>>> The tutorial shows different examples of feature extraction stages
>>>>>>>>>>> before running machine learning algorithms. Even though the tutorial
>>>>>>>>>>> is straightforward to reproduce with this new API, we still have
>>>>>>>>>>> some questions:
>>>>>>>>>>>
>>>>>>>>>>>    - Can one use external tools (e.g. via RDD.pipe; see the sketch
>>>>>>>>>>>    after this list) as a pipeline stage? An example use case is
>>>>>>>>>>>    extracting features learned with a convolutional neural network.
>>>>>>>>>>>    In our case, this corresponds to a pre-trained neural network
>>>>>>>>>>>    built with the Caffe library (http://caffe.berkeleyvision.org/).
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>    - The second question is about the performance of the pipeline.
>>>>>>>>>>>    A library such as Caffe processes the data in batches, and
>>>>>>>>>>>    instantiating one Caffe network can be time consuming when the
>>>>>>>>>>>    network is very deep. So, we can gain performance if we minimize
>>>>>>>>>>>    the number of Caffe network creations and feed data to the
>>>>>>>>>>>    network in batches. In the pipeline, this corresponds to running
>>>>>>>>>>>    transformers that work on a partition basis and give the whole
>>>>>>>>>>>    partition to a single Caffe network. How can we create such a
>>>>>>>>>>>    transformer?
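>>>>>>>>>>>
>>>>>>>>>>> As a sketch of what we mean by the pipe option above (RDD.pipe
>>>>>>>>>>> streams each element through an external program via stdin/stdout;
>>>>>>>>>>> the RDD and the binary name here are hypothetical):
>>>>>>>>>>>
>>>>>>>>>>>   // imagePaths: RDD[String] with one image path per element
>>>>>>>>>>>   val rawFeatures = imagePaths.pipe("./extract_cnn_features")
>>>>>>>>>>>   // rawFeatures: RDD[String], one output line per line written by the tool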
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>>
>>>>>>>>>>> Jao
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
