I'm using spark streaming with Kafka, and submit it to YARN cluster with
mode "yarn-cluster". But it hangs at SparkContext.start(). The Kafka config
is right since it can show some events in "Streaming" tab of web UI.

The attached file is the screen shot of the "Jobs" tab of web UI. The code
in the main class is:

object StatCounter {

  val config = ConfigFactory.load()
  val redisUrl = config.getString("redis.url")
  val redisPort = config.getInt("redis.port")
  val zkQuorum = config.getString("kafka.zkQuorum")
  val group = config.getString("kafka.group")
  val topic = config.getString("kafka.topic")
  val threadNum = config.getInt("kafka.threadNum")

  val cache = new RedisCache(redisUrl, redisPort)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
        .setAppName(config.getString("spark.name"))
        .set("spark.cassandra.connection.host",
config.getString("cassandra.host"))

    val ssc = new StreamingContext(conf,
Seconds(config.getInt("spark.interval")))
    ssc.checkpoint(config.getString("spark.checkpoint"))
    val storage = new CassandraStorage("adhoc_data", ssc)

    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic ->
threadNum)).map(_._2)

    val logs = lines.flatMap(line => Parser.parseBody(line, cache))
    Counter.count(logs, storage)

    sys.ShutdownHookThread {
      println("Gracefully stopping Spark Streaming Application")
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      println("Application stopped")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to