I put together a naive implementation like this:

SparkConf conf = new SparkConf();
conf.setMaster("local[4]").setAppName("Stub");
final JavaSparkContext ctx = new JavaSparkContext(conf);

JavaRDD<String> input = ctx.textFile("path_to_file");

// explode each line into a list of column values
JavaRDD<List<String>> rowValues = input.map(new Function<String, List<String>>() {
    @Override
    public List<String> call(String v1) throws Exception {
        return Lists.newArrayList(v1.split(";"));
    }
});

// transform the input to key (word, colNumber), value => 1
JavaPairRDD<Tuple2<String, Integer>, Integer> positions =
    rowValues.flatMapToPair(new PairFlatMapFunction<List<String>, Tuple2<String, Integer>, Integer>() {
        @Override
        public Iterable<Tuple2<Tuple2<String, Integer>, Integer>> call(List<String> strings) throws Exception {
            List<Tuple2<Tuple2<String, Integer>, Integer>> retval = new ArrayList<>();

            int colNum = -1;
            for (String word : strings) {
                Tuple2<String, Integer> wordPosition = new Tuple2<>(word, ++colNum);
                retval.add(new Tuple2<>(wordPosition, 1));
            }
            return retval;
        }
    });

// sum the word counts within each column
JavaPairRDD<Tuple2<String, Integer>, Integer> summ =
    positions.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });

// invert the pair - make the column number the key and (word, count) the value
JavaPairRDD<Integer, Tuple2<String, Integer>> columnIsKey =
    summ.mapToPair(new PairFunction<Tuple2<Tuple2<String, Integer>, Integer>, Integer, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<Integer, Tuple2<String, Integer>> call(Tuple2<Tuple2<String, Integer>, Integer> wordColumnCount) throws Exception {
            return new Tuple2<>(wordColumnCount._1()._2(),
                    new Tuple2<>(wordColumnCount._1()._1(), wordColumnCount._2()));
        }
    });

// some optimizations to avoid groupByKey could be applied here (see the sketch after this snippet)
JavaPairRDD<Integer, Iterable<Tuple2<String, Integer>>> grouped = columnIsKey.groupByKey();

// output the results
grouped.foreach(new VoidFunction<Tuple2<Integer, Iterable<Tuple2<String, Integer>>>>() {
    @Override
    public void call(Tuple2<Integer, Iterable<Tuple2<String, Integer>>> columnValues) throws Exception {
        String strValues = "";
        for (Tuple2<String, Integer> distinct : columnValues._2()) {
            strValues += "(" + distinct._1() + "," + distinct._2() + ")";
        }
        System.out.println("Column: " + columnValues._1());
        System.out.println("Distincts: " + strValues);
    }
});
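Regarding the groupByKey comment above: one possible optimization (just a sketch, not benchmarked) is to fold the per-column output strings with aggregateByKey, so partial results are combined map-side before the shuffle instead of shipping every (word, count) pair to a single reducer task per column. It builds on the columnIsKey RDD from the snippet above; perColumn is simply a name I picked here.

// sketch: build the per-column "(word,count)" string with aggregateByKey
// instead of groupByKey, so values are pre-combined on the map side
JavaPairRDD<Integer, String> perColumn = columnIsKey.aggregateByKey(
    "",
    // fold one (word, count) pair into the partial string for this partition
    new Function2<String, Tuple2<String, Integer>, String>() {
        @Override
        public String call(String acc, Tuple2<String, Integer> wordCount) throws Exception {
            return acc + "(" + wordCount._1() + "," + wordCount._2() + ")";
        }
    },
    // merge partial strings coming from different partitions
    new Function2<String, String, String>() {
        @Override
        public String call(String left, String right) throws Exception {
            return left + right;
        }
    });

perColumn.foreach(new VoidFunction<Tuple2<Integer, String>>() {
    @Override
    public void call(Tuple2<Integer, String> column) throws Exception {
        System.out.println("Column: " + column._1());
        System.out.println("Distincts: " + column._2());
    }
});

The shuffle still moves one record per distinct (word, column) pair, but the values are concatenated per partition before being sent, which should help when a column has many distinct values spread across partitions.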



On 29.07.2015 16:38, Devi P.V wrote:
Hi All,

I have a 5 GB CSV dataset with 69 columns. I need to find the count of each distinct value in every column. What is an optimized way to do this with Spark/Scala?

Example CSV format:

a,b,c,d
a,c,b,a
b,b,c,d
b,b,c,a
c,b,b,a

Expected output:

(a,2),(b,2),(c,1) #- First column distinct count
(b,4),(c,1)       #- Second column distinct count
(c,3),(b,2)       #- Third column distinct count
(d,2),(a,3)       #- Fourth column distinct count


Thanks in Advance
