This sample below is essentially word count modified to be big data by
turning lines into groups of
upper case letters and then generating all case variants - it is modeled
after some real problems in biology
The issue is I know how to do this in Hadoop but in Spark the use of a List
in my flatmap function will not work as the size grows but I don't know
what will or how not to keep data in memory
Anyone want to look at the sample and tell me how

on my machine given 8g it does
Variant Size 18 Size 14188672 took 406 sec and
stalls with larger cases
==========================================
import org.apache.spark.*;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.*;
import scala.*;

import java.util.*;

/**
 * com.lordjoe.distributed.test.JavaBigDataWordCount
 * This sample is written to force a sample with large amounts of data
emulating some big data aspects of
 * a problem in biology I am working on -
 * <p/>
 * This is essentially WordCount
 * except that lines are filtered to just upper case words
 * then broken into String groups and all varients with different case are
generated
 * so THE -> THE,ThE,THe,The,tHE,thE,tHe,the
 * when the groups get long - say 10 or 20 a LOT of variants are generated
 * <p/>
 * This sample is motivated by real problems in biology where we want to
look at possible mutations in DNA fragments or
 * possible chemical modifications on amino acids in polypeptides - my
largest Hadoop job does exactly that
 * <p/>
 * I have serious questions about
 * A - How to write the FlatMapFunction CaseVariationFunction as the output
gets large - I think storing results in a List will not work
 * - what are other options
 * B are there other ways to do this
 */
public final class JavaBigDataWordCount {

    /**
     * drop all characters that are not letters
     *
     * @param s input string
     * @return output string
     */
    public static String dropNonLetters(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (Character.isLetter(c))
                sb.append(c);
        }

        return sb.toString();
    }

    /**
     * convert a string into a string holding only upper case letters
     *
     * @param inp input string
     * @return output string
     */
    public static String regularizeString(String inp) {
        inp = inp.trim();
        inp = inp.toUpperCase();
        return dropNonLetters(inp);
    }

    /**
     * convert a string into strings of length maxLength all letters and
     * upper case
     */
    public static class SubstringsMapFunction implements
FlatMapFunction<String, String> {
        private final int maxLength;

        public SubstringsMapFunction(final int pMaxLength) {
            maxLength = pMaxLength;
        }

        public Iterable<String> call(String s) {
            s = regularizeString(s); // drop non-letters
            List<String> holder = new ArrayList<String>();
            for (int i = 0; i < s.length() - 2; i += maxLength) {
                holder.add(s.substring(i, Math.min(s.length(), i +
maxLength)));
            }
            return holder;
        }
    }

    /**
     * return all cases of an upper case string so THE ->
THE,ThE,THe,The,tHE,thE,tHe,the
     * In general the output is 2 to the Nth long where N is the input
length
     */
    public static class CaseVariationFunction implements
FlatMapFunction<String, String> {
        public Iterable<String> call(String s) {
            // HELP - I don't think a List will work for long inputs given
WHAT else can I use
            List<String> holder = new ArrayList<String>();  // holds
variants
            holder.add(s);
            makeVariations(s.toCharArray(), holder, 0);  // do real work
filling in holder
            return holder;
        }

        /**
         * add to holder - NOTE I think a List is wrong for large inputs
         *
         * @param chars  characters input
         * @param holder - holder - or iterable holding results
         * @param index  - start changing case at this position
         */
        private void makeVariations(char[] chars, final List<String>
holder, int index) {
            if (index < chars.length - 1)
                makeVariations(chars, holder, index + 1);
            if (Character.isUpperCase(chars[index])) {
                chars[index] = Character.toLowerCase(chars[index]);
                holder.add(new String(chars));
                if (index < chars.length - 1)
                    makeVariations(chars, holder, index + 1);
                chars[index] = Character.toUpperCase(chars[index]);
            }

        }
    }

    // a few lines of text so we don't need to read a file is we don't want
to
    public static final String GETTYSBURG =
            "Four score and seven years ago our fathers brought forth, upon
this continent, a new nation, conceived in liberty," +
                    " and dedicated to the proposition that “all men are
created equal.”\n" +
                    "Now we are engaged in a great civil war, testing
whether that nation, or any nation so conceived," +
                    " and so dedicated, can long endure. We are met on a
great battle field of that war. We come to dedicate a portion of it," +
                    " as a final resting place for those who died here,
that the nation might live. This we may, in all propriety do.\n" +
                    "But, in a larger sense, we can not dedicate – we can
not consecrate – we can not hallow, this ground – The brave men, living and
dead, who struggled here, have hallowed it, far above our poor power to add
or detract. The world will little note, nor long remember what we say here;
while it can never forget what they did here.\n" +
                    "It is rather for us, the living, we here be dedicated
to the great task remaining before us – that, from these honored dead" +
                    " we take increased devotion to that cause for which
they here, gave the last full measure of devotion – that we here highly
resolve" +
                    " these dead shall not have died in vain; that the
nation, shall have a new birth of freedom, and that government of the
people, by" +
                    " the people, for the people, shall not perish from the
earth.\n";


    /**
     * Main - runs essentially word count for all variants starting with
length 10 (1000 variants) and
     * raising it by a factor of 4
     *
     * @param args if given a file to read - otherwise use the Gettysburg
address
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new
SparkConf().setAppName("JavaBigDataWordCount");
        sparkConf.set("spark.mesos.coarse", "true");
        Option<String> option = sparkConf.getOption("spark.master");
        if (!option.isDefined())   // use local over nothing
            sparkConf.setMaster("local");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        String[] linesTXT = GETTYSBURG.split("\n"); // the gettysburg
address as lines of text
        JavaRDD<String> lines = ctx.parallelize(Arrays.asList(linesTXT));
        lines = lines.persist(StorageLevel.MEMORY_ONLY());

        // try for variant counts 10..32 by 2 - Each case is 4 times the
size
        for (int variantSize = 10; variantSize < 32; variantSize += 2) {
            long startMSec = System.currentTimeMillis();  // when did we
start
            // Drop all non-letters - make upper case split into groups of
size  variantSize
            JavaRDD<String> words = lines.flatMap(new
JavaBigDataWordCount.SubstringsMapFunction(variantSize));

            // generate all variants with different case  should be  THE ->
THE,ThE,THe,The,tHE,tHe,the,ThE,The
            JavaRDD<String> variants = words.flatMap(new
JavaBigDataWordCount.CaseVariationFunction());

            // same as Java word count - we actually expect all counts to
be 1
            JavaPairRDD<String, Integer> ones = variants.mapToPair(new
PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });

            // same as Java word count - we actually expect all counts to
be 1
            JavaPairRDD<String, Integer> counts = ones.reduceByKey(new
Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });
            List<Tuple2<String, Integer>> output =
counts.sortByKey().collect();
            long endMSec = System.currentTimeMillis();   // when did we
finish

            System.out.println("Variant Size " + variantSize + " Size " +
output.size() + " took " + (int) (endMSec - startMSec) / 1000);
        }
    }
}
==========================================

Reply via email to