I made this naive implementation:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

import com.google.common.collect.Lists;

SparkConf conf = new SparkConf();
conf.setMaster("local[4]").setAppName("Stub");
final JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> input = ctx.textFile("path_to_file");

// explode each line into a list of column values
JavaRDD<List<String>> rowValues = input.map(new Function<String, List<String>>() {
    @Override
    public List<String> call(String v1) throws Exception {
        return Lists.newArrayList(v1.split(";"));
    }
});

// transform input to key (word, colNumber), value => 1
JavaPairRDD<Tuple2<String, Integer>, Integer> positions =
    rowValues.flatMapToPair(new PairFlatMapFunction<List<String>, Tuple2<String, Integer>, Integer>() {
        @Override
        public Iterable<Tuple2<Tuple2<String, Integer>, Integer>> call(List<String> strings) throws Exception {
            List<Tuple2<Tuple2<String, Integer>, Integer>> retval = new ArrayList<>();
            int colNum = -1;
            for (String word : strings) {
                Tuple2<String, Integer> wordPosition = new Tuple2<>(word, ++colNum);
                retval.add(new Tuple2<>(wordPosition, 1));
            }
            return retval;
        }
    });

// sum the word counts within each column
JavaPairRDD<Tuple2<String, Integer>, Integer> sums =
    positions.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });

// invert the pair - make columnNumber the key, and (word, count) the value
JavaPairRDD<Integer, Tuple2<String, Integer>> columnIsKey =
    sums.mapToPair(new PairFunction<Tuple2<Tuple2<String, Integer>, Integer>, Integer, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<Integer, Tuple2<String, Integer>> call(Tuple2<Tuple2<String, Integer>, Integer> wordColCount) throws Exception {
            return new Tuple2<>(wordColCount._1()._2(),
                new Tuple2<>(wordColCount._1()._1(), wordColCount._2()));
        }
    });

// some optimization to avoid groupByKey could be implemented here
JavaPairRDD<Integer, Iterable<Tuple2<String, Integer>>> grouped =
    columnIsKey.groupByKey();

// output the results
grouped.foreach(new VoidFunction<Tuple2<Integer, Iterable<Tuple2<String, Integer>>>>() {
    @Override
    public void call(Tuple2<Integer, Iterable<Tuple2<String, Integer>>> column) throws Exception {
        StringBuilder strValues = new StringBuilder();
        for (Tuple2<String, Integer> distinct : column._2()) {
            strValues.append("(").append(distinct._1()).append(",").append(distinct._2()).append(")");
        }
        System.out.println("Column: " + column._1());
        System.out.println("Distincts: " + strValues);
    }
});
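As a quick, Spark-free sanity check of the same per-column counting logic, here is a toy in-memory sketch in plain Java (the class name ColumnDistincts and method columnDistinctCounts are made up for illustration; it assumes ";"-separated rows as in the code above):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ColumnDistincts {
    // For each column index, count how many times each distinct value occurs.
    static Map<Integer, Map<String, Integer>> columnDistinctCounts(List<String> lines) {
        Map<Integer, Map<String, Integer>> counts = new TreeMap<>();
        for (String line : lines) {
            String[] cols = line.split(";");
            for (int i = 0; i < cols.length; i++) {
                counts.computeIfAbsent(i, k -> new TreeMap<>())
                      .merge(cols[i], 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The example data from the original question, ';'-separated
        List<String> rows = Arrays.asList(
            "a;b;c;d", "a;c;b;a", "b;b;c;d", "b;b;c;a", "c;b;b;a");
        columnDistinctCounts(rows).forEach((col, m) ->
            System.out.println("Column " + col + ": " + m));
        // Column 0 prints {a=2, b=2, c=1}, matching the expected output below
    }
}
```

This only verifies the logic on data that fits in memory; for the 5GB dataset you of course still want the distributed version.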
On 29.07.2015 16:38, Devi P.V wrote:
Hi All,
I have a 5GB CSV dataset with 69 columns. I need to find the count
of distinct values in each column. What is the optimized way to do
this using Spark Scala?
Example CSV format:
a,b,c,d
a,c,b,a
b,b,c,d
b,b,c,a
c,b,b,a
Expected output:
(a,2),(b,2),(c,1) #- First column distinct count
(b,4),(c,1) #- Second column distinct count
(c,3),(b,2) #- Third column distinct count
(d,2),(a,3) #- Fourth column distinct count
Thanks in Advance