Hi, I am trying to build a data generator that feeds a streaming application. This data generator just reads a file and send its lines through a socket. I get no errors on the logs, and the benchmark bellow always prints "Received 0 records". Am I doing something wrong?
object MyDataGenerator { def main(args: Array[String]) { if (args.length != 3) { System.err.println("Usage: RawTextSender <port> <file> <sleepMillis>") System.exit(1) } // Parse the arguments using a pattern match val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt) val serverSocket = new ServerSocket(port) println("Listening on port " + port) while (true) { val socket = serverSocket.accept() println("Got a new connection") val out = new PrintWriter(socket.getOutputStream) try { var count = 0 var startTimestamp = -1 for (line <- Source.fromFile(file).getLines()) { val ts = line.substring(2, line.indexOf(',',2)).toInt if(startTimestamp < 0) startTimestamp = ts if(ts - startTimestamp <= 30) { out.println(line) count += 1 } else { println(s"Emmited reports: $count") count = 0 out.flush() startTimestamp = ts Thread.sleep(sleepMillis) } } } catch { case e: IOException => println("Client disconnected") socket.close() } } } } object Benchmark { def main(args: Array[String]) { if (args.length != 4) { System.err.println("Usage: RawNetworkGrep <numStreams> <host> <port> <batchMillis>") System.exit(1) } val (numStreams, host, port, batchMillis) = (args(0).toInt, args(1), args(2).toInt, args(3).toInt) val sparkConf = new SparkConf() sparkConf.setAppName("BenchMark") sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar")) sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sparkConf.set("spark.executor.extraJavaOptions", " -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ") if (sparkConf.getOption("spark.master") == None) { // Master not set, as this was not launched through Spark-submit. Setting master as local." sparkConf.setMaster("local[*]") } // Create the context val ssc = new StreamingContext(sparkConf, Duration(batchMillis)) val rawStreams = (1 to numStreams).map(_ => ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray val union = ssc.union(rawStreams) union.count().map(c => s"Received $c records").print() ssc.start() ssc.awaitTermination() } } Thanks.