Dear all,

I'm still struggling to make a pre-trained Caffe model transformer for DataFrames work. The main problem is that creating a Caffe model inside the UDF is very slow and consumes a lot of memory.

Some of you suggested broadcasting the model. The problem with broadcasting is that I use a JNI interface to the Caffe C++ library (via javacpp-presets), and it is not serializable. Another possible approach is a UDF that can handle a whole partition instead of just a row, in order to minimize the number of Caffe model instantiations. Are there any ideas for solving either of these two issues?

Best,

Jao
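A common workaround for the serialization issue is sketched below: keep only a serializable model path and hold the JNI-backed model in a singleton object, so that it is loaded lazily, at most once per executor JVM, instead of being shipped over the wire. The `CaffeModelHolder` object, the path, and the path-taking `CaffeModel` constructor are assumptions for illustration; the model file would have to be readable on every worker.

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.sql.functions.udf

    // Hypothetical holder: a Scala object is initialized once per JVM, i.e.
    // once per executor, so the expensive JNI model load happens at most once
    // there. Only the closure referencing the holder is serialized, never the
    // model itself.
    object CaffeModelHolder {
      val modelPath = "/path/to/model.caffemodel"           // assumed path, visible on all workers
      lazy val model: CaffeModel = new CaffeModel(modelPath) // assumed path-taking constructor
    }

    val deepCNNFeature = udf { (v: Vector) =>
      CaffeModelHolder.model.transform(v): Vector
    }

This removes the per-row model creation even inside an ordinary per-row UDF; the batching question is a separate matter, addressed further down the thread.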
On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley <jos...@databricks.com> wrote:

I see. I think your best bet is to create the cnnModel on the master and then serialize it to send to the workers. If it's big (1M or so), then you can broadcast it and use the broadcast variable in the UDF. There is not a great way to do something equivalent to mapPartitions with UDFs right now.

On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

Here is my current implementation with the current master version of Spark:

    class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

      override def transformSchema(...) { ... }

      override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
        transformSchema(dataSet.schema, paramMap, logging = true)
        val map = this.paramMap ++ paramMap
        val deepCNNFeature = udf((v: Vector) => {
          val cnnModel = new CaffeModel
          cnnModel.transform(v)
        }: Vector)
        dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
      }
    }

where CaffeModel is a Java API to the Caffe C++ model.

The problem here is that it will create a new instance of CaffeModel for every row, which is inefficient since creating a new model means loading a large model file. It will also transform only a single row at a time, whereas a Caffe network can process a batch of rows efficiently. In other words, is it possible to create a UDF that operates on a partition, in order to minimize the creation of CaffeModel instances and to take advantage of the Caffe network's batch processing?
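One way to get that behavior with the current API, sketched below, is to bypass the UDF entirely and implement the transform with mapPartitions on the DataFrame's underlying RDD, rebuilding the DataFrame with the extra column afterwards. This is only a sketch: `transformByPartition` is a made-up helper, it assumes the input column holds mllib Vectors, and it reuses that column's data type for the output column.

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.types.{StructField, StructType}

    def transformByPartition(dataSet: DataFrame,
                             inputCol: String,
                             outputCol: String): DataFrame = {
      val inputIndex = dataSet.schema.fieldNames.indexOf(inputCol)
      val newRows = dataSet.rdd.mapPartitions { rows =>
        val cnnModel = new CaffeModel          // one model load per partition
        rows.map { row =>
          val feature: Vector = cnnModel.transform(row.getAs[Vector](inputIndex))
          Row.fromSeq(row.toSeq :+ feature)    // original columns plus the new one
        }
      }
      // Reuse the input column's type (its vector UDT) for the new column.
      val newSchema = StructType(dataSet.schema.fields :+
        StructField(outputCol, dataSet.schema(inputCol).dataType, nullable = false))
      dataSet.sqlContext.createDataFrame(newRows, newSchema)
    }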
On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <jos...@databricks.com> wrote:

I see, thanks for clarifying!

I'd recommend following the existing implementations of spark.ml transformers. You'll need to define a UDF which operates on a single Row to compute the value for the new column. You can then use the DataFrame DSL to create the new column; the DSL provides a nice syntax for what would otherwise be a SQL statement like "select ... from ...". I'm recommending looking at the existing implementations (rather than stating it here) because this changes between Spark 1.2 and 1.3. In 1.3, the DSL is much improved and makes it easier to create a new column.

Joseph

On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

    class DeepCNNFeature extends Transformer ... {

      override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
        // How can I do a mapPartitions on the underlying RDD
        // and then add the column?
      }
    }

On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

Hi Joseph,

Thank you for the tips. I understand what I should do when my data are represented as an RDD. The thing that I can't figure out is how to do the same thing when the data are viewed as a DataFrame and I need to add the result of my pre-trained model as a new column in the DataFrame. Precisely, I want to implement the following transformer:

    class DeepCNNFeature extends Transformer ... {

    }

On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <jos...@databricks.com> wrote:

Hi Jao,

You can use external tools and libraries if they can be called from your Spark program or script (with appropriate conversion of data types, etc.). The best way to apply a pre-trained model to a dataset would be to call the model from within a closure, e.g.:

    myRDD.map { myDatum => preTrainedModel.predict(myDatum) }

If your data is distributed in an RDD (myRDD), then the above call will distribute the computation of the predictions using the pre-trained model. It will require that all of your Spark workers be able to run the preTrainedModel; that may mean installing Caffe and its dependencies on all nodes in the compute cluster.

For the second question, I would modify the above call as follows:

    myRDD.mapPartitions { myDataOnPartition =>
      val myModel = // instantiate neural network on this partition
      myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
    }

I hope this helps!

Joseph

On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:

Dear all,

We mainly do large-scale computer vision tasks (image classification, retrieval, ...). The pipeline is really great stuff for that. We're trying to reproduce the tutorial given on that topic during the latest Spark Summit (http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html) using the master version of the Spark pipeline and DataFrame APIs. The tutorial shows different examples of feature extraction stages before running machine learning algorithms. Even though the tutorial is straightforward to reproduce with this new API, we still have some questions:

- Can one use external tools (e.g. via pipe) as a pipeline stage? An example use case is extracting features learned with a convolutional neural network. In our case, this corresponds to a pre-trained neural network built with the Caffe library (http://caffe.berkeleyvision.org/).

- The second question is about the performance of the pipeline. A library such as Caffe processes data in batches, and instantiating one Caffe network can be time-consuming when the network is very deep. So we can gain performance if we minimize the number of Caffe network creations and feed data to the network in batches. In the pipeline, this corresponds to running transformers that work on a partition basis and give the whole partition to a single Caffe network. How can we create such a transformer?

Best,

Jao
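Joseph's mapPartitions suggestion can be combined with batching to address both halves of that last question. A minimal sketch follows, assuming a hypothetical `transformBatch(batch: Seq[Vector]): Seq[Vector]` method on the Caffe wrapper as the batch-oriented entry point, and a tunable batch size:

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    def extractFeatures(input: RDD[Vector], batchSize: Int = 64): RDD[Vector] = {
      input.mapPartitions { rows =>
        val cnnModel = new CaffeModel        // instantiated once per partition
        rows.grouped(batchSize).flatMap { batch =>
          cnnModel.transformBatch(batch)     // assumed batch API; amortizes per-call JNI overhead
        }
      }
    }

Because the model is constructed inside the mapPartitions closure on the worker, the non-serializable JNI handle never has to cross the wire; only the closure does. One model is created per partition (or per executor, if combined with the singleton holder sketched at the top of the thread), and the network sees fixed-size batches rather than single rows.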