Just for the record, this is being discussed at StackOverflow: http://stackoverflow.com/questions/25663026/developing-a-spark-streaming-application/25766618
2014-08-27 10:28 GMT+02:00 Filip Andrei <andreis.fi...@gmail.com>:

> Hey guys, the problem I'm trying to tackle is the following:
>
> - I need a data source that emits messages at a certain frequency
> - There are N neural nets that need to process each message individually
> - The outputs from all neural nets are aggregated, and a message should be
>   declared fully processed only when all N outputs for it have been collected
> - At the end I should measure the time it took for a message to be fully
>   processed (the time between when it was emitted and when all N neural net
>   outputs for that message have been collected)
>
> What I'm mostly interested in is whether I approached the problem correctly
> in the first place and, if so, some best-practice pointers on my approach.
>
> My current implementation is the following:
>
> For a data source I created the class
>
>     public class JavaRandomReceiver extends Receiver<Map<String, Object>>
>
> as I decided a key-value store would be best suited to holding the emitted
> data.
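For comparison, the requirements listed at the top of the question can be modelled in a single JVM without Spark. This may be useful as a latency baseline. The sketch below is hypothetical. FanOutFanInSketch, processAll and the stand-in "nets" are illustrative names only. The stand-in nets just scale their input. CompletableFuture.allOf plays the role of the aggregation barrier: a message counts as fully processed only once every net has produced its output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;

// Hypothetical single-JVM model of the requirements (no Spark involved).
public class FanOutFanInSketch {

    // Hands the same payload to every net in parallel and returns only once
    // all N outputs exist; allOf(...) acts as the aggregation barrier.
    static List<double[]> processAll(double[] payload,
                                     List<UnaryOperator<double[]>> nets,
                                     ExecutorService pool) {
        List<CompletableFuture<double[]>> futures = new ArrayList<>();
        for (UnaryOperator<double[]> net : nets) {
            // clone() so that no net can modify the input seen by the others
            futures.add(CompletableFuture.supplyAsync(() -> net.apply(payload.clone()), pool));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        List<double[]> outputs = new ArrayList<>();
        for (CompletableFuture<double[]> f : futures) {
            outputs.add(f.join()); // already complete, so join() does not block
        }
        return outputs;
    }

    public static void main(String[] args) {
        int numberOfNets = 4; // hypothetical N
        List<UnaryOperator<double[]>> nets = new ArrayList<>();
        for (int i = 0; i < numberOfNets; i++) {
            final double weight = i + 1;
            // Stand-in "net": scales each input value by a per-net weight.
            nets.add(in -> {
                double[] out = new double[in.length];
                for (int j = 0; j < in.length; j++) {
                    out[j] = in[j] * weight;
                }
                return out;
            });
        }
        ExecutorService pool = Executors.newFixedThreadPool(numberOfNets);
        try {
            long emittedAt = System.nanoTime(); // emission timestamp
            List<double[]> outputs = processAll(new double[] {0.5, 1.5}, nets, pool);
            long latencyMicros = (System.nanoTime() - emittedAt) / 1_000;
            System.out.println(outputs.size() + " outputs collected after " + latencyMicros + " us");
        } finally {
            pool.shutdown();
        }
    }
}
```

The outputs come back in the same order as the nets list, so each output can be matched to the net that produced it.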
>
> The onStart() method initializes a custom random sequence generator and
> starts a thread that continuously generates new neural net inputs and
> stores them as follows:
>
>     SensorData sdata = generator.createSensorData();
>
>     Map<String, Object> result = new HashMap<String, Object>();
>
>     result.put("msgNo", sdata.getMsgNo());
>     result.put("sensorTime", sdata.getSampleTime());
>     result.put("list", sdata.getPayload());
>     result.put("timeOfProc", sdata.getCreationTime());
>
>     store(result);
>
>     // sleeps for a given amount of time set at generator creation
>     generator.waitForNextTuple();
>
> The msgNo here is incremented for each newly created message and is used to
> keep
>
> The neural net functionality is added by creating a custom mapper
>
>     public class NeuralNetMapper implements Function<Map<String, Object>,
>             Map<String, Object>>
>
> whose call function basically just takes the input map, plugs its "list"
> object into the neural net object as input, replaces the map's initial list
> with the neural net output and returns the modified map.
>
> The aggregator is implemented as a single class of the following form:
>
>     public class JavaSyncBarrier implements
>             Function<JavaRDD<Map<String, Object>>, Void>
>
> This class maintains a Google Guava cache of the neural net outputs it has
> received, in the form <Long, List<Map<String, Object>>>, where the Long value
> is the msgNo and the list contains all maps carrying that message number.
>
> When a new map is received, it is added to the cache and its list's length is
> compared to the total number of neural nets. If these numbers match, that
> message number is declared fully processed, and the difference between
> timeOfProc (all maps with the same msgNo have the same timeOfProc) and the
> current system time is displayed as the total processing time.
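The barrier bookkeeping described above can be sketched without Spark. This is a minimal sketch under several assumptions:

- A synchronized HashMap stands in for the Guava cache.
- "msgNo" and "timeOfProc" hold numeric values.
- SyncBarrierSketch, accept and pendingCount are hypothetical names.
- All outputs of a batch are handled in one place, for example on the driver after collect()ing the batch's RDD. This is an assumption about how JavaSyncBarrier processes its input.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Spark-free model of the barrier bookkeeping: a synchronized
// HashMap stands in for the Guava cache, keyed by msgNo.
public class SyncBarrierSketch {
    private final int numberOfNets;
    private final Map<Long, List<Map<String, Object>>> pending = new HashMap<>();

    public SyncBarrierSketch(int numberOfNets) {
        this.numberOfNets = numberOfNets;
    }

    // Records one net output. Returns the end-to-end latency (nowMillis minus
    // timeOfProc) once the Nth output for its msgNo arrives, otherwise -1.
    // Assumes "msgNo" and "timeOfProc" hold numeric (e.g. Long) values.
    public synchronized long accept(Map<String, Object> output, long nowMillis) {
        long msgNo = ((Number) output.get("msgNo")).longValue();
        List<Map<String, Object>> outputs =
                pending.computeIfAbsent(msgNo, k -> new ArrayList<>());
        outputs.add(output);
        if (outputs.size() < numberOfNets) {
            return -1L; // still waiting for the other nets
        }
        pending.remove(msgNo); // message is complete: drop its entry
        long timeOfProc = ((Number) output.get("timeOfProc")).longValue();
        return nowMillis - timeOfProc;
    }

    // Number of messages that still lack at least one net output.
    public synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        int numberOfNets = 3;
        SyncBarrierSketch barrier = new SyncBarrierSketch(numberOfNets);
        long emittedAt = 1_000L;
        for (int net = 0; net < numberOfNets; net++) {
            Map<String, Object> out = new HashMap<>();
            out.put("msgNo", 7L);
            out.put("timeOfProc", emittedAt);
            // In a real job nowMillis would be System.currentTimeMillis().
            long latency = barrier.accept(out, emittedAt + 100 + net * 10);
            System.out.println("output from net " + net + " -> " + latency);
        }
        System.out.println("pending messages: " + barrier.pendingCount());
    }
}
```

Running main prints -1 for the first two outputs, 120 for the third, and then 0 pending messages. Removing an entry as soon as it completes keeps the map small. A message that never receives all N outputs would stay in the map forever, which is the case an expiring Guava cache covers. Passing nowMillis in explicitly keeps the latency arithmetic easy to test.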
>
> Now, the way all these components are linked together is the following:
>
>     public static void main(String[] args) {
>
>         SparkConf conf = new SparkConf();
>         conf.setAppName("SimpleSparkStreamingTest");
>
>         JavaStreamingContext jssc = new JavaStreamingContext(conf,
>                 new Duration(1000));
>
>         jssc.checkpoint("/tmp/spark-tempdir");
>
>         // Generator config goes here
>         // Set to emit new message every 1 second
>         // ---
>
>         // Neural net config goes here
>         // ---
>
>         JavaReceiverInputDStream<Map<String, Object>> rndLists = jssc
>                 .receiverStream(new JavaRandomReceiver(generatorConfig));
>
>         List<JavaDStream<Map<String, Object>>> neuralNetOutputStreams =
>                 new ArrayList<JavaDStream<Map<String, Object>>>();
>
>         // one mapped stream per neural net, all fed from the same receiver
>         for (int i = 0; i < numberOfNets; i++) {
>             neuralNetOutputStreams.add(
>                     rndLists.map(new NeuralNetMapper(neuralNetConfig)));
>         }
>
>         JavaDStream<Map<String, Object>> joined =
>                 joinStreams(neuralNetOutputStreams);
>
>         // foreachRDD (rather than the deprecated foreach) hands each
>         // batch's RDD to the barrier
>         joined.foreachRDD(new JavaSyncBarrier(numberOfNets));
>
>         jssc.start();
>         jssc.awaitTermination();
>     }
>
> where joinStreams unifies a list of streams:
>
>     public static <T> JavaDStream<T> joinStreams(List<JavaDStream<T>> streams) {
>
>         JavaDStream<T> result = streams.get(0);
>         for (int i = 1; i < streams.size(); i++) {
>             result = result.union(streams.get(i));
>         }
>
>         return result;
>     }

--
Santiago M. Mola

Avenida de Europa, 26. Ática 5. 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: 91 352 59 42 // @stratiobd