I'm trying out 1.0 on a set of small Spark Streaming tests and am running
into problems. Here's one of the little programs I've used for a long
time: it reads a Kafka stream that contains Twitter JSON tweets and does
some simple counting. The program starts OK (it connects to the Kafka
stream fine) and generates a stream of INFO logging messages, but never
generates any output. :-(

I'm running this in Eclipse, so there may be some class loading issue
(loading the wrong class or something like that), but I'm not seeing
anything in the console output.
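One way to check the class-loading theory is to print where a given class was actually loaded from. The helper below is only a sketch: `ClassOrigin` and `locationOf` are made-up names that are not part of the program, and the classes passed to it here are stand-ins. In the real project one would pass something like the `KafkaUtils` class instead.

```scala
// Hypothetical diagnostic helper, not part of the original program.
object ClassOrigin {
  // Returns the jar or directory a class was loaded from, or a placeholder
  // when the JVM does not expose one (e.g. classes from the boot class path).
  def locationOf(cls: Class[_]): String = {
    val source = Option(cls.getProtectionDomain).flatMap(pd => Option(pd.getCodeSource))
    source
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)
      .getOrElse("<bootstrap or unknown>")
  }

  def main(args: Array[String]): Unit = {
    // Stand-in classes; swap in the Spark or Kafka classes under suspicion.
    println("scala.Option loaded from: " + locationOf(classOf[scala.Option[_]]))
    println("java.lang.String loaded from: " + locationOf(classOf[String]))
  }
}
```

If the printed location for a Spark or Kafka class points at an older jar than expected, that would support the wrong-class theory; if it points at the 1.0 jars, the problem is probably elsewhere.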
Thanks,
Jim Donahue
Adobe
val kafka_messages =
  KafkaUtils.createStream[Array[Byte], Array[Byte],
    kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder](
    ssc, propsMap, topicMap, StorageLevel.MEMORY_AND_DISK)
val messages = kafka_messages.map(_._2)
val total = ssc.sparkContext.accumulator(0)
val startTime = new java.util.Date().getTime()
val jsonstream = messages.map[JSONObject](message => {
  val string = new String(message)
  val json = new JSONObject(string)
  total += 1
  json
})
val deleted = ssc.sparkContext.accumulator(0)
val msgstream = jsonstream.filter(json =>
  if (!json.has("delete")) true else { deleted += 1; false }
)
msgstream.foreach(rdd => {
  if (rdd.count() > 0) {
    val data = rdd.map(json => (json.has("entities"), json.length())).collect()
    val entities: Double = data.count(t => t._1)
    val fieldCounts = data.sortBy(_._2)
    val minFields = fieldCounts(0)._2
    val maxFields = fieldCounts(fieldCounts.size - 1)._2
    val now = new java.util.Date()
    val interval = (now.getTime() - startTime) / 1000
    System.out.println(now.toString)
    System.out.println("processing time: " + interval + " seconds")
    System.out.println("total messages: " + total.value)
    System.out.println("deleted messages: " + deleted.value)
    System.out.println("message receipt rate: " + (total.value / interval) + " per second")
    System.out.println("messages this interval: " + data.length)
    System.out.println("message fields varied between: " + minFields + " and " + maxFields)
    System.out.println("fraction with entities is " + (entities / data.length))
  }
})
ssc.start()
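
To rule out the counting logic itself, the per-batch arithmetic can be exercised without Spark or Kafka. The following is a minimal sketch, assuming the collected data has the same `(hasEntities, fieldCount)` shape as in the program above; `BatchStats`, `Summary`, and `summarize` are hypothetical names introduced only for this illustration, and the sample values are made up.

```scala
// Hypothetical stand-alone version of the per-batch statistics (no Spark needed).
object BatchStats {
  // One summary per batch: message count, min/max field count, entity fraction.
  case class Summary(messages: Int, minFields: Int, maxFields: Int, entityFraction: Double)

  // `data` mirrors the collected array of (json.has("entities"), json.length()) pairs.
  def summarize(data: Array[(Boolean, Int)]): Option[Summary] =
    if (data.isEmpty) None // the original program skips empty batches as well
    else {
      val fieldCounts = data.map(_._2)
      val withEntities = data.count(_._1)
      Some(Summary(data.length, fieldCounts.min, fieldCounts.max,
        withEntities.toDouble / data.length))
    }

  def main(args: Array[String]): Unit = {
    // Made-up sample batch: four messages, two of which carry "entities".
    val sample = Array((true, 12), (false, 3), (true, 20), (false, 7))
    println(summarize(sample)) // prints Some(Summary(4,3,20,0.5))
  }
}
```

If this behaves as expected on sample data, the missing output is more likely related to the streaming setup than to the statistics code.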