Hey Jim,

Do you see the same behavior if you run this outside of Eclipse?
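For example, with spark-submit, something like this (a rough sketch -- the
class name and jar path are placeholders for your build; local[2] gives
the Kafka receiver its own thread):

    ./bin/spark-submit \
      --class com.example.KafkaStreamTest \
      --master local[2] \
      target/your-app-assembly.jar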

Also, what happens if you print something to standard out when setting
up your streams (i.e., not inside the foreach)? Do you see that output?
This could be a streaming issue, but it could also be something related
to the way it's running in Eclipse.
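Something like this (a minimal sketch reusing the names from your snippet;
the setup-time print runs once on the driver while the streams are wired
up, and the print inside foreach runs on the driver once per batch):

    // runs once, on the driver, during stream setup
    System.out.println("streams set up")

    // runs on the driver once per batch interval
    msgstream.foreach(rdd => System.out.println("batch size: " + rdd.count()))

    ssc.start()

If the setup-time print shows up but the per-batch one never does, that
points at the streaming side rather than at console output in Eclipse.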

- Patrick

On Fri, May 23, 2014 at 2:57 PM, Jim Donahue <jdona...@adobe.com> wrote:
> I'm trying out 1.0 on a set of small Spark Streaming tests and am running
> into problems.  Here's one of the little programs I've used for a long
> time -- it reads a Kafka stream that contains Twitter JSON tweets and does
> some simple counting.  The program starts OK (it connects to the Kafka
> stream fine) and generates a stream of INFO logging messages, but never
> generates any output. :-(
>
> I'm running this in Eclipse, so there may be some class loading issue
> (loading the wrong class or something like that), but I'm not seeing
> anything in the console output.
>
> Thanks,
>
> Jim Donahue
> Adobe
>
>
>
> val kafka_messages =
>   KafkaUtils.createStream[Array[Byte], Array[Byte],
>     kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder](
>     ssc, propsMap, topicMap, StorageLevel.MEMORY_AND_DISK)
>
> val messages = kafka_messages.map(_._2)
>
> val total = ssc.sparkContext.accumulator(0)
>
> val startTime = new java.util.Date().getTime()
>
> val jsonstream = messages.map[JSONObject](message => {
>   val string = new String(message)
>   val json = new JSONObject(string)
>   total += 1
>   json
> })
>
> val deleted = ssc.sparkContext.accumulator(0)
>
> val msgstream = jsonstream.filter(json =>
>   if (!json.has("delete")) true else { deleted += 1; false })
>
> msgstream.foreach(rdd => {
>   if (rdd.count() > 0) {
>     val data = rdd.map(json => (json.has("entities"), json.length())).collect()
>     val entities: Double = data.count(t => t._1)
>     val fieldCounts = data.sortBy(_._2)
>     val minFields = fieldCounts(0)._2
>     val maxFields = fieldCounts(fieldCounts.size - 1)._2
>     val now = new java.util.Date()
>     val interval = (now.getTime() - startTime) / 1000
>     System.out.println(now.toString)
>     System.out.println("processing time: " + interval + " seconds")
>     System.out.println("total messages: " + total.value)
>     System.out.println("deleted messages: " + deleted.value)
>     System.out.println("message receipt rate: " + (total.value / interval) + " per second")
>     System.out.println("messages this interval: " + data.length)
>     System.out.println("message fields varied between: " + minFields + " and " + maxFields)
>     System.out.println("fraction with entities is " + (entities / data.length))
>   }
> })
>
> ssc.start()
