Hi Mich,

Please take a look at how to write data into a Kafka topic with DStreams: https://github.com/gaborgsomogyi/spark-dstream-secure-kafka-sink-app/blob/62d64ce368bc07b385261f85f44971b32fe41327/src/main/scala/com/cloudera/spark/examples/DirectKafkaSinkWordCount.scala#L77 (DStreams has no native Kafka sink; if you need one, use Structured Streaming.)
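Following the pattern in the linked example, a minimal sketch of the usual DStreams workaround is to create a KafkaProducer inside foreachPartition on the executors and send each row yourself. This is a sketch only, assuming the `lookups` DataFrame, `bootstrapServers` and output topic `topicsValueOut` from your quoted code; it is not a tested, definitive implementation:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// Serialize each row of the high-value `lookups` DataFrame to JSON and
// write it to the output topic. KafkaProducer is not serializable, so it
// must be created per partition on the executor, never on the driver.
lookups.toJSON.foreachPartition { partition: Iterator[String] =>
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)
  partition.foreach { record =>
    producer.send(new ProducerRecord[String, String](topicsValueOut, record))
  }
  producer.close() // flushes any buffered sends before the task finishes
}
```

Note there is no need for the extra `dstreamOut` / `createDirectStream` on the output topic; `createDirectStream` only consumes. For a supported, exactly-once-capable path, Structured Streaming's built-in Kafka sink (`df.writeStream.format("kafka")`) is the way to go.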
BR,
G

On Wed, Mar 27, 2019 at 8:47 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> Hi,
>
> In a traditional setup we get data via Kafka into Spark Streaming, do some
> work and write to a NoSQL database like MongoDB, HBase or Aerospike.
>
> That part can be done below and is best explained by the code as follows.
> Once a high-value DF `lookups` is created, I want to send the data to a new
> topic for recipients!
>
>     val kafkaParams = Map[String, String](
>       "bootstrap.servers" -> bootstrapServers,
>       "schema.registry.url" -> schemaRegistryURL,
>       "zookeeper.connect" -> zookeeperConnect,
>       "group.id" -> sparkAppName,
>       "zookeeper.connection.timeout.ms" -> zookeeperConnectionTimeoutMs,
>       "rebalance.backoff.ms" -> rebalanceBackoffMS,
>       "zookeeper.session.timeout.ms" -> zookeeperSessionTimeOutMs,
>       "auto.commit.interval.ms" -> autoCommitIntervalMS
>     )
>     //val topicsSet = topics.split(",").toSet
>     val topics = Set(topicsValue)
>     val dstream = KafkaUtils.createDirectStream[String, String,
>       StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>     // This returns a tuple of key and value (since messages in Kafka are
>     // optionally keyed). In this case it is of type (String, String)
>     dstream.cache()
>     //
>     val topicsOut = Set(topicsValueOut)
>     val dstreamOut = KafkaUtils.createDirectStream[String, String,
>       StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsOut)
>     dstreamOut.cache()
>
>     dstream.foreachRDD { pricesRDD =>
>       if (!pricesRDD.isEmpty) { // data exists in RDD
>         val op_time = System.currentTimeMillis.toString
>         val spark = SparkSessionSingleton.getInstance(pricesRDD.sparkContext.getConf)
>         val sc = spark.sparkContext
>         import spark.implicits._
>         var operation = new operationStruct(op_type, op_time)
>         // Convert RDD[String] to RDD[case class] to DataFrame
>         val RDDString = pricesRDD.map { case (_, value) => value.split(',') }
>           .map(p => priceDocument(priceStruct(p(0).toString, p(1).toString,
>             p(2).toString, p(3).toDouble, currency), operation))
>         val df = spark.createDataFrame(RDDString)
>         //df.printSchema
>         var document = df.filter('priceInfo.getItem("price") > 90.0)
>         MongoSpark.save(document, writeConfig)
>         println("Current time is: " + Calendar.getInstance.getTime)
>         totalPrices += document.count
>         var endTimeQuery = System.currentTimeMillis
>         println("Total Prices added to the collection so far: " + totalPrices +
>           " , Running for " + (endTimeQuery - startTimeQuery)/(1000*60) + " Minutes")
>         // Check if running time > runTime, and exit if so
>         if ((endTimeQuery - startTimeQuery)/(1000*60) > runTime) {
>           println("\nDuration exceeded " + runTime + " minutes, exiting")
>           System.exit(0)
>         }
>         // picking up individual arrays -->
>         // df.select('otherDetails.getItem("tickerQuotes")(0)) shows first element
>         //val lookups = df.filter('priceInfo.getItem("ticker") === tickerWatch &&
>         //  'priceInfo.getItem("price") > priceWatch)
>         val lookups = df.filter('priceInfo.getItem("price") > priceWatch)
>         if (lookups.count > 0) {
>           println("High value tickers")
>           lookups.select('priceInfo.getItem("timeissued").as("Time issued"),
>             'priceInfo.getItem("ticker").as("Ticker"),
>             'priceInfo.getItem("price").cast("Double").as("Latest price")).show
>           // Now here I want to send the content of the lookups DF to another
>           // Kafka topic! Note that above I have created a new dstreamOut with
>           // a new topic topicsOut. How can that be done?
>         }
>       }
>     }
>
> Thanks
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.