One workaround could be to convert the DataFrame into an RDD inside the transform function, use mapPartitions/broadcast to work with the JNI calls, and then convert back to a DataFrame.
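A rough sketch of that pattern inside transform -- under the assumption that the JNI wrapper can be re-created on each worker from a plain model file path (the modelPath parameter and the CaffeModel(path) constructor below are assumptions for illustration, not the actual javacpp-presets API):

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructField, StructType}

    override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
      transformSchema(dataSet.schema, paramMap, logging = true)
      val map = this.paramMap ++ paramMap
      val sc = dataSet.sqlContext.sparkContext

      // Broadcast only the (serializable) path, never the JNI-backed model itself.
      val pathBc = sc.broadcast(modelPath)
      val inIdx = dataSet.schema.fieldNames.indexOf(map(inputCol))

      val rows = dataSet.rdd.mapPartitions { it =>
        val cnnModel = new CaffeModel(pathBc.value)   // one JNI instance per partition
        it.map { r =>
          Row.fromSeq(r.toSeq :+ cnnModel.transform(r.getAs[Vector](inIdx)))
        }
      }

      // The new column has the same Vector type as the input column.
      val inField = dataSet.schema(map(inputCol))
      val schema = StructType(dataSet.schema.fields :+
        StructField(map(outputCol), inField.dataType, inField.nullable))
      dataSet.sqlContext.createDataFrame(rows, schema)
    }

Going through the RDD like this means only the path crosses the wire, and it also leaves room for batching rows within each partition.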
Thanks,
Shivaram

On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

Dear all,

I'm still struggling to make a pre-trained Caffe model transformer for DataFrames work. The main problem is that creating a Caffe model inside the UDF is very slow and consumes a lot of memory.

Some of you suggested broadcasting the model. The problem with broadcasting is that I use a JNI interface to the Caffe C++ library (via javacpp-presets) and it is not serializable.

Another possible approach is a UDF that can handle a whole partition instead of just a row, in order to minimize the number of Caffe model instantiations.

Are there any ideas for solving either of these two issues?

Best,

Jao

On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley <jos...@databricks.com> wrote:

I see. I think your best bet is to create the cnnModel on the master and then serialize it to send to the workers. If it's big (1M or so), then you can broadcast it and use the broadcast variable in the UDF. There is not a great way to do something equivalent to mapPartitions with UDFs right now.
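For reference, a minimal sketch of that broadcast-plus-UDF suggestion -- it assumes a model object that is Java-serializable (which, as noted above, the JNI-backed CaffeModel is not), and the loader function and column names here are made up:

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.sql.functions.{col, udf}

    val model = loadSerializableCnnModel(modelPath)   // hypothetical: built once on the driver
    val modelBc = dataSet.sqlContext.sparkContext.broadcast(model)

    val deepCNNFeature = udf { (v: Vector) => modelBc.value.transform(v) }
    val result = dataSet.withColumn("cnnFeature", deepCNNFeature(col("image")))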
On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

Here is my current implementation with the current master version of Spark:

class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

  override def transformSchema(...) { ... }

  override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
    transformSchema(dataSet.schema, paramMap, logging = true)
    val map = this.paramMap ++ paramMap
    val deepCNNFeature = udf((v: Vector) => {
      val cnnModel = new CaffeModel
      cnnModel.transform(v)
    } : Vector)
    dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
  }
}

where CaffeModel is a Java API to the Caffe C++ model.

The problem here is that for every row it will create a new instance of CaffeModel, which is inefficient since creating a new model means loading a large model file, and it will transform only a single row at a time, whereas a Caffe network can process a batch of rows efficiently. In other words, is it possible to create a UDF that operates on a partition, in order to minimize the creation of CaffeModel instances and to take advantage of the Caffe network's batch processing?

On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <jos...@databricks.com> wrote:

I see, thanks for clarifying!

I'd recommend following existing implementations in spark.ml transformers. You'll need to define a UDF which operates on a single Row to compute the value for the new column. You can then use the DataFrame DSL to create the new column; the DSL provides a nice syntax for what would otherwise be a SQL statement like "select ... from ...". I'm recommending looking at the existing implementation (rather than stating it here) because it changes between Spark 1.2 and 1.3. In 1.3, the DSL is much improved and makes it easier to create a new column.

Joseph

On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

class DeepCNNFeature extends Transformer ... {

  override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
    // How can I do a mapPartitions on the underlying RDD and then add the column?
  }
}

On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

Hi Joseph,

Thank you for the tips. I understand what I should do when my data are represented as an RDD. The thing that I can't figure out is how to do the same thing when the data are viewed as a DataFrame and I need to add the result of my pre-trained model as a new column in the DataFrame. Precisely, I want to implement the following transformer:

class DeepCNNFeature extends Transformer ... {

}

On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <jos...@databricks.com> wrote:

Hi Jao,

You can use external tools and libraries if they can be called from your Spark program or script (with appropriate conversion of data types, etc.). The best way to apply a pre-trained model to a dataset would be to call the model from within a closure, e.g.:

myRDD.map { myDatum => preTrainedModel.predict(myDatum) }

If your data is distributed in an RDD (myRDD), then the above call will distribute the computation of prediction using the pre-trained model. It will require that all of your Spark workers be able to run the preTrainedModel; that may mean installing Caffe and dependencies on all nodes in the compute cluster.

For the second question, I would modify the above call as follows:

myRDD.mapPartitions { myDataOnPartition =>
  val myModel = // instantiate neural network on this partition
  myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
}

I hope this helps!
Joseph
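The same per-partition idea can also exploit Caffe's batch processing by grouping the partition iterator. A sketch, where the batch size, the CaffeModel(modelPath) constructor, and the predictBatch method are assumptions about the wrapper rather than an existing API:

    val batchSize = 64   // assumption: tune to what the network/GPU can hold
    val predictions = myRDD.mapPartitions { myDataOnPartition =>
      val myModel = new CaffeModel(modelPath)   // one instantiation per partition; modelPath is a plain String captured in the closure
      myDataOnPartition
        .grouped(batchSize)                     // Iterator[Seq[Datum]]
        .flatMap { batch => myModel.predictBatch(batch) }
    }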
On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

Dear all,

We mainly do large-scale computer vision tasks (image classification, retrieval, ...). The pipeline API is really great stuff for that. We're trying to reproduce the tutorial given on that topic during the latest Spark Summit (http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html) using the master version of the Spark pipeline and DataFrame APIs. The tutorial shows different examples of feature extraction stages before running machine learning algorithms. Even though the tutorial is straightforward to reproduce with this new API, we still have some questions:

- Can one use external tools (e.g. via pipe) as a pipeline stage? An example use case is extracting features learned with a convolutional neural network. In our case, this corresponds to a pre-trained neural network built with the Caffe library (http://caffe.berkeleyvision.org/). (A minimal pipe sketch follows after this message.)

- The second question is about the performance of the pipeline. A library such as Caffe processes data in batches, and instantiating one Caffe network can be time consuming when the network is very deep. So we can gain performance if we minimize the number of Caffe network creations and feed data to the network in batches. In the pipeline, this corresponds to running transformers that work on a partition basis and give the whole partition to a single Caffe network. How can we create such a transformer?

Best,

Jao
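On the first question above: RDD.pipe can stream each partition's records, one per line, through an external program. A minimal sketch, where the extract_features binary and the input path are hypothetical (the binary is assumed to read one image path per line on stdin and print one whitespace-separated feature vector per line on stdout):

    import org.apache.spark.mllib.linalg.Vectors

    val imagePaths = sc.textFile("hdfs:///data/image_paths.txt")           // hypothetical input
    val rawFeatures = imagePaths.pipe("/usr/local/bin/extract_features")   // hypothetical binary
    val features = rawFeatures.map { line =>
      Vectors.dense(line.trim.split("\\s+").map(_.toDouble))
    }

Since pipe works at the RDD level, using it inside a pipeline stage would go through the same DataFrame-to-RDD-and-back route discussed earlier in the thread.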