Hi, can anybody help me with this, please? I haven't been able to find the problem :(
Thanks.

On Nov 15, 2014 4:48 PM, "Bahubali Jain" <bahub...@gmail.com> wrote:

> Hi,
> I am trying to use Spark Streaming, but I am struggling with word count :(
> I want to consolidate the output of the word count (not on a per-window
> basis), so I am using updateStateByKey(), but for some reason this is not
> working. The function itself is not being invoked (I do not see the sysout
> output on the console).
>
> public final class WordCount {
>   private static final Pattern SPACE = Pattern.compile(" ");
>
>   public static void main(String[] args) {
>     if (args.length < 2) {
>       System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
>       System.exit(1);
>     }
>
>     // Create the context with a 1 second batch size
>     SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
>     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
>     ssc.checkpoint("/tmp/worcount");
>     // Create a JavaReceiverInputDStream on target ip:port and count the
>     // words in input stream of \n delimited text (eg. generated by 'nc')
>     // Note that no duplication in storage level only for running locally.
>     // Replication necessary in distributed scenario for fault tolerance.
>     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
>         args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
>     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
>       @Override
>       public Iterable<String> call(String x) {
>         return Lists.newArrayList(SPACE.split(x));
>       }
>     });
>
>     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>       new PairFunction<String, String, Integer>() {
>         @Override
>         public Tuple2<String, Integer> call(String s) {
>           System.err.println("Got " + s);
>           return new Tuple2<String, Integer>(s, 1);
>         }
>       }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>         @Override
>         public Integer call(Integer i1, Integer i2) {
>           return i1 + i2;
>         }
>       });
>
>     wordCounts.print();
>
> *    wordCounts.updateStateByKey(new updateFunction());*
>     ssc.start();
>     ssc.awaitTermination();
>   }
> }
>
> class updateFunction implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
>
>   @Override
>   public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
>     Integer x = new Integer(0);
>     for (Integer i : values)
>       x = x + i;
>     // add the new values with the previous running count to get the new count
>     Integer newSum = state.or(0) + x;
>     System.out.println("Newsum is " + newSum);
>     return Optional.of(newSum);
>   };
>
> }
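
To rule out the update logic itself, the running-count step can be exercised outside Spark. The following is only a minimal sketch under stated assumptions: the class name RunningCountSketch is hypothetical, and it uses java.util.Optional (with orElse) in place of the Optional type in the quoted code (which calls state.or(0)), so it is not the Spark API, just the same arithmetic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-alone class; not part of the quoted program or of Spark.
public final class RunningCountSketch {

    // Same arithmetic as updateFunction.call(): sum the counts seen in this
    // batch and add them to the previous running total (0 if there is none).
    public static Optional<Integer> update(List<Integer> values, Optional<Integer> state) {
        int batchSum = 0;
        for (Integer v : values) {
            batchSum += v;
        }
        return Optional.of(state.orElse(0) + batchSum);
    }

    public static void main(String[] args) {
        // No state yet for this key; the first batch contains the word three times.
        Optional<Integer> state = Optional.empty();
        state = update(Arrays.asList(1, 1, 1), state);
        System.out.println("After batch 1: " + state.get()); // prints "After batch 1: 3"

        // The second batch contains the word twice more.
        state = update(Arrays.asList(1, 1), state);
        System.out.println("After batch 2: " + state.get()); // prints "After batch 2: 5"
    }
}
```

If this behaves as expected, the arithmetic is probably not the issue. One thing that may be worth checking in the quoted program: the DStream returned by updateStateByKey() is not assigned to anything and has no output operation such as print() attached, and Spark Streaming only executes transformations that feed an output operation.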