Hi,

I am trying to build a data generator that feeds a streaming application.
This data generator just reads a file and send its lines through a socket.
I get no errors on the logs, and the benchmark bellow always prints
"Received 0 records". Am I doing something wrong?


object MyDataGenerator {

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Usage: RawTextSender <port> <file> <sleepMillis>")
      System.exit(1)
    }
    // Parse the arguments using a pattern match
    val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt)

    val serverSocket = new ServerSocket(port)
    println("Listening on port " + port)


    while (true) {
      val socket = serverSocket.accept()
      println("Got a new connection")


      val out = new PrintWriter(socket.getOutputStream)
      try {
        var count = 0
        var startTimestamp = -1
        for (line <- Source.fromFile(file).getLines()) {
          val ts = line.substring(2, line.indexOf(',',2)).toInt
          if(startTimestamp < 0)
            startTimestamp = ts

          if(ts - startTimestamp <= 30) {
            out.println(line)
            count += 1
          } else {
            println(s"Emmited reports: $count")
            count = 0
            out.flush()
            startTimestamp = ts
            Thread.sleep(sleepMillis)
          }
        }
      } catch {
        case e: IOException =>
          println("Client disconnected")
          socket.close()
      }
    }
}
}



object Benchmark {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Usage: RawNetworkGrep <numStreams> <host>
<port> <batchMillis>")
      System.exit(1)
    }

    val (numStreams, host, port, batchMillis) = (args(0).toInt,
args(1), args(2).toInt, args(3).toInt)
    val sparkConf = new SparkConf()
    sparkConf.setAppName("BenchMark")
    
sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
    sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.executor.extraJavaOptions", "
-XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts
-XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ")
    if (sparkConf.getOption("spark.master") == None) {
      // Master not set, as this was not launched through
Spark-submit. Setting master as local."
      sparkConf.setMaster("local[*]")
    }

    // Create the context
    val ssc = new StreamingContext(sparkConf, Duration(batchMillis))

    val rawStreams = (1 to numStreams).map(_ =>
      ssc.rawSocketStream[String](host, port,
StorageLevel.MEMORY_ONLY_SER)).toArray
    val union = ssc.union(rawStreams)
    union.count().map(c => s"Received $c records").print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Thanks.

Reply via email to