Thanks a lot. Now it's working fine. I wasn't aware of socketTextStream;
I'm not sure it was documented in the Spark programming guide.
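For anyone hitting the same issue: the difference is the framing. socketTextStream expects newline-delimited UTF-8 text, so every println on the sender side becomes one record, while rawSocketStream expects Spark's own serialized block format. A minimal sketch with plain sockets (no Spark; names are mine) of the line-based framing:

```scala
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.net.{ServerSocket, Socket}

// Sketch: line-based consumption, the framing socketTextStream expects.
// Each println on the sender side arrives as exactly one record.
object LineFramingDemo {
  def roundTrip(lines: Seq[String]): List[String] = {
    val server = new ServerSocket(0) // bind an ephemeral port
    val sender = new Thread(new Runnable {
      def run(): Unit = {
        val s = server.accept()
        // autoFlush = true: each println is pushed to the wire immediately
        val out = new PrintWriter(s.getOutputStream, true)
        lines.foreach(l => out.println(l))
        out.close()
        s.close()
      }
    })
    sender.start()
    // Receiver side: read newline-delimited text, one record per line.
    val client = new Socket("localhost", server.getLocalPort)
    val in = new BufferedReader(new InputStreamReader(client.getInputStream, "UTF-8"))
    val received = Iterator.continually(in.readLine()).takeWhile(_ != null).toList
    sender.join()
    client.close()
    server.close()
    received
  }
}
```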
On Mon, Sep 21, 2015 at 12:46 PM, Hemant Bhanawat
wrote:
> Why are you using rawSocketStream to read the data? I believe
> rawSocketStream waits for a big chunk of data before it can start
> processing it. What you are writing is a String, so you should use
> socketTextStream, which reads the data on a per-line basis.
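> A side note on the generator itself: out is a PrintWriter created without
> autoFlush, and out.flush() is only called once per 30-second window, so
> lines can sit in the writer's buffer for quite a while before the receiver
> sees anything. A small sketch (plain JDK, names are mine) of that buffering:

```scala
import java.io.{ByteArrayOutputStream, PrintWriter}

// Sketch: PrintWriter(OutputStream) buffers internally, so nothing reaches
// the underlying stream (here a ByteArrayOutputStream standing in for the
// socket) until flush() or close() is called.
object FlushDemo {
  def bytesBeforeAndAfterFlush(): (Int, Int) = {
    val sink = new ByteArrayOutputStream()
    val out = new PrintWriter(sink) // no autoFlush: writes stay buffered
    out.println("some,line")
    val before = sink.size() // still 0: the line is in the writer's buffer
    out.flush()
    val after = sink.size() // now the bytes have reached the sink
    (before, after)
  }
}
```

> Constructing the writer as new PrintWriter(socket.getOutputStream, true)
> would flush on every println, at the cost of more small writes.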
>
> On Sun, Sep 20, 2015 at 9:56 AM, Saiph Kappa
> wrote:
>
>> Hi,
>>
>> I am trying to build a data generator that feeds a streaming application.
>> This data generator just reads a file and sends its lines through a socket.
>> I get no errors in the logs, and the benchmark below always prints
>> "Received 0 records". Am I doing something wrong?
>>
>>
>> import java.io.{IOException, PrintWriter}
>> import java.net.ServerSocket
>> import scala.io.Source
>>
>> object MyDataGenerator {
>>
>>   def main(args: Array[String]) {
>>     if (args.length != 3) {
>>       System.err.println("Usage: RawTextSender <port> <file> <sleepMillis>")
>>       System.exit(1)
>>     }
>>     // Parse the arguments using a pattern match
>>     val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt)
>>
>>     val serverSocket = new ServerSocket(port)
>>     println("Listening on port " + port)
>>
>>     while (true) {
>>       val socket = serverSocket.accept()
>>       println("Got a new connection")
>>
>>       val out = new PrintWriter(socket.getOutputStream)
>>       try {
>>         var count = 0
>>         var startTimestamp = -1
>>         for (line <- Source.fromFile(file).getLines()) {
>>           val ts = line.substring(2, line.indexOf(',', 2)).toInt
>>           if (startTimestamp < 0)
>>             startTimestamp = ts
>>
>>           if (ts - startTimestamp <= 30) {
>>             out.println(line)
>>             count += 1
>>           } else {
>>             println(s"Emitted reports: $count")
>>             count = 0
>>             out.flush()
>>             startTimestamp = ts
>>             Thread.sleep(sleepMillis)
>>           }
>>         }
>>       } catch {
>>         case e: IOException =>
>>           println("Client disconnected")
>>           socket.close()
>>       }
>>     }
>>   }
>> }
>>
>>
>>
>> import org.apache.spark.SparkConf
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.streaming.{Duration, StreamingContext}
>>
>> object Benchmark {
>>   def main(args: Array[String]) {
>>     if (args.length != 4) {
>>       System.err.println("Usage: RawNetworkGrep <numStreams> <host> <port> <batchMillis>")
>>       System.exit(1)
>>     }
>>
>>     val (numStreams, host, port, batchMillis) =
>>       (args(0).toInt, args(1), args(2).toInt, args(3).toInt)
>>     val sparkConf = new SparkConf()
>>     sparkConf.setAppName("BenchMark")
>>     sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
>>     sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>     sparkConf.set("spark.executor.extraJavaOptions",
>>       " -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts" +
>>       " -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ")
>>     if (sparkConf.getOption("spark.master") == None) {
>>       // Master not set, as this was not launched through spark-submit. Setting master as local.
>>       sparkConf.setMaster("local[*]")
>>     }
>>
>>     // Create the context
>>     val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>>
>>     val rawStreams = (1 to numStreams).map(_ =>
>>       ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
>>     val union = ssc.union(rawStreams)
>>     union.count().map(c => s"Received $c records").print()
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>>
>> Thanks.