Thank you, Jakob. I will try as suggested.

Regards,
Shashi

On Fri, Sep 23, 2016 at 12:14 AM, Jakob Odersky <ja...@odersky.com> wrote:

> Hi Shashikant,
>
> I think you are trying to do too much at once in your helper class.
> Spark's RDD API is functional; it is meant to be used by writing many
> little transformations that will be distributed across a cluster.
>
> Apart from that, `rdd.pipe` seems like a good approach. Here is the
> relevant doc comment (in RDD.scala) on how to use it:
>
>  * Return an RDD created by piping elements to a forked external process.
>  * The resulting RDD is computed by executing the given process once per
>  * partition. All elements of each input partition are written to a
>  * process's stdin as lines of input separated by a newline. The resulting
>  * partition consists of the process's stdout output, with each line of
>  * stdout resulting in one element of the output partition. A process is
>  * invoked even for empty partitions.
>  *
>  * [...]
>
> Check the full docs here:
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD@pipe(command:String):org.apache.spark.rdd.RDD[String]
>
> This is how you could use it:
>
>   productsRDD = // get from Cassandra
>   processedRDD = productsRDD.map(STEP1).map(STEP2).pipe(C binary of step 3)
>   STEP4 // store processed RDD
>
> Hope this gives you some pointers,
>
> best,
> --Jakob
>
> On Thu, Sep 22, 2016 at 2:10 AM, Shashikant Kulkarni (शशिकांत कुलकर्णी)
> <shashikant.kulka...@gmail.com> wrote:
> > Hello Jakob,
> >
> > Thanks for replying. Here is a short example of what I am trying to do,
> > using the Product column family in Cassandra just to explain my
> > requirement.
> >
> > In Driver.java:
> >
> >   {
> >     JavaRDD<Product> productsRdd = /* get Products from Cassandra */;
> >     productsRdd.map(ProductHelper.processProduct());
> >   }
> >
> > In ProductHelper.java:
> >
> >   {
> >     public static Function<Product, Boolean> processProduct() {
> >       return new Function<Product, Boolean>() {
> >         private static final long serialVersionUID = 1L;
> >
> >         @Override
> >         public Boolean call(Product product) throws Exception {
> >           // STEP 1: Do some processing on the product object.
> >           // STEP 2: Using a few values of the product, create a string
> >           //         like "name id sku datetime".
> >           // STEP 3: Pass this string to my C binary to perform some
> >           //         complex calculations and return some data.
> >           // STEP 4: Get the returned data and store it back in the
> >           //         Cassandra DB.
> >         }
> >       };
> >     }
> >   }
> >
> > In this ProductHelper, I cannot pass (and don't want to pass) the
> > sparkContext object, as the app will throw a "task not serializable"
> > error. If there is a way, let me know.
> >
> > I am not able to achieve STEP 3 above. How can I pass a String to the C
> > binary and get the output back in my program? The C binary reads data
> > from STDIN and writes data to STDOUT. It already works from another part
> > of the application, written in PHP. I want to reuse the same C binary in
> > my Apache Spark application for some background processing and analysis
> > using the JavaRDD.pipe() API. If there is any other way, let me know.
> > This code will be executed on all the nodes in a cluster.
> >
> > I hope my requirement is now clear. How can I do this?
> >
> > Regards,
> > Shash
> >
> > On Thu, Sep 22, 2016 at 4:13 AM, Jakob Odersky <ja...@odersky.com> wrote:
> >>
> >> Can you provide more details? It's unclear what you're asking.
> >>
> >> On Wed, Sep 21, 2016 at 10:14 AM, shashikant.kulka...@gmail.com
> >> <shashikant.kulka...@gmail.com> wrote:
> >> > Hi All,
> >> >
> >> > I am trying to use the JavaRDD.pipe() API.
> >> >
> >> > I have one object with me from the JavaRDD
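
For STEP 3 discussed above, here is a minimal local sketch of what the quoted doc comment describes for a single partition: one process is started, every element is written to its stdin as one line, and every line of its stdout becomes one output element. It uses only the JDK and is not Spark's implementation. The names `PipeSketch` and `pipePartition`, the sample input strings, and the use of `cat` as a stand-in for the C binary are all assumptions made for illustration.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Spark: mimics the documented behaviour
// of pipe() for one partition using a plain external process.
final class PipeSketch {

    /** Writes each element to the process's stdin as a line; returns its stdout lines. */
    static List<String> pipePartition(List<String> command, List<String> partition) {
        try {
            Process process = new ProcessBuilder(command)
                    .redirectError(ProcessBuilder.Redirect.INHERIT)
                    .start();

            // Write on a separate thread so that a full stdout pipe cannot
            // block the process while we are still feeding its stdin.
            Thread writer = new Thread(() -> {
                try (BufferedWriter stdin = new BufferedWriter(new OutputStreamWriter(
                        process.getOutputStream(), StandardCharsets.UTF_8))) {
                    for (String element : partition) {
                        stdin.write(element);
                        stdin.newLine();
                    }
                } catch (IOException e) {
                    // The process closed stdin early; the exit code is checked below.
                }
            });
            writer.start();

            // Each line of stdout becomes one element of the result.
            List<String> output = new ArrayList<>();
            try (BufferedReader stdout = new BufferedReader(new InputStreamReader(
                    process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = stdout.readLine()) != null) {
                    output.add(line);
                }
            }

            writer.join();
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                throw new IllegalStateException("Process exited with code " + exitCode);
            }
            return output;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for process", e);
        }
    }

    public static void main(String[] args) {
        // "cat" stands in for the C binary: it copies stdin to stdout unchanged.
        List<String> partition = Arrays.asList(
                "widget 1 SKU-1 2016-09-22T10:00:00",
                "gadget 2 SKU-2 2016-09-22T10:05:00");
        pipePartition(Arrays.asList("cat"), partition).forEach(System.out::println);
    }
}
```

In a Spark job the same round trip would go through `JavaRDD.pipe(String command)`, which returns a `JavaRDD<String>`. STEP 1 and STEP 2 would then be `map` calls that produce the "name id sku datetime" string, and STEP 4 would be a separate write of the piped result. The binary presumably has to be present at the given path on every worker node.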