Re: Bug in Accumulators...
I posted several examples in Java at http://lordjoesoftware.blogspot.com/. Generally code like this works, and I show how to accumulate more complex values.

    // Make an accumulator using the context
    final Accumulator<Integer> totalLetters = ctx.accumulator(0, "ttl");
    JavaRDD<String> lines = ... // create here
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(final String s) throws Exception {
            // Handle the accumulator here
            totalLetters.add(s.length()); // count all letters
            return Arrays.asList(s.split(" "));
        }
    });
    long numberLetters = totalLetters.value();

I believe the mistake is to pass the accumulator to the function rather than letting the function find the accumulator - I do this in this case by using a final local variable.
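One caveat worth adding (my note, not part of the original post): accumulator updates made inside a transformation such as flatMap only become visible on the driver after an action has forced the computation, so read the value after an action such as count():

    // Assuming 'words' and 'totalLetters' from the example above
    long wordCount = words.count();           // the action triggers the flatMap
    long letterCount = totalLetters.value();  // now reflects every processed line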
Re: ReduceByKey but with different functions depending on key
Map the key/value pair into a (key, Tuple2<key, value>) pair and process that. Also ask the Spark maintainers for a version of the keyed operations where the key is passed in as an argument - I run into these cases all the time. A usage sketch follows after the code.

    /**
     * map a key/value pair into a (key, tuple) pair to ensure subsequent processing
     * has access to both key and value
     * @param inp input pair RDD
     * @param <K> key type
     * @param <V> value type
     * @return output where the value holds both key and value
     */
    @Nonnull
    public static <K, V> JavaPairRDD<K, Tuple2<K, V>> toKeyedTuples(@Nonnull final JavaPairRDD<K, V> inp) {
        return inp.flatMapToPair(new PairFlatMapFunction<Tuple2<K, V>, K, Tuple2<K, V>>() {
            @Override
            public Iterable<Tuple2<K, Tuple2<K, V>>> call(final Tuple2<K, V> t) throws Exception {
                return Collections.singletonList(
                        new Tuple2<K, Tuple2<K, V>>(t._1(), new Tuple2<K, V>(t._1(), t._2())));
            }
        });
    }
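As a rough illustration of how the helper could be used (the names and the branching logic here are my own, not from the original post): once the key travels with the value, an ordinary reduceByKey can branch on it.

    // Hypothetical usage: reduce Integer counts differently depending on the key
    JavaPairRDD<String, Integer> input = ...; // e.g. word -> count
    JavaPairRDD<String, Tuple2<String, Integer>> keyed = toKeyedTuples(input);
    JavaPairRDD<String, Tuple2<String, Integer>> reduced = keyed.reduceByKey(
            new Function2<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
                    String key = a._1();
                    // the key is visible here, so different keys can use different functions
                    int value = key.startsWith("total") ? a._2() + b._2() : Math.max(a._2(), b._2());
                    return new Tuple2<String, Integer>(key, value);
                }
            });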
RE: RDD.aggregate versus accumulables...
I have been experimenting with accumulators (despite the possible error with multiple attempts). They provide a convenient way to get some numbers out while still performing the business logic. I posted some sample code at http://lordjoesoftware.blogspot.com/. Even if accumulators are not perfect today, future versions may improve them, and they are a great way to monitor execution and get a sense of performance on a lazily executed system. A small sketch of the pattern follows.
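A minimal sketch of that pattern (my own illustration, assuming a JavaSparkContext ctx and a JavaRDD<String> lines are already defined): count records as a side effect of a map while the business logic still produces the transformed RDD.

    // Count processed lines as a side effect of the real transformation
    final Accumulator<Integer> processed = ctx.accumulator(0, "processedLines");
    JavaRDD<String> upper = lines.map(new Function<String, String>() {
        @Override
        public String call(final String s) throws Exception {
            processed.add(1);         // monitoring only - does not affect the result
            return s.toUpperCase();   // the actual business logic
        }
    });
    upper.count();                    // an action forces evaluation
    System.out.println("Processed " + processed.value() + " lines");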
Re: disable log4j for spark-shell
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;

    public static void main(String[] args) throws Exception {
        System.out.println("Set Log to Warn");
        // Raise the root logger threshold so Spark's INFO chatter is suppressed
        Logger rootLogger = Logger.getRootLogger();
        rootLogger.setLevel(Level.WARN);
        ...
    }

works for me.
Re: java.io.NotSerializableException: org.apache.spark.SparkEnv
I posted on this issue in
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-objects-declared-and-initialized-outside-the-call-method-of-JavaRDD-td17094.html#a17150

The code starts:

    public class SparkUtilities implements Serializable {
        private transient static ThreadLocal<JavaSparkContext> threadContext;
        private static String appName = "Anonymous";
        ...
    }

Essentially you need to get a context on the slave machine and save it in a transient (non-serialized) field - at least that is what you want to do in Java.
Re: How to access objects declared and initialized outside the call() method of JavaRDD
What I have been doing is building a JavaSparkContext the first time it is needed and keeping it as a ThreadLocal - all my code uses SparkUtilities.getCurrentContext(). On a slave machine you build a new context and don't have to serialize it. The code is in a large project at https://code.google.com/p/distributed-tools/ - a work in progress, but the Spark aficionados on this list will say whether the approach is kosher. A usage sketch follows after the code.

    public class SparkUtilities implements Serializable {
        private transient static ThreadLocal<JavaSparkContext> threadContext;
        private static String appName = "Anonymous";

        public static String getAppName() {
            return appName;
        }

        public static void setAppName(final String pAppName) {
            appName = pAppName;
        }

        /**
         * create a JavaSparkContext for the thread if none exists
         *
         * @return the context for this thread
         */
        public static synchronized JavaSparkContext getCurrentContext() {
            if (threadContext == null)
                threadContext = new ThreadLocal<JavaSparkContext>();
            JavaSparkContext ret = threadContext.get();
            if (ret != null)
                return ret;
            SparkConf sparkConf = new SparkConf().setAppName(getAppName());
            // Here do any other operations you would do to initialize a context
            ret = new JavaSparkContext(sparkConf);
            threadContext.set(ret);
            return ret;
        }
    }
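A brief usage sketch (my own illustration, not from the original post; the input path is hypothetical): set the application name once, then obtain the thread's context wherever it is needed.

    // Driver-side usage of the helper above
    SparkUtilities.setAppName("WordStatistics");
    JavaSparkContext ctx = SparkUtilities.getCurrentContext();  // created lazily on first call
    JavaRDD<String> lines = ctx.textFile("hdfs://...");         // hypothetical input path
    JavaSparkContext same = SparkUtilities.getCurrentContext(); // same thread -> same context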
Re: How to calculate percentiles with Spark?
A rather more general question is: assume I have a JavaRDD<K> which is sorted - how can I convert this into a JavaPairRDD<Integer, K> where the Integer is the index 0...N - 1? Easy to do on one machine:

    JavaRDD<K> values = ... // create here
    JavaPairRDD<Integer, K> positions = values.mapToPair(new PairFunction<K, Integer, K>() {
        private int index = 0;

        @Override
        public Tuple2<Integer, K> call(final K t) throws Exception {
            return new Tuple2<Integer, K>(index++, t);
        }
    });

but will this code do the right thing on a cluster?
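For what it is worth, a sketch of an alternative that avoids the per-partition counter problem (my addition, not part of the original question, shown for a JavaRDD<String> named values): JavaRDD.zipWithIndex() assigns each element a stable global index (as a Long rather than an Integer), which can then be swapped into the key position.

    // zipWithIndex gives (value, index); swap so the index becomes the key
    JavaPairRDD<Long, String> positions = values.zipWithIndex().mapToPair(
            new PairFunction<Tuple2<String, Long>, Long, String>() {
                @Override
                public Tuple2<Long, String> call(final Tuple2<String, Long> t) throws Exception {
                    return new Tuple2<Long, String>(t._2(), t._1());
                }
            });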