Hi,
I am trying to use Spark Streaming, but I am struggling with word count :(
I want to consolidate the output of the word count (not on a per-window basis), so
I am using updateStateByKey(), but for some reason this is not working.
The update function itself is not being invoked (I do not see its System.out output on the
console).


public final class WordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
        if (args.length < 2) {
          System.err.println("Usage: JavaNetworkWordCount <hostname>
<port>");
          System.exit(1);
        }

         // Create the context with a 1 second batch size
        SparkConf sparkConf = new
SparkConf().setAppName("JavaNetworkWordCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,  new
Duration(1000));
        ssc.checkpoint("/tmp/worcount");
        // Create a JavaReceiverInputDStream on target ip:port and count the
        // words in input stream of \n delimited text (eg. generated by
'nc')
        // Note that no duplication in storage level only for running
locally.
        // Replication necessary in distributed scenario for fault
tolerance.
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
                args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);
        JavaDStream<String> words = lines.flatMap(new
FlatMapFunction<String, String>() {
          @Override
          public Iterable<String> call(String x) {
            return Lists.newArrayList(SPACE.split(x));
          }
        });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
          new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                System.err.println("Got "+s);
              return new Tuple2<String, Integer>(s, 1);
            }
          }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
              return i1 + i2;
            }
          });

        wordCounts.print();

*        wordCounts.updateStateByKey(new updateFunction());*
 ssc.start();
        ssc.awaitTermination();
  }
}

class updateFunction implements Function2<List<Integer>, Optional<Integer>,
Optional<Integer>>
{

      @Override public Optional<Integer> call(List<Integer> values,
Optional<Integer> state) {

         Integer x = new Integer(0);
         for (Integer i:values)
             x = x+i;
        Integer newSum = state.or(0)+x;  // add the new values with the
previous running count to get the new count
        System.out.println("Newsum is "+newSum);
        return Optional.of(newSum);

      };

}

Reply via email to