Re: Spark structured streaming - efficient way to do lots of aggregations on the same input files
Hi,

I don't have any code for the foreachBatch approach; I mentioned it due to this response to my question on SO: https://stackoverflow.com/a/65803718/1017130

I have added some very simple code below that I think shows what I'm trying to do:

    val schema = StructType(Array(
        StructField("senderId1", LongType),
        StructField("senderId2", LongType),
        StructField("destId1", LongType),
        StructField("eventType", IntegerType),
        StructField("cost", LongType)
    ))

    val fileStreamDf = spark.readStream
        .schema(schema)
        .option("delimiter", "\t")
        .csv("D:\\SparkTest")

    fileStreamDf.createOrReplaceTempView("myTable")

    spark.sql("SELECT senderId1, count(*) AS num_events FROM myTable GROUP BY senderId1 HAVING count(*) > 1")
        .writeStream.format("console").outputMode("complete").start()

    spark.sql("SELECT senderId2, sum(cost) AS total_cost FROM myTable WHERE eventType = 3 GROUP BY senderId2 HAVING sum(cost) > 500")
        .writeStream.format("console").outputMode("complete").start()

    spark.sql("SELECT destId1, count(*) AS num_events FROM myTable WHERE eventType = 5 GROUP BY destId1 HAVING count(*) > 1000")
        .writeStream.format("console").outputMode("complete").start()

Of course, this is simplified; there are a lot more columns, and the queries should also group by time period, but I didn't want to complicate it. With this example I have 3 queries running on the same input files, but Spark would need to read the files from disk 3 times. These extra reads are what I'm trying to avoid. In the real application the number of queries would be a lot higher and dynamic (they are generated in response to some configurations made by the end users).

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
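[Editor's note] To make the foreachBatch idea from the linked SO answer concrete, here is a minimal Scala sketch: persist each micro-batch once and run every aggregation against the cached DataFrame, so the source files are read only once per trigger. This is a sketch under assumptions, not the poster's code — the view name "myBatch" and the choice to print with show() are illustrative, and it assumes the same spark/fileStreamDf setup as the code above.

```scala
// Sketch only: assumes `spark` and the streaming `fileStreamDf` defined above.
// foreachBatch hands each micro-batch to the function as a plain DataFrame,
// so however many aggregations run, the input files are scanned once per batch.
val query = fileStreamDf.writeStream
  .foreachBatch { (batch: org.apache.spark.sql.DataFrame, batchId: Long) =>
    batch.persist() // cache the batch so each query below reuses it
    batch.createOrReplaceTempView("myBatch")

    // Each query scans the cached batch, not the source files. In the real
    // application these SQL strings would be generated from the end users'
    // configurations.
    spark.sql("SELECT senderId1, count(*) AS num_events FROM myBatch GROUP BY senderId1").show()
    spark.sql("SELECT senderId2, sum(cost) AS total_cost FROM myBatch WHERE eventType = 3 GROUP BY senderId2").show()

    batch.unpersist() // release the cached batch before the next trigger
  }
  .start()
```

The trade-off the SO answer alludes to: foreachBatch only sees one micro-batch at a time, so running totals (and HAVING conditions over all data seen so far) have to be maintained manually, e.g. by merging each batch's partial aggregates into an external store.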
Spark structured streaming - efficient way to do lots of aggregations on the same input files
Hi,

I'm considering using Apache Spark for the development of an application. This would replace a legacy program which reads CSV files and does lots (tens/hundreds) of aggregations on them. The aggregations are fairly simple: counts, sums, etc., while applying some filtering conditions on some of the columns.

I prefer using structured streaming for its simplicity and low latency. I'd also like to use full SQL queries (via createOrReplaceTempView). However, doing multiple queries means Spark will re-read the input files for each one of them, which seems very inefficient for my use case.

Does anyone have any suggestions? The only thing I found so far involves using foreachBatch and manually updating my aggregates, but I think there should be a simpler solution for this use case.
Re: Odd error when using a rdd map within a stream map
Hey, I don't think that's the issue; foreach is called on 'results', which is a DStream of floats, so naturally it passes RDDs to its function. And either way, changing the code in the first mapper to comment out the map/reduce process on the RDD

    Float f = 1.0f;
    // nnRdd.map(new Function<NeuralNet, Float>() {
    //
    //     private static final long serialVersionUID = 876245667956566483L;
    //
    //     @Override
    //     public Float call(NeuralNet nn) throws Exception {
    //         return 1.0f;
    //     }
    // }).reduce(new Function2<Float, Float, Float>() {
    //
    //     private static final long serialVersionUID = 5461230777841578072L;
    //
    //     @Override
    //     public Float call(Float left, Float right) throws Exception {
    //         return left + right;
    //     }
    // });

    return Arrays.asList(f);

works as expected, so it's most likely running that RDD.map().reduce() that's the issue somehow. I just don't get why it works when there's a .print() at the end and not a .foreach().

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Odd-error-when-using-a-rdd-map-within-a-stream-map-tp14551p14652.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
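[Editor's note] A likely explanation for the failure described above: RDD operations such as nnRdd.map(...).reduce(...) can only be launched from the driver, but the closure of a DStream map runs on the executors, where the nested RDD job fails. The usual workaround is DStream.transform, whose function is evaluated on the driver once per batch. A hedged Scala sketch of that pattern — `inputStream`, `nnRdd`, and the constant-1.0f mapper mirror the names in the post but are stand-ins:

```scala
// Sketch, assuming an existing input DStream `inputStream` and a
// driver-side RDD[NeuralNet] `nnRdd`.
// transform's function runs on the driver for every batch, so it is a
// legal place to launch a nested RDD job like map().reduce().
val results = inputStream.transform { batchRdd =>
  // Driver-side: a separate Spark job computing the per-batch summary.
  val summed = nnRdd.map(nn => 1.0f).reduce(_ + _)
  // Attach the computed value to each element of the batch.
  batchRdd.map(x => (x, summed))
}
```

The same restructuring applies in the Java API via JavaDStream.transform; the point is that the nested RDD computation moves out of the per-element closure and into driver code.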
Ensuring object in spark streaming runs on specific node
Say you have a Spark Streaming setup such as:

    JavaReceiverInputDStream<...> rndLists = jssc.receiverStream(new JavaRandomReceiver(...));

    rndLists.map(new NeuralNetMapper(...))
            .foreach(new JavaSyncBarrier(...));

Is there any way of ensuring that, say, a JavaRandomReceiver and a JavaSyncBarrier get distributed to the same node? Or is this even a question that makes sense?

Some information as to how Spark Streaming distributes work across a cluster would also be greatly appreciated.

(I've also asked this question on Stack Overflow at http://stackoverflow.com/questions/25564356/ensuring-object-in-spark-streaming-runs-on-specific-node)

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Ensuring-object-in-spark-streaming-runs-on-specific-node-tp13114.html
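[Editor's note] One knob that may help on the receiver side: the custom Receiver API lets a receiver declare a preferred host by overriding preferredLocation — check that this hook exists in your Spark version before relying on it. A hedged Scala sketch; "worker-1" is a placeholder hostname, and whether the hint is honored depends on the scheduler:

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch: a receiver that asks the scheduler to place it on a given host.
// "worker-1" is a placeholder; substitute a real executor hostname.
class PinnedRandomReceiver
    extends Receiver[Map[String, Object]](StorageLevel.MEMORY_ONLY) {

  override def preferredLocation: Option[String] = Some("worker-1")

  override def onStart(): Unit = {
    // start the generator thread and call store(...) per message, as in
    // the JavaRandomReceiver described elsewhere in this thread
  }

  override def onStop(): Unit = ()
}
```

Note this only influences where the receiver runs, not where downstream stages like the JavaSyncBarrier execute; downstream task placement is driven by the locality of the received blocks.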
Developing a spark streaming application
Hey guys, so the problem I'm trying to tackle is the following:

- I need a data source that emits messages at a certain frequency
- There are N neural nets that need to process each message individually
- The outputs from all neural nets are aggregated, and only when all N outputs for each message are collected should a message be declared fully processed
- At the end I should measure the time it took for a message to be fully processed (the time between when it was emitted and when all N neural net outputs for that message have been collected)

What I'm mostly interested in is whether I approached the problem correctly in the first place and, if so, some best-practice pointers on my approach.

My current implementation is the following:

For a data source I created the class

    public class JavaRandomReceiver extends Receiver<Map<String, Object>>

as I decided a key-value store would be best suited to holding emitted data. The onStart() method initializes a custom random sequence generator and starts a thread that continuously generates new neural net inputs and stores them as follows:

    SensorData sdata = generator.createSensorData();

    Map<String, Object> result = new HashMap<String, Object>();
    result.put("msgNo", sdata.getMsgNo());
    result.put("sensorTime", sdata.getSampleTime());
    result.put("list", sdata.getPayload());
    result.put("timeOfProc", sdata.getCreationTime());

    store(result);

    // sleeps for a given amount of time set at generator creation
    generator.waitForNextTuple();

The msgNo here is incremented for each newly created message and is used to keep track of each message.

The neural net functionality is added by creating a custom mapper

    public class NeuralNetMapper implements Function<Map<String, Object>, Map<String, Object>>

whose call function basically just takes the input map, plugs its "list" object in as the input to the neural net object, replaces the map's initial list with the neural net output, and returns the modified map.
The aggregator is implemented as a single class of the following form:

    public class JavaSyncBarrier implements Function<JavaRDD<Map<String, Object>>, Void>

This class maintains a google guava cache of neural net outputs that it has received, in the form of <Long, List<Map<String, Object>>>, where the Long value is the msgNo and the list contains all maps carrying that message number. When a new map is received, it is added to the cache, its list's length is compared to the total number of neural nets, and, if these numbers match, that message number is said to be fully processed; the difference between timeOfProc (all maps with the same msgNo have the same timeOfProc) and the current system time is displayed as the total time necessary for processing.

Now the way all these components are linked together is the following:

    public static void main(String[] args) {

        SparkConf conf = new SparkConf();
        conf.setAppName("SimpleSparkStreamingTest");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));
        jssc.checkpoint("/tmp/spark-tempdir");

        // Generator config goes here
        // Set to emit new message every 1 second
        // ---

        // Neural net config goes here
        // ---

        JavaReceiverInputDStream<Map<String, Object>> rndLists = jssc
                .receiverStream(new JavaRandomReceiver(generatorConfig));

        List<JavaDStream<Map<String, Object>>> neuralNetOutputStreams =
                new ArrayList<JavaDStream<Map<String, Object>>>();

        for (int i = 0; i < numberOfNets; i++) {
            neuralNetOutputStreams
                .add(rndLists.map(new NeuralNetMapper(neuralNetConfig)));
        }

        JavaDStream<Map<String, Object>> joined = joinStreams(neuralNetOutputStreams);
        joined.foreach(new JavaSyncBarrier(numberOfNets));

        jssc.start();
        jssc.awaitTermination();
    }

where joinStreams unifies a list of streams:

    public static <T> JavaDStream<T> joinStreams(List<JavaDStream<T>> streams) {
        JavaDStream<T> result = streams.get(0);
        for (int i = 1; i < streams.size(); i++) {
            result = result.union(streams.get(i));
        }
        return result;
    }

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Developing-a-spark-streaming-application-tp12893.html
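[Editor's note] To make the barrier's completion rule above concrete, here is a small, Spark-free Scala sketch of the bookkeeping JavaSyncBarrier performs: buffer outputs by msgNo and declare a message fully processed once all N net outputs have arrived. Class and method names are illustrative, and a plain mutable Map stands in for the guava cache:

```scala
import scala.collection.mutable

// Sketch of the sync-barrier bookkeeping, keyed by msgNo.
class SyncBarrier(numberOfNets: Int) {
  // msgNo -> neural net outputs received so far for that message
  private val pending = mutable.Map.empty[Long, List[Map[String, Any]]]

  // Returns all outputs for the message once every net has reported,
  // otherwise None (the message is still in flight).
  def offer(output: Map[String, Any]): Option[List[Map[String, Any]]] = {
    val msgNo = output("msgNo").asInstanceOf[Long]
    val outputs = output :: pending.getOrElse(msgNo, Nil)
    if (outputs.size == numberOfNets) {
      pending.remove(msgNo) // fully processed: emit and evict
      Some(outputs)
    } else {
      pending.update(msgNo, outputs) // still waiting on more nets
      None
    }
  }
}
```

When offer returns the collected outputs, the real class would report System.currentTimeMillis() minus the message's timeOfProc as the end-to-end processing latency. Unlike the guava cache in the post, this sketch never expires entries, so messages from failed nets would accumulate indefinitely.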