I'm trying out 1.0 on a set of small Spark Streaming tests and am running into problems. Here's one of the little programs I've used for a long time: it reads a Kafka stream that contains Twitter JSON tweets and does some simple counting. The program starts OK (it connects to the Kafka stream fine) and generates a stream of INFO logging messages, but never generates any output. :-(
I'm running this in Eclipse, so there may be some class loading issue (loading the wrong class or something like that), but I'm not seeing anything in the console output.

Thanks,

Jim Donahue
Adobe

    // Read raw (key, value) byte arrays from Kafka and keep only the values
    val kafka_messages = KafkaUtils.createStream[Array[Byte], Array[Byte],
        kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder](
        ssc, propsMap, topicMap, StorageLevel.MEMORY_AND_DISK)
    val messages = kafka_messages.map(_._2)

    val total = ssc.sparkContext.accumulator(0)
    val startTime = new java.util.Date().getTime()

    // Parse each message as JSON, counting every message seen
    val jsonstream = messages.map[JSONObject](message => {
      val string = new String(message)
      val json = new JSONObject(string)
      total += 1
      json
    })

    // Drop "delete" notices, counting how many were dropped
    val deleted = ssc.sparkContext.accumulator(0)
    val msgstream = jsonstream.filter(json =>
      if (!json.has("delete")) true else { deleted += 1; false })

    // Print summary statistics for each non-empty batch
    msgstream.foreach(rdd => {
      if (rdd.count() > 0) {
        val data = rdd.map(json => (json.has("entities"), json.length())).collect()
        val entities: Double = data.count(t => t._1)
        val fieldCounts = data.sortBy(_._2)
        val minFields = fieldCounts(0)._2
        val maxFields = fieldCounts(fieldCounts.size - 1)._2
        val now = new java.util.Date()
        val interval = (now.getTime() - startTime) / 1000
        System.out.println(now.toString)
        System.out.println("processing time: " + interval + " seconds")
        System.out.println("total messages: " + total.value)
        System.out.println("deleted messages: " + deleted.value)
        System.out.println("message receipt rate: " + (total.value/interval) + " per second")
        System.out.println("messages this interval: " + data.length)
        System.out.println("message fields varied between: " + minFields + " and " + maxFields)
        System.out.println("fraction with entities is " + (entities / data.length))
      }
    })

    ssc.start()