Re: How to access objects declared and initialized outside the call() method of JavaRDD
It might kind of work, but you are effectively making all of your workers into mini, separate Spark drivers in their own right. That might cause snags down the line, as it isn't the normal thing to do.

On Tue, Oct 28, 2014 at 12:11 AM, Localhost shell universal.localh...@gmail.com wrote: [...]
Re: How to access objects declared and initialized outside the call() method of JavaRDD
Hey lordjoe,

Apologies for the late reply. I followed your ThreadLocal approach and it worked fine. I will update the thread if I get to know more on this. (I don't know how Spark does it in Scala, but what I wanted to achieve in Java is quite common in many Spark/Scala GitHub gists.)

Thanks.

On Thu, Oct 23, 2014 at 3:08 PM, lordjoe lordjoe2...@gmail.com wrote: [...]

--
--Unilocal
Re: How to access objects declared and initialized outside the call() method of JavaRDD
In Java, javaSparkContext would have to be declared final in order for it to be accessed inside an inner class like this. But this would still not work, as the context is not serializable. You should rewrite this so you are not attempting to use the Spark context inside an RDD.

On Thu, Oct 23, 2014 at 8:46 AM, Localhost shell universal.localh...@gmail.com wrote:

> Hey All,
>
> I am unable to access objects declared and initialized outside the call() method of JavaRDD. In the code snippet below, the call() method makes a fetch call to C*, but since javaSparkContext is defined outside the call() method's scope, the compiler gives a compilation error.
>
>     stringRdd.foreach(new VoidFunction<String>() {
>         @Override
>         public void call(String str) throws Exception {
>             JavaRDD<String> vals = javaFunctions(javaSparkContext)
>                     .cassandraTable(schema, table, String.class)
>                     .select("val");
>         }
>     });
>
> In other languages I have used closures to do this, but I am not able to achieve the same here. Can someone suggest how to achieve this in the current code context?
>
> --Unilocal
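For illustration only (this sketch is not from the thread): one way to follow Sean's suggestion is to build both datasets as RDDs in driver code, where the context is legitimately available, and replace the per-element fetch with a join. The sketch reuses the javaFunctions/cassandraTable call from the question and assumes both sides can be keyed by the same string row key; extractKey is a hypothetical key-extraction helper.

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    // Driver-side: load the Cassandra data once as an RDD (the same call the
    // question makes), instead of querying it inside call() on the workers.
    JavaPairRDD<String, String> storedByKey = javaFunctions(javaSparkContext)
            .cassandraTable(schema, table, String.class)
            .select("val")
            .mapToPair(new PairFunction<String, String, String>() {
                @Override
                public Tuple2<String, String> call(String row) {
                    return new Tuple2<String, String>(extractKey(row), row); // extractKey: hypothetical
                }
            });

    // Key the input the same way and join; none of the functions shipped to
    // the workers captures the SparkContext.
    JavaPairRDD<String, String> inputByKey = stringRdd.mapToPair(
            new PairFunction<String, String, String>() {
                @Override
                public Tuple2<String, String> call(String s) {
                    return new Tuple2<String, String>(extractKey(s), s);
                }
            });

    JavaPairRDD<String, Tuple2<String, String>> joined = inputByKey.join(storedByKey);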
Re: How to access objects declared and initialized outside the call() method of JavaRDD
Bang on, Sean. Before sending the issue mail, I was able to remove the compilation error by making it final, but then got (as you mentioned):

Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext

Now, regarding your suggestion of changing the business logic:

1. *Is the current approach possible if I write the code in Scala?* I think probably not, but wanted to check with you.
2. Brief steps of what the code is doing:
   1. Get the raw sessions data from the datastore (C*).
   2. Process the raw sessions data.
   3. Iterate over the processed data (derived from #2), fetch the previously aggregated data from the store for those row keys, and add the values from this batch to the previous batch values.
   4. Save back the updated values.

*This GitHub gist https://gist.github.com/rssvihla/6577359860858ccb0b33 might explain it better; it does a similar thing in Scala.* I am trying to achieve a similar thing in Java, using Spark batch with C* as the datastore. I have attached the Java code file to provide some code details (in case I was not able to explain the problem well, the code will be handy).

The reason I am fetching only selective data (that I will update later) is that Cassandra doesn't provide range queries, so I thought fetching the complete data might be expensive.

It will be great if you can share your thoughts.

On Thu, Oct 23, 2014 at 1:48 AM, Sean Owen so...@cloudera.com wrote: [...]

--
--Unilocal
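A hedged sketch of how steps 2-4 can avoid worker-side context access (this is not from the attached file): aggregate the batch, load the previous aggregates once as an RDD in driver code, and merge the two with a left outer join. The (rowkey, count) types, the method name, and the parameter shapes are all assumptions; Optional is the Guava class Spark 1.x returns from outer joins.

    import com.google.common.base.Optional;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import scala.Tuple2;

    // batchTotals: step 2's output, assumed to be (rowkey, count) pairs.
    // previousTotals: the previously aggregated values, loaded once on the
    // driver (e.g. via cassandraTable) rather than fetched per row (step 3).
    static JavaPairRDD<String, Long> mergeBatch(JavaPairRDD<String, Long> batchTotals,
                                                JavaPairRDD<String, Long> previousTotals) {
        return batchTotals
                .reduceByKey(new Function2<Long, Long, Long>() {  // collapse duplicate keys in the batch
                    @Override
                    public Long call(Long a, Long b) { return a + b; }
                })
                .leftOuterJoin(previousTotals)                    // pull in prior values by row key
                .mapValues(new Function<Tuple2<Long, Optional<Long>>, Long>() {
                    @Override
                    public Long call(Tuple2<Long, Optional<Long>> v) {
                        return v._1() + v._2().or(0L);            // new value: this batch + previous
                    }
                });
    }

The merged result would then be written back to C* with the connector's save API (step 4).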
Re: How to access objects declared and initialized outside the call() method of JavaRDD
+1 to Sean. Is it possible to rewrite your code to not use the SparkContext inside the RDD? Or, why does javaFunctions() need the SparkContext?

On Thu, Oct 23, 2014 at 10:53 AM, Localhost shell universal.localh...@gmail.com wrote: [...]
Re: How to access objects declared and initialized outside the call() method of JavaRDD
Hey Jayant,

In my previous mail, I mentioned a GitHub gist *https://gist.github.com/rssvihla/6577359860858ccb0b33* which does something very similar to what I want to do, but it uses Scala. Hence my question (reiterating from the previous mail): *Is the current approach possible if I write the code in Scala?*

Why does javaFunctions() need the SparkContext? Because for each row in the RDD, I am making a get call to the datastore (Cassandra). The reason I am fetching only selective data (that I will update later) is that Cassandra doesn't provide range queries, so I thought fetching the complete data might be expensive.

On Thu, Oct 23, 2014 at 11:22 AM, Jayant Shekhar jay...@cloudera.com wrote: [...]

--
--Unilocal
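For what it's worth, the same constraint applies in Scala: the SparkContext is not serializable there either, so the pattern cannot be moved inside an RDD function in either language. What such examples typically do instead is capture something that is serializable, for example the spark-cassandra-connector's CassandraConnector, and open a plain Cassandra driver session per partition. A rough Java sketch under that assumption (the exact connector API may differ by version):

    import com.datastax.driver.core.Session;
    import com.datastax.spark.connector.cql.CassandraConnector;
    import java.util.Iterator;
    import org.apache.spark.api.java.function.VoidFunction;

    // CassandraConnector is serializable, so the closure may capture it;
    // the heavyweight Session is opened on the worker itself.
    final CassandraConnector connector =
            CassandraConnector.apply(javaSparkContext.getConf());

    stringRdd.foreachPartition(new VoidFunction<Iterator<String>>() {
        @Override
        public void call(Iterator<String> rows) throws Exception {
            Session session = connector.openSession();  // one session per partition
            try {
                while (rows.hasNext()) {
                    String rowKey = rows.next();
                    // fetch and update this row key with session.execute(...)
                    // (hypothetical query, e.g. via a prepared statement)
                }
            } finally {
                session.close();
            }
        }
    });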
Re: How to access objects declared and initialized outside the call() method of JavaRDD
What I have been doing is building a JavaSparkContext the first time it is needed and keeping it as a ThreadLocal - all my code uses SparkUtilities.getCurrentContext(). On a slave machine you build a new context and don't have to serialize it. The code is in a large project at https://code.google.com/p/distributed-tools/ - a work in progress, but the Spark aficionados on this list will say whether the approach is kosher.

    public class SparkUtilities implements Serializable {
        private transient static ThreadLocal<JavaSparkContext> threadContext;
        private static String appName = "Anonymous";

        public static String getAppName() {
            return appName;
        }

        public static void setAppName(final String pAppName) {
            appName = pAppName;
        }

        /**
         * create a JavaSparkContext for the thread if none exists
         *
         * @return the current thread's context
         */
        public static synchronized JavaSparkContext getCurrentContext() {
            if (threadContext == null)
                threadContext = new ThreadLocal<JavaSparkContext>();
            JavaSparkContext ret = threadContext.get();
            if (ret != null)
                return ret;
            SparkConf sparkConf = new SparkConf().setAppName(getAppName());
            // Here do operations you would do to initialize a context
            ret = new JavaSparkContext(sparkConf);
            threadContext.set(ret);
            return ret;
        }
    }
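A minimal usage sketch (illustrative, not from the linked project): callers never touch the ThreadLocal directly, they just ask the utility for the current thread's context. Note Sean's caveat elsewhere in this thread that building contexts on workers effectively turns each worker into its own mini driver.

    // Works the same in driver code and, per the approach above, on a slave.
    JavaSparkContext sc = SparkUtilities.getCurrentContext();
    JavaRDD<String> lines = sc.textFile("input.txt");  // "input.txt" is a hypothetical path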