One workaround could be to convert a DataFrame into a RDD inside the
transform function and then use mapPartitions/broadcast to work with the
JNI calls and then convert back to RDD.

Thanks
Shivaram

On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa <jaon...@gmail.com>
wrote:

> Dear all,
>
> I'm still struggling to make a pre-trained caffe model transformer for
> dataframe works. The main problem is that creating a caffe model inside the
> UDF is very slow and consumes memories.
>
> Some of you suggest to broadcast the model. The problem with broadcasting
> is that I use a JNI interface to caffe C++ with javacpp-preset  and it is
> not serializable.
>
> Another possible approach is to use a UDF that can handle a whole
> partitions instead of just a row in order to minimize the caffe model
> instantiation.
>
> Is there any ideas to solve one of these two issues ?
>
>
>
> Best,
>
> Jao
>
> On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley <jos...@databricks.com>
> wrote:
>
>> I see.  I think your best bet is to create the cnnModel on the master and
>> then serialize it to send to the workers.  If it's big (1M or so), then you
>> can broadcast it and use the broadcast variable in the UDF.  There is not a
>> great way to do something equivalent to mapPartitions with UDFs right now.
>>
>> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa <jaon...@gmail.com>
>> wrote:
>>
>>> Here is my current implementation with current master version of spark
>>>
>>>
>>>
>>>
>>> *class DeepCNNFeature extends Transformer with HasInputCol with
>>> HasOutputCol ... {   override def transformSchema(...) { ... }*
>>> *    override def transform(dataSet: DataFrame, paramMap: ParamMap):
>>> DataFrame = {*
>>>
>>> *                  transformSchema(dataSet.schema, paramMap, logging = 
>>> true)*
>>>
>>>
>>>
>>> *                  val map = this.paramMap ++ paramMap                  val 
>>> deepCNNFeature = udf((v: Vector)=> {*
>>>
>>> *                              val cnnModel = new CaffeModel *
>>>
>>> *                              cnnModel.transform(v)*
>>>
>>>
>>>
>>>
>>> *                  } : Vector )                  
>>> dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))*
>>>
>>>
>>> *     }*
>>> *}*
>>>
>>> where CaffeModel is a java api to Caffe C++ model.
>>>
>>> The problem here is that for every row it will create a new instance of
>>> CaffeModel which is inefficient since creating a new model
>>> means loading a large model file. And it will transform only a single
>>> row at a time. Or a Caffe network can process a batch of rows efficiently.
>>> In other words, is it possible to create an UDF that can operatats on a
>>> partition in order to minimize the creation of a CaffeModel and
>>> to take advantage of the Caffe network batch processing ?
>>>
>>>
>>>
>>> On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <jos...@databricks.com>
>>> wrote:
>>>
>>>> I see, thanks for clarifying!
>>>>
>>>> I'd recommend following existing implementations in spark.ml
>>>> transformers.  You'll need to define a UDF which operates on a single Row
>>>> to compute the value for the new column.  You can then use the DataFrame
>>>> DSL to create the new column; the DSL provides a nice syntax for what would
>>>> otherwise be a SQL statement like "select ... from ...".  I'm recommending
>>>> looking at the existing implementation (rather than stating it here)
>>>> because it changes between Spark 1.2 and 1.3.  In 1.3, the DSL is much
>>>> improved and makes it easier to create a new column.
>>>>
>>>> Joseph
>>>>
>>>> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <jaon...@gmail.com>
>>>> wrote:
>>>>
>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>
>>>>>     override def transform(data: DataFrame, paramMap: ParamMap):
>>>>> DataFrame = {
>>>>>
>>>>>
>>>>>                  // How can I do a map partition on the underlying RDD
>>>>> and then add the column ?
>>>>>
>>>>>      }
>>>>> }
>>>>>
>>>>> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <jaon...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Joseph,
>>>>>>
>>>>>> Thank your for the tips. I understand what should I do when my data
>>>>>> are represented as a RDD. The thing that I can't figure out is how to do
>>>>>> the same thing when the data is view as a DataFrame and I need to add the
>>>>>> result of my pretrained model as a new column in the DataFrame. 
>>>>>> Preciselly,
>>>>>> I want to implement the following transformer :
>>>>>>
>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>
>>>>>> }
>>>>>>
>>>>>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <jos...@databricks.com
>>>>>> > wrote:
>>>>>>
>>>>>>> Hi Jao,
>>>>>>>
>>>>>>> You can use external tools and libraries if they can be called from
>>>>>>> your Spark program or script (with appropriate conversion of data types,
>>>>>>> etc.).  The best way to apply a pre-trained model to a dataset would be 
>>>>>>> to
>>>>>>> call the model from within a closure, e.g.:
>>>>>>>
>>>>>>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>>>>>>
>>>>>>> If your data is distributed in an RDD (myRDD), then the above call
>>>>>>> will distribute the computation of prediction using the pre-trained 
>>>>>>> model.
>>>>>>> It will require that all of your Spark workers be able to run the
>>>>>>> preTrainedModel; that may mean installing Caffe and dependencies on all
>>>>>>> nodes in the compute cluster.
>>>>>>>
>>>>>>> For the second question, I would modify the above call as follows:
>>>>>>>
>>>>>>> myRDD.mapPartitions { myDataOnPartition =>
>>>>>>>   val myModel = // instantiate neural network on this partition
>>>>>>>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>>>>>>> }
>>>>>>>
>>>>>>> I hope this helps!
>>>>>>> Joseph
>>>>>>>
>>>>>>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <
>>>>>>> jaon...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Dear all,
>>>>>>>>
>>>>>>>>
>>>>>>>> We mainly do large scale computer vision task (image
>>>>>>>> classification, retrieval, ...). The pipeline is really great stuff for
>>>>>>>> that. We're trying to reproduce the tutorial given on that topic 
>>>>>>>> during the
>>>>>>>> latest spark summit (
>>>>>>>> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>>>>>>>>  )
>>>>>>>> using the master version of spark pipeline and dataframe. The tutorial
>>>>>>>> shows different examples of feature extraction stages before running
>>>>>>>> machine learning algorithms. Even the tutorial is straightforward to
>>>>>>>> reproduce with this new API, we still have some questions :
>>>>>>>>
>>>>>>>>    - Can one use external tools (e.g via pipe) as a pipeline stage
>>>>>>>>    ? An example of use case is to extract feature learned with 
>>>>>>>> convolutional
>>>>>>>>    neural network. In our case, this corresponds to a pre-trained 
>>>>>>>> neural
>>>>>>>>    network with Caffe library (http://caffe.berkeleyvision.org/) .
>>>>>>>>
>>>>>>>>
>>>>>>>>    - The second question is about the performance of the pipeline.
>>>>>>>>    Library such as Caffe processes the data in batch and instancing 
>>>>>>>> one Caffe
>>>>>>>>    network can be time consuming when this network is very deep. So, 
>>>>>>>> we can
>>>>>>>>    gain performance if we minimize the number of Caffe network 
>>>>>>>> creation and
>>>>>>>>    give data in batch to the network. In the pipeline, this 
>>>>>>>> corresponds to run
>>>>>>>>    transformers that work on a partition basis and give the whole 
>>>>>>>> partition to
>>>>>>>>    a single caffe network. How can we create such a transformer ?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Jao
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to