I'm wondering why you are creating separate DStreams. You should apply the logic directly on the input DStream, so each value stays paired with its key.

On 18 Aug 2016 06:40, "vidhan" <vid...@kitboard.co> wrote:
> I have a Kafka stream coming in with some input topic.
> This is the code I wrote for accepting the Kafka stream.
>
> >>> conf = SparkConf().setAppName(appname)
> >>> sc = SparkContext(conf=conf)
> >>> ssc = StreamingContext(sc)
> >>> kvs = KafkaUtils.createDirectStream(ssc, topics,\
>             {"metadata.broker.list": brokers})
>
> Then I create two DStreams of the keys and values of the original stream.
>
> >>> keys = kvs.map(lambda x: x[0].split(" "))
> >>> values = kvs.map(lambda x: x[1].split(" "))
>
> Then I perform some computation on the values DStream.
> For example,
>
> >>> val = values.flatMap(lambda x: x*2)
>
> Now I need to combine the keys and the val DStreams and return the
> result in the form of a Kafka stream.
>
> How do I combine val with the corresponding key?
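
For reference, here is a minimal sketch of what "apply the logic directly on the input DStream" could look like. It assumes kvs is the DStream of (key, value) pairs returned by KafkaUtils.createDirectStream in the quoted code. The helper names transform_value and apply_to_stream are made up for illustration, and the "split on spaces and repeat the tokens" step only stands in for whatever the real computation is. Because mapValues changes the value and leaves the key untouched, there is nothing to re-join afterwards.

```python
def transform_value(value):
    """Hypothetical stand-in for the per-value computation.

    Splits the message value on spaces and repeats the token list twice,
    mirroring the `x[1].split(" ")` and `x*2` steps in the quoted code.
    """
    return value.split(" ") * 2


def apply_to_stream(kvs):
    """Apply the computation to a DStream of (key, value) pairs.

    Assumes `kvs` comes from KafkaUtils.createDirectStream. mapValues
    keeps every key attached to its own transformed value.
    """
    return kvs.mapValues(transform_value)


if __name__ == "__main__":
    # Local check of the per-record logic, no Spark cluster needed.
    key, value = ("user1", "a b")
    print((key, transform_value(value)))
```

If one output record per token is wanted instead of one list per key, kvs.flatMapValues(transform_value) should give (key, token) pairs in the same way.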