Re: DataGenerator for streaming application
Why are you using rawSocketStream to read the data? I believe rawSocketStream waits for a big chunk of data before it can start processing it. I think what you are writing is a String and you should use socketTextStream which reads the data on a per line basis. On Sun, Sep 20, 2015 at 9:56 AM, Saiph Kappa wrote: > Hi, > > I am trying to build a data generator that feeds a streaming application. > This data generator just reads a file and sends its lines through a socket. > I get no errors on the logs, and the benchmark below always prints > "Received 0 records". Am I doing something wrong? > > > object MyDataGenerator { > > def main(args: Array[String]) { > if (args.length != 3) { > System.err.println("Usage: RawTextSender ") > System.exit(1) > } > // Parse the arguments using a pattern match > val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt) > > val serverSocket = new ServerSocket(port) > println("Listening on port " + port) > > > while (true) { > val socket = serverSocket.accept() > println("Got a new connection") > > > val out = new PrintWriter(socket.getOutputStream) > try { > var count = 0 > var startTimestamp = -1 > for (line <- Source.fromFile(file).getLines()) { > val ts = line.substring(2, line.indexOf(',',2)).toInt > if(startTimestamp < 0) > startTimestamp = ts > > if(ts - startTimestamp <= 30) { > out.println(line) > count += 1 > } else { > println(s"Emmited reports: $count") > count = 0 > out.flush() > startTimestamp = ts > Thread.sleep(sleepMillis) > } > } > } catch { > case e: IOException => > println("Client disconnected") > socket.close() > } > } > } > } > > > > object Benchmark { > def main(args: Array[String]) { > if (args.length != 4) { > System.err.println("Usage: RawNetworkGrep > ") > System.exit(1) > } > > val (numStreams, host, port, batchMillis) = (args(0).toInt, args(1), > args(2).toInt, args(3).toInt) > val sparkConf = new SparkConf() > sparkConf.setAppName("BenchMark") > > 
sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar")) > sparkConf.set("spark.serializer", > "org.apache.spark.serializer.KryoSerializer") > sparkConf.set("spark.executor.extraJavaOptions", " -XX:+UseCompressedOops > -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts -XX:FreqInlineSize=300 > -XX:MaxInlineSize=300 ") > if (sparkConf.getOption("spark.master") == None) { > // Master not set, as this was not launched through Spark-submit. > Setting master as local." > sparkConf.setMaster("local[*]") > } > > // Create the context > val ssc = new StreamingContext(sparkConf, Duration(batchMillis)) > > val rawStreams = (1 to numStreams).map(_ => > ssc.rawSocketStream[String](host, port, > StorageLevel.MEMORY_ONLY_SER)).toArray > val union = ssc.union(rawStreams) > union.count().map(c => s"Received $c records").print() > ssc.start() > ssc.awaitTermination() > } > } > > Thanks. > >
Re: DataGenerator for streaming application
Thanks a lot. Now it's working fine. I wasn't aware of "socketTextStream", not sure if it was documented in the spark programming guide. On Mon, Sep 21, 2015 at 12:46 PM, Hemant Bhanawat wrote: > Why are you using rawSocketStream to read the data? I believe > rawSocketStream waits for a big chunk of data before it can start > processing it. I think what you are writing is a String and you should use > socketTextStream which reads the data on a per line basis. > > On Sun, Sep 20, 2015 at 9:56 AM, Saiph Kappa > wrote: > >> Hi, >> >> I am trying to build a data generator that feeds a streaming application. >> This data generator just reads a file and sends its lines through a socket. >> I get no errors on the logs, and the benchmark below always prints >> "Received 0 records". Am I doing something wrong? >> >> >> object MyDataGenerator { >> >> def main(args: Array[String]) { >> if (args.length != 3) { >> System.err.println("Usage: RawTextSender ") >> System.exit(1) >> } >> // Parse the arguments using a pattern match >> val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt) >> >> val serverSocket = new ServerSocket(port) >> println("Listening on port " + port) >> >> >> while (true) { >> val socket = serverSocket.accept() >> println("Got a new connection") >> >> >> val out = new PrintWriter(socket.getOutputStream) >> try { >> var count = 0 >> var startTimestamp = -1 >> for (line <- Source.fromFile(file).getLines()) { >> val ts = line.substring(2, line.indexOf(',',2)).toInt >> if(startTimestamp < 0) >> startTimestamp = ts >> >> if(ts - startTimestamp <= 30) { >> out.println(line) >> count += 1 >> } else { >> println(s"Emmited reports: $count") >> count = 0 >> out.flush() >> startTimestamp = ts >> Thread.sleep(sleepMillis) >> } >> } >> } catch { >> case e: IOException => >> println("Client disconnected") >> socket.close() >> } >> } >> } >> } >> >> >> >> object Benchmark { >> def main(args: Array[String]) { >> if (args.length != 4) { >> 
System.err.println("Usage: RawNetworkGrep >> ") >> System.exit(1) >> } >> >> val (numStreams, host, port, batchMillis) = (args(0).toInt, args(1), >> args(2).toInt, args(3).toInt) >> val sparkConf = new SparkConf() >> sparkConf.setAppName("BenchMark") >> >> sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar")) >> sparkConf.set("spark.serializer", >> "org.apache.spark.serializer.KryoSerializer") >> sparkConf.set("spark.executor.extraJavaOptions", " >> -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts >> -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ") >> if (sparkConf.getOption("spark.master") == None) { >> // Master not set, as this was not launched through Spark-submit. >> Setting master as local." >> sparkConf.setMaster("local[*]") >> } >> >> // Create the context >> val ssc = new StreamingContext(sparkConf, Duration(batchMillis)) >> >> val rawStreams = (1 to numStreams).map(_ => >> ssc.rawSocketStream[String](host, port, >> StorageLevel.MEMORY_ONLY_SER)).toArray >> val union = ssc.union(rawStreams) >> union.count().map(c => s"Received $c records").print() >> ssc.start() >> ssc.awaitTermination() >> } >> } >> >> Thanks. >> >> >
DataGenerator for streaming application
Hi, I am trying to build a data generator that feeds a streaming application. This data generator just reads a file and sends its lines through a socket. I get no errors on the logs, and the benchmark below always prints "Received 0 records". Am I doing something wrong? object MyDataGenerator { def main(args: Array[String]) { if (args.length != 3) { System.err.println("Usage: RawTextSender ") System.exit(1) } // Parse the arguments using a pattern match val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt) val serverSocket = new ServerSocket(port) println("Listening on port " + port) while (true) { val socket = serverSocket.accept() println("Got a new connection") val out = new PrintWriter(socket.getOutputStream) try { var count = 0 var startTimestamp = -1 for (line <- Source.fromFile(file).getLines()) { val ts = line.substring(2, line.indexOf(',',2)).toInt if(startTimestamp < 0) startTimestamp = ts if(ts - startTimestamp <= 30) { out.println(line) count += 1 } else { println(s"Emmited reports: $count") count = 0 out.flush() startTimestamp = ts Thread.sleep(sleepMillis) } } } catch { case e: IOException => println("Client disconnected") socket.close() } } } } object Benchmark { def main(args: Array[String]) { if (args.length != 4) { System.err.println("Usage: RawNetworkGrep ") System.exit(1) } val (numStreams, host, port, batchMillis) = (args(0).toInt, args(1), args(2).toInt, args(3).toInt) val sparkConf = new SparkConf() sparkConf.setAppName("BenchMark") sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar")) sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sparkConf.set("spark.executor.extraJavaOptions", " -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ") if (sparkConf.getOption("spark.master") == None) { // Master not set, as this was not launched through Spark-submit. Setting master as local." 
sparkConf.setMaster("local[*]") } // Create the context val ssc = new StreamingContext(sparkConf, Duration(batchMillis)) val rawStreams = (1 to numStreams).map(_ => ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray val union = ssc.union(rawStreams) union.count().map(c => s"Received $c records").print() ssc.start() ssc.awaitTermination() } } Thanks.