You need to register at least one output operation (stream.print(), stream.foreachRDD, stream.saveAs*) on the stream that you created; without one, nothing in the pipeline is executed.
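To illustrate the point, here is a minimal sketch, not taken from the original code. It assumes Spark Streaming 1.3 or later is on the classpath (for awaitTerminationOrTimeout) and uses ssc.queueStream as a stand-in source so that no Kafka broker is needed. The object name OutputOperationSketch and the run() helper are made up for this example. The foreachRDD call is the output operation; if you remove it, ssc.start() should fail with the same "No output streams registered" assertion.

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical example object; the names here are illustrative only.
object OutputOperationSketch {

  // Runs a tiny streaming job and returns every element seen by foreachRDD.
  def run(): List[Int] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("OutputOperationSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Stand-in source (assumption): a queue holding one RDD, instead of Kafka.
    val queue = mutable.Queue[RDD[Int]](ssc.sparkContext.parallelize(Seq(1, 2, 3)))
    val stream = ssc.queueStream(queue)

    val seen = mutable.ArrayBuffer.empty[Int]

    // Output operation: this is what registers the stream for execution.
    // The closure runs on the driver, so appending to a local buffer is safe here.
    stream.foreachRDD { rdd =>
      seen.synchronized { seen ++= rdd.collect() }
    }

    ssc.start()
    // Let a few batches run, then shut down instead of blocking forever.
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop(stopSparkContext = true, stopGracefully = false)

    seen.synchronized { seen.toList }
  }
}
```

Calling OutputOperationSketch.run() from a main method or the REPL is enough to try it. stream.print() would register the stream just as well; foreachRDD is used here only so the results can be returned to the caller.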
In your code add the following line:

*unifiedStream.print()*

Thanks
Best Regards

On Sat, Jan 17, 2015 at 3:35 PM, Rohit Pujari <rpuj...@hortonworks.com> wrote:

> Hello Folks:
>
> I'm running into following error while executing relatively straight
> forward spark-streaming code. Am I missing anything?
>
> *Exception in thread "main" java.lang.AssertionError: assertion failed: No
> output streams registered, so nothing to execute*
>
>
> Code:
>
> val conf = new SparkConf().setMaster("local[2]").setAppName("Streams")
> val ssc = new StreamingContext(conf, Seconds(1))
>
> val kafkaStream = {
>   val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"
>   val kafkaParams = Map(
>     "zookeeper.connect" -> "node1.c.emerald-skill-783.internal:2181",
>     "group.id" -> "twitter",
>     "zookeeper.connection.timeout.ms" -> "1000")
>   val inputTopic = "twitter"
>   val numPartitionsOfInputTopic = 2
>   val streams = (1 to numPartitionsOfInputTopic) map { _ =>
>     KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1),
>       StorageLevel.MEMORY_ONLY_SER)
>   }
>   val unifiedStream = ssc.union(streams)
>   val sparkProcessingParallelism = 1
>   unifiedStream.repartition(sparkProcessingParallelism)
> }
>
> //print(kafkaStream)
> ssc.start()
> ssc.awaitTermination()
>
> --
> Rohit Pujari