Annnnd... it's NOT working.

Here's the code:

        val bytes = kafkaStream.map({ case (key, messageBytes) => messageBytes }) // Map to just get the bytes part out...
        val things = bytes.flatMap(bytesArrayToThings) // convert to a thing
        val srcDestinations = things.map(thing =>
          (ipToString(thing.getSourceIp), Set(ipToString(thing.getDestinationIp))))
        // up to this point works fine.

        // this fails to print
        val srcDestinationSets = srcDestinations.reduceByKey((exist: Set[String], addl: Set[String]) => exist ++ addl)
        srcDestinationSets.print()


What it does...

From a Kafka message containing many "things", convert the message to an
array of said "things", flatMap them out to a stream of one "thing" at a
time, then pull out a Tuple of (SourceIP, Set(DestinationIP)).
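To make those steps concrete outside Spark, here is a minimal sketch on plain Scala collections. `Thing`, its string fields, and the ";"-separated "src,dst" format decoded by this `bytesArrayToThings` are all hypothetical stand-ins (the real message format and the real `bytesArrayToThings`/`ipToString` helpers aren't shown), and since the IPs are already strings here, `ipToString` is skipped.

```scala
// Hypothetical record type standing in for the real "thing" objects.
final case class Thing(sourceIp: String, destinationIp: String)

object PipelineSketch {
  // Hypothetical decoder: assumes each message is UTF-8 text holding
  // "src,dst" pairs separated by ';'. The real wire format is not shown.
  def bytesArrayToThings(bytes: Array[Byte]): Seq[Thing] =
    new String(bytes, "UTF-8")
      .split(";")
      .toSeq
      .filter(_.nonEmpty)
      .map { pair =>
        val parts = pair.split(",", 2)
        Thing(parts(0), parts(1))
      }

  def main(args: Array[String]): Unit = {
    // One simulated batch of (key, messageBytes) pairs, like the Kafka stream.
    val batch: Seq[(String, Array[Byte])] = Seq(
      ("k1", "10.30.51.216,10.20.1.1;10.20.11.3,10.10.61.98".getBytes("UTF-8")),
      ("k2", "10.20.11.3,10.10.61.95".getBytes("UTF-8"))
    )
    val bytes = batch.map { case (_, messageBytes) => messageBytes }
    val things = bytes.flatMap(bytesArrayToThings)
    // Same shape as srcDestinations: (source IP, single-element Set of destination IP)
    val srcDestinations = things.map(t => (t.sourceIp, Set(t.destinationIp)))
    srcDestinations.foreach(println)
  }
}
```

Running `main` prints the same three tuples as the sample output that follows, which is a quick way to sanity-check the per-record mapping independently of Kafka and Spark.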

ALL THAT WORKS. If I do a "srcDestinations.print()", I get output like the
following every 5 seconds, which is my batch interval.

-------------------------------------------
Time: 1402582000000 ms
-------------------------------------------
(10.30.51.216,Set(10.20.1.1))
(10.20.11.3,Set(10.10.61.98))
(10.20.11.3,Set(10.10.61.95))
...



What I want is a SET of (sourceIP -> Set(all the destination IPs)). I'm using a
Set because, as you can see above, the same source may have the same
destination multiple times, and I want to eliminate dupes on the destination
side.
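For reference, this is what the reduceByKey step is expected to compute for each batch: all single-element sets for a given source merged with `++`, so duplicate destinations collapse. The sketch below emulates that on a local `Seq` with `groupBy` plus `reduce`; `reduceByKeyLocal` is a hypothetical helper illustrating the semantics, not Spark's implementation, and the sample pairs are illustrative (one duplicate destination added on purpose).

```scala
object ReduceByKeySketch {
  // Hypothetical local stand-in for DStream.reduceByKey: group pairs by key,
  // then combine each group's values with the same reduce function.
  def reduceByKeyLocal[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

  def main(args: Array[String]): Unit = {
    val srcDestinations = Seq(
      ("10.30.51.216", Set("10.20.1.1")),
      ("10.20.11.3", Set("10.10.61.98")),
      ("10.20.11.3", Set("10.10.61.95")),
      ("10.20.11.3", Set("10.10.61.98")) // duplicate destination, removed by the Set union
    )
    val srcDestinationSets =
      reduceByKeyLocal(srcDestinations)((exist: Set[String], addl: Set[String]) => exist ++ addl)
    // Map iteration order is not guaranteed, so lines may print in any order.
    srcDestinationSets.foreach(println)
  }
}
```

If the reduce function behaves as intended locally, the problem is more likely in how the streaming job runs than in the `exist ++ addl` logic itself.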

When I call the reduceByKey() method, I never get any output. When I do a
"srcDestinationSets.print()", NOTHING EVER PRINTS. Ever. Never.

What am I doing wrong?  (The same happens for "reduceByKeyAndWindow(...,
Seconds(5))".)

I'm sure this is something I've done, but I cannot figure out what it was.

Help, please?
