Hi Mich,

Please take a look at how to write data into Kafka topic with DStreams:
https://github.com/gaborgsomogyi/spark-dstream-secure-kafka-sink-app/blob/62d64ce368bc07b385261f85f44971b32fe41327/src/main/scala/com/cloudera/spark/examples/DirectKafkaSinkWordCount.scala#L77
(DStreams has no native Kafka sink, if you need it use Structured Streaming)

BR,
G


On Wed, Mar 27, 2019 at 8:47 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
> In a traditional we get data via Kafka into Spark streaming, do some work
> and write to a NoSQL database like Mongo, Hbase or Aerospike.
>
> That part can be done below and is best explained by the code as follows:
>
> Once a high value DF lookups is created I want send the data to a new
> topic for recipients!
>
>     val kafkaParams = Map[String, String](
>                                       "bootstrap.servers" ->
> bootstrapServers,
>                                       "schema.registry.url" ->
> schemaRegistryURL,
>                                        "zookeeper.connect" ->
> zookeeperConnect,
>                                        "group.id" -> sparkAppName,
>                                        "zookeeper.connection.timeout.ms"
> -> zookeeperConnectionTimeoutMs,
>                                        "rebalance.backoff.ms" ->
> rebalanceBackoffMS,
>                                        "zookeeper.session.timeout.ms" ->
> zookeeperSessionTimeOutMs,
>                                        "auto.commit.interval.ms" ->
> autoCommitIntervalMS
>                                      )
>     //val topicsSet = topics.split(",").toSet
>     val topics = Set(topicsValue)
>     val dstream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>     // This returns a tuple of key and value (since messages in Kafka are
> optionally keyed). In this case it is of type (String, String)
>     dstream.cache()
>     //
>     val topicsOut = Set(topicsValueOut)
>     val dstreamOut = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsOut)
>     dstreamOut.cache()
>
>
>     dstream.foreachRDD
>     { pricesRDD =>
>       if (!pricesRDD.isEmpty)  // data exists in RDD
>       {
>         val op_time = System.currentTimeMillis.toString
>         val spark =
> SparkSessionSingleton.getInstance(pricesRDD.sparkContext.getConf)
>         val sc = spark.sparkContext
>         import spark.implicits._
>         var operation = new operationStruct(op_type, op_time)
>         // Convert RDD[String] to RDD[case class] to DataFrame
>         val RDDString = pricesRDD.map { case (_, value) =>
> value.split(',') }.map(p =>
> priceDocument(priceStruct(p(0).toString,p(1).toString,p(2).toString,p(3).toDouble,
> currency), operation))
>         val df = spark.createDataFrame(RDDString)
>         //df.printSchema
>         var document = df.filter('priceInfo.getItem("price") > 90.0)
>         MongoSpark.save(document, writeConfig)
>          println("Current time is: " + Calendar.getInstance.getTime)
>          totalPrices += document.count
>          var endTimeQuery = System.currentTimeMillis
>          println("Total Prices added to the collection so far: "
> +totalPrices+ " , Runnig for  " + (endTimeQuery -
> startTimeQuery)/(1000*60)+" Minutes")
>          // Check if running time > runTime exit
>          if( (endTimeQuery - startTimeQuery)/(100000*60) > runTime)
>          {
>            println("\nDuration exceeded " + runTime + " minutes exiting")
>            System.exit(0)
>          }
>          // picking up individual arrays -->
> df.select('otherDetails.getItem("tickerQuotes")(0)) shows first element
>          //val lookups = df.filter('priceInfo.getItem("ticker") ===
> tickerWatch && 'priceInfo.getItem("price") > priceWatch)
>          val lookups = df.filter('priceInfo.getItem("price") > priceWatch)
>          if(lookups.count > 0) {
>            println("High value tickers")
>            lookups.select('priceInfo.getItem("timeissued").as("Time
> issued"), 'priceInfo.getItem("ticker").as("Ticker"),
> 'priceInfo.getItem("price").cast("Double").as("Latest price")).show
>
> // Now here I want to send the content of lookups DF to another kafka
> topic!!!
> //Note that above I have created a new dstreamOut with a new topic
> topicsOut
> How that can be done?
>          }
>       }
>     }
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>

Reply via email to