I'm trying out 1.0 on a set of small Spark Streaming tests and am running
into problems.  Here's one of the little programs I've used for a long
time — it reads a Kafka stream that contains Twitter JSON tweets and does
some simple counting.  The program starts OK (it connects to the Kafka
stream fine) and generates a stream of INFO logging messages, but never
generates any output. :-(

I'm running this in Eclipse, so there may be some class loading issue
(loading the wrong class or something like that), but I'm not seeing
anything in the console output.

Thanks,

Jim Donahue
Adobe



// Kafka input stream of pairs whose two elements are both raw byte arrays,
// decoded with Kafka's DefaultDecoder and stored with MEMORY_AND_DISK.
val kafka_messages = KafkaUtils.createStream[
  Array[Byte],
  Array[Byte],
  kafka.serializer.DefaultDecoder,
  kafka.serializer.DefaultDecoder
](ssc, propsMap, topicMap, StorageLevel.MEMORY_AND_DISK)

// Keep only the second element of each pair
// (presumably the message body, the first being the Kafka key -- confirm).
val messages = kafka_messages.map { case (_, payload) => payload }

     
     // Accumulator, starting at 0, that the parsing stage below increments
     // once per message it processes.
     val total = ssc.sparkContext.accumulator(0)

     // Wall-clock time (epoch milliseconds) captured when this setup code runs;
     // the per-batch report measures elapsed time from here.
     val startTime = System.currentTimeMillis()

     
     // Parse every raw message into a JSONObject and count it in `total`.
     //
     // The bytes are decoded explicitly as UTF-8: `new String(bytes)` without a
     // charset uses the JVM's platform default, which can corrupt non-ASCII
     // tweet text on machines whose default is not UTF-8.
     // (assumes the producer writes UTF-8 JSON -- TODO confirm)
     //
     // NOTE: `total` is updated inside a transformation, so it is incremented
     // every time this stage is (re)computed, not once per message overall.
     val jsonstream = messages.map[JSONObject] { message =>
       val json = new JSONObject(new String(message, "UTF-8"))
       // Counted only after a successful parse, as before.
       total += 1
       json
     }

    
    // Accumulator, starting at 0, counting messages dropped by the filter below.
    val deleted = ssc.sparkContext.accumulator(0)

    // Drop every JSON object that has a "delete" key, bumping `deleted` for
    // each one dropped; everything else passes through unchanged.
    val msgstream = jsonstream.filter { json =>
      val isDelete = json.has("delete")
      if (isDelete) deleted += 1
      !isDelete
    }

    
    // Per-batch report: for every non-empty batch, print the running totals and
    // simple statistics about the messages in that batch.
    //
    // `foreachRDD` replaces `DStream.foreach`, which is deprecated.
    msgstream.foreachRDD(rdd => {
      // Run exactly ONE action per batch. The upstream map/filter closures
      // update the `total` and `deleted` accumulators each time the lineage is
      // evaluated, so a separate rdd.count() followed by collect() on an
      // uncached RDD would evaluate it twice and inflate both counters.
      val data = rdd.map(json => (json.has("entities"), json.length())).collect()
      if (data.nonEmpty) {
        // Number of messages that carry an "entities" key, as a Double so the
        // fraction below is not truncated by integer division.
        val entities: Double = data.count(t => t._1)
        // Min/max field count directly; no need to sort the whole batch.
        val fieldCounts = data.map(_._2)
        val minFields = fieldCounts.min
        val maxFields = fieldCounts.max
        val now = new java.util.Date()
        // Whole seconds elapsed since startTime.
        val interval = (now.getTime() - startTime) / 1000
        // Integral division by a zero interval (first second of the run) would
        // throw ArithmeticException, so report "n/a" until a second has passed.
        val rate = if (interval > 0) (total.value / interval).toString else "n/a"
        System.out.println(now.toString)
        System.out.println("processing time: " + interval + " seconds")
        System.out.println("total messages: " + total.value)
        System.out.println("deleted messages: " + deleted.value)
        System.out.println("message receipt rate: " + rate + " per second")
        System.out.println("messages this interval: " + data.length)
        System.out.println("message fields varied between: " + minFields + " and " + maxFields)
        System.out.println("fraction with entities is " + (entities / data.length))
      }
    })
    
    // Start the streaming computation defined above.
    // NOTE(review): no ssc.awaitTermination() is visible in this excerpt --
    // confirm the driver is kept alive after start() returns.
    ssc.start()

Reply via email to