Re: Spark structured streaming - efficient way to do lots of aggregations on the same input files

2021-01-22 Thread Filip
Hi,

I don't have any code for the foreachBatch approach; I mentioned it because of
this response to my question on SO:
https://stackoverflow.com/a/65803718/1017130

I have added some very simple code below that I think shows what I'm trying
to do:
val schema = StructType(
  Array(
    StructField("senderId1", LongType),
    StructField("senderId2", LongType),
    StructField("destId1", LongType),
    StructField("eventType", IntegerType),
    StructField("cost", LongType)
  )
)

val fileStreamDf = spark.readStream
  .schema(schema)
  .option("delimiter", "\t")
  .csv("D:\\SparkTest")

fileStreamDf.createOrReplaceTempView("myTable")

spark.sql("SELECT senderId1, count(*) AS num_events FROM myTable GROUP BY
senderId1 HAVING count(*) >
1").writeStream.format("console").outputMode("complete").start()
spark.sql("SELECT senderId2, sum(cost) AS total_cost FROM myTable WHERE
eventType = 3 GROUP BY senderId2 HAVING sum(cost) >
500").writeStream.format("console").outputMode("complete").start()
spark.sql("SELECT destId1, count(*) AS num_events WHERE event_type = 5 GROUP
BY destId1 HAVING count(*) >
1000").writeStream.format("console").outputMode("complete").start()

Of course, this is simplified; there are a lot more columns and the queries
should also group by time period, but I didn't want to complicate it.
With this example, I have 3 queries running on the same input files, but
Spark would need to read the files from disk 3 times. These extra reads are
what I'm trying to avoid.
In the real application, the number of queries would be much higher and
dynamic (they are generated in response to configurations made by the end
users).
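
To spell out what I took from that SO answer: the foreachBatch version would
presumably look something like the sketch below (untested, just my reading of
the answer; runAggregations and the "myBatch" view name are placeholders). The
files would be read once per micro-batch and every aggregation would run
against the cached batch instead of triggering its own scan:

import org.apache.spark.sql.DataFrame

// Untested sketch of the foreachBatch idea: cache each micro-batch once and
// run all the aggregations against the cached data.
def runAggregations(batchDf: DataFrame, batchId: Long): Unit = {
  batchDf.persist()
  batchDf.createOrReplaceTempView("myBatch")

  // Same queries as above, but against the cached batch; show() is just a
  // stand-in for whatever sink the results would really go to.
  spark.sql("SELECT senderId1, count(*) AS num_events FROM myBatch GROUP BY senderId1 HAVING count(*) > 1").show()
  spark.sql("SELECT senderId2, sum(cost) AS total_cost FROM myBatch WHERE eventType = 3 GROUP BY senderId2 HAVING sum(cost) > 500").show()
  spark.sql("SELECT destId1, count(*) AS num_events FROM myBatch WHERE eventType = 5 GROUP BY destId1 HAVING count(*) > 1000").show()

  batchDf.unpersist()
}

fileStreamDf.writeStream
  .foreachBatch(runAggregations _)
  .start()

The catch, as far as I can tell, is that this only aggregates within a single
micro-batch; keeping running totals across batches would still mean updating
the aggregates manually, which is the part I'd like to avoid.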






Spark structured streaming - efficient way to do lots of aggregations on the same input files

2021-01-21 Thread Filip
Hi,

I'm considering using Apache Spark for the development of an application.
This would replace a legacy program that reads CSV files and does lots
(tens/hundreds) of aggregations on them. The aggregations are fairly simple:
counts, sums, etc., with filtering conditions applied to some of the columns.

I prefer using structured streaming for its simplicity and low latency. I'd
also like to use full SQL queries (via createOrReplaceTempView). However,
running multiple queries means Spark will re-read the input files for each one
of them, which seems very inefficient for my use case.

Does anyone have any suggestions? The only thing I found so far involves
using foreachBatch and manually updating my aggregates, but I think there
should be a simpler solution for this use case.






Re: Odd error when using a rdd map within a stream map

2014-09-19 Thread Filip Andrei
Hey, I don't think that's the issue; foreach is called on 'results', which is
a DStream of floats, so naturally it passes RDDs to its function.

And either way, changing the code in the first mapper to comment out the
map/reduce on the RDD

Float f = 1.0f; // nnRdd.map(new Function<NeuralNet, Float>() {
//
//     private static final long serialVersionUID = 876245667956566483L;
//
//     @Override
//     public Float call(NeuralNet nn) throws Exception {
//         return 1.0f;
//     }
// }).reduce(new Function2<Float, Float, Float>() {
//
//     private static final long serialVersionUID = 5461230777841578072L;
//
//     @Override
//     public Float call(Float left, Float right) throws Exception {
//         return left + right;
//     }
// });

return Arrays.asList(f);

works as expected, so it's most likely running that RDD.map().reduce() that's
the issue somehow. I just don't get why it works when there's a .print() at
the end and not a .foreach().






Ensuring object in spark streaming runs on specific node

2014-08-29 Thread Filip Andrei
Say you have a spark streaming setup such as

JavaReceiverInputDStream... rndLists =
        jssc.receiverStream(new JavaRandomReceiver(...));

rndLists.map(new NeuralNetMapper(...))
        .foreach(new JavaSyncBarrier(...));

Is there any way of ensuring that, say, a JavaRandomReceiver and
JavaSyncBarrier get distributed to the same node? Or is this even a question
that makes sense?

Some information as to how spark-streaming distributes work across a cluster
would also be greatly appreciated.

(I've also asked this question on Stack Overflow:
http://stackoverflow.com/questions/25564356/ensuring-object-in-spark-streaming-runs-on-specific-node
)






Developing a spark streaming application

2014-08-27 Thread Filip Andrei
Hey guys, so the problem I'm trying to tackle is the following:

- I need a data source that emits messages at a certain frequency
- There are N neural nets that need to process each message individually
- The outputs from all neural nets are aggregated; only when all N outputs for
a message have been collected should that message be declared fully processed
- At the end I should measure the time it took for a message to be fully
processed (the time between when it was emitted and when all N neural net
outputs for that message have been collected)


What I'm mostly interested in is whether I approached the problem correctly in
the first place and, if so, some best-practice pointers on my approach.






And my current implementation is the following:


For a data source I created the class

public class JavaRandomReceiver extends Receiver<Map<String, Object>>

as I decided a key-value store would be best suited to holding the emitted data.


The onStart() method initializes a custom random sequence generator and
starts a thread that continuously generates new neural net inputs and stores
them as follows:

SensorData sdata = generator.createSensorData();

Map<String, Object> result = new HashMap<String, Object>();

result.put("msgNo", sdata.getMsgNo());
result.put("sensorTime", sdata.getSampleTime());
result.put("list", sdata.getPayload());
result.put("timeOfProc", sdata.getCreationTime());

store(result);

// sleeps for a given amount of time set at generator creation
generator.waitForNextTuple();

The msgNo here is incremented for each newly created message and is used to
keep track of which neural net outputs belong to which message downstream.


The neural net functionality is added by creating a custom mapper

public class NeuralNetMapper implements Function<Map<String, Object>, Map<String, Object>>

whose call function basically just takes the input map, plugs its "list"
entry into the neural net object, replaces the map's initial list with the
neural net output, and returns the modified map.
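
Stripped down, the mapper looks roughly like this (simplified sketch;
NeuralNet, NeuralNetConfig and the compute(...) call are stand-ins for the
real classes, which I've left out):

import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.function.Function;

// Simplified sketch of the mapper; NeuralNet / NeuralNetConfig / compute(...)
// are placeholders for the real neural net classes.
public class NeuralNetMapper implements Function<Map<String, Object>, Map<String, Object>> {

    private final NeuralNet nn;

    public NeuralNetMapper(NeuralNetConfig config) {
        this.nn = new NeuralNet(config); // placeholder construction
    }

    @Override
    public Map<String, Object> call(Map<String, Object> input) throws Exception {
        List<Double> payload = (List<Double>) input.get("list"); // the receiver's payload
        List<Double> output = nn.compute(payload);               // placeholder for the net invocation
        input.put("list", output);                               // swap the payload for the net's output
        return input;
    }
}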




The aggregator is implemented as a single class that has the following form

public class JavaSyncBarrier implements Function<JavaRDD<Map<String, Object>>, Void>



This class maintains a Google Guava cache of the neural net outputs it has
received, in the form of <Long, List<Map<String, Object>>>, where the Long key
is the msgNo and the list contains all maps carrying that message number.

When a new map is received, it is added to the cache and the length of its
list is compared to the total number of neural nets. If these numbers match,
that message number is considered fully processed, and the difference between
timeOfProc (all maps with the same msgNo carry the same timeOfProc) and the
current system time is displayed as the total time necessary for processing.
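
In simplified form, the barrier looks something like this (sketch only;
eviction policy, configuration and error handling are left out, and the key
names match the ones stored by the receiver above):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

// Simplified sketch of the aggregator; eviction and error handling omitted.
public class JavaSyncBarrier implements Function<JavaRDD<Map<String, Object>>, Void> {

    private final int numberOfNets;
    private final Cache<Long, List<Map<String, Object>>> cache =
            CacheBuilder.newBuilder().build();

    public JavaSyncBarrier(int numberOfNets) {
        this.numberOfNets = numberOfNets;
    }

    @Override
    public Void call(JavaRDD<Map<String, Object>> rdd) throws Exception {
        // Outputs are brought back to the driver so they can be grouped per msgNo.
        for (Map<String, Object> m : rdd.collect()) {
            Long msgNo = (Long) m.get("msgNo");

            List<Map<String, Object>> collected = cache.getIfPresent(msgNo);
            if (collected == null) {
                collected = new ArrayList<Map<String, Object>>();
                cache.put(msgNo, collected);
            }
            collected.add(m);

            // Once every neural net has reported for this msgNo, the message is done.
            if (collected.size() == numberOfNets) {
                long start = (Long) m.get("timeOfProc");
                System.out.println("msg " + msgNo + " fully processed in "
                        + (System.currentTimeMillis() - start) + " ms");
                cache.invalidate(msgNo);
            }
        }
        return null;
    }
}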





Now the way all these components are linked together is the following:

public static void main(String[] args) {

    SparkConf conf = new SparkConf();
    conf.setAppName("SimpleSparkStreamingTest");

    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

    jssc.checkpoint("/tmp/spark-tempdir");

    // Generator config goes here
    // Set to emit new message every 1 second
    // ---

    // Neural net config goes here
    // ---

    JavaReceiverInputDStream<Map<String, Object>> rndLists = jssc
            .receiverStream(new JavaRandomReceiver(generatorConfig));

    List<JavaDStream<Map<String, Object>>> neuralNetOutputStreams =
            new ArrayList<JavaDStream<Map<String, Object>>>();

    for (int i = 0; i < numberOfNets; i++) {
        neuralNetOutputStreams.add(
                rndLists.map(new NeuralNetMapper(neuralNetConfig))
        );
    }

    JavaDStream<Map<String, Object>> joined = joinStreams(neuralNetOutputStreams);

    joined.foreach(new JavaSyncBarrier(numberOfNets));

    jssc.start();
    jssc.awaitTermination();
}

where joinStreams unifies a list of streams:
public static <T> JavaDStream<T> joinStreams(List<JavaDStream<T>> streams) {

    JavaDStream<T> result = streams.get(0);
    for (int i = 1; i < streams.size(); i++) {
        result = result.union(streams.get(i));
    }

    return result;
}


