Hi Gabor,

I will look at the link and see what it provides.

Thanks,


Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 27 Mar 2019 at 21:23, Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Hi Mich,
>
> Please take a look at how to write data into a Kafka topic with DStreams:
> https://github.com/gaborgsomogyi/spark-dstream-secure-kafka-sink-app/blob/62d64ce368bc07b385261f85f44971b32fe41327/src/main/scala/com/cloudera/spark/examples/DirectKafkaSinkWordCount.scala#L77
> (DStreams has no native Kafka sink; if you need one, use Structured
> Streaming.)
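>
> A minimal sketch of that pattern (assuming a DStream[(String, String)]
> called dstream and the plain kafka-clients producer API; the broker
> address and "outputTopic" are placeholders):
>
>     import java.util.Properties
>     import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
>
>     dstream.foreachRDD { rdd =>
>       rdd.foreachPartition { partition =>
>         // Producers are not serializable, so create one per partition on the executors
>         val producerProps = new Properties()
>         producerProps.put("bootstrap.servers", "broker1:9092")  // placeholder
>         producerProps.put("key.serializer",
>           "org.apache.kafka.common.serialization.StringSerializer")
>         producerProps.put("value.serializer",
>           "org.apache.kafka.common.serialization.StringSerializer")
>         val producer = new KafkaProducer[String, String](producerProps)
>         partition.foreach { case (key, value) =>
>           producer.send(new ProducerRecord[String, String]("outputTopic", key, value))
>         }
>         producer.close()
>       }
>     }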
>
> BR,
> G
>
>
> On Wed, Mar 27, 2019 at 8:47 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hi,
>>
>> In a traditional setup we get data via Kafka into Spark Streaming, do some
>> work and write to a NoSQL database like MongoDB, HBase or Aerospike.
>>
>> That part works and is best explained by the code below.
>>
>> Once a high-value DataFrame (lookups) is created, I want to send its data
>> to a new topic for recipients.
>>
>>     val kafkaParams = Map[String, String](
>>       "bootstrap.servers" -> bootstrapServers,
>>       "schema.registry.url" -> schemaRegistryURL,
>>       "zookeeper.connect" -> zookeeperConnect,
>>       "group.id" -> sparkAppName,
>>       "zookeeper.connection.timeout.ms" -> zookeeperConnectionTimeoutMs,
>>       "rebalance.backoff.ms" -> rebalanceBackoffMS,
>>       "zookeeper.session.timeout.ms" -> zookeeperSessionTimeOutMs,
>>       "auto.commit.interval.ms" -> autoCommitIntervalMS
>>     )
>>     //val topicsSet = topics.split(",").toSet
>>     val topics = Set(topicsValue)
>>     val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
>>       StringDecoder](streamingContext, kafkaParams, topics)
>>     // This returns a tuple of key and value (since messages in Kafka are
>>     // optionally keyed). In this case it is of type (String, String).
>>     dstream.cache()
>>     //
>>     val topicsOut = Set(topicsValueOut)
>>     val dstreamOut = KafkaUtils.createDirectStream[String, String, StringDecoder,
>>       StringDecoder](streamingContext, kafkaParams, topicsOut)
>>     dstreamOut.cache()
>>
>>
>>     dstream.foreachRDD
>>     { pricesRDD =>
>>       if (!pricesRDD.isEmpty)  // data exists in RDD
>>       {
>>         val op_time = System.currentTimeMillis.toString
>>         val spark =
>> SparkSessionSingleton.getInstance(pricesRDD.sparkContext.getConf)
>>         val sc = spark.sparkContext
>>         import spark.implicits._
>>         var operation = new operationStruct(op_type, op_time)
>>         // Convert RDD[String] to RDD[case class] to DataFrame
>>         val RDDString = pricesRDD.
>>           map { case (_, value) => value.split(',') }.
>>           map(p => priceDocument(priceStruct(p(0).toString, p(1).toString,
>>             p(2).toString, p(3).toDouble, currency), operation))
>>         val df = spark.createDataFrame(RDDString)
>>         //df.printSchema
>>         var document = df.filter('priceInfo.getItem("price") > 90.0)
>>         MongoSpark.save(document, writeConfig)
>>          println("Current time is: " + Calendar.getInstance.getTime)
>>          totalPrices += document.count
>>          var endTimeQuery = System.currentTimeMillis
>>          println("Total Prices added to the collection so far: "
>> +totalPrices+ " , Runnig for  " + (endTimeQuery -
>> startTimeQuery)/(1000*60)+" Minutes")
>>          // Check if running time > runTime exit
>>          if( (endTimeQuery - startTimeQuery)/(100000*60) > runTime)
>>          {
>>            println("\nDuration exceeded " + runTime + " minutes exiting")
>>            System.exit(0)
>>          }
>>          // Picking up individual array elements:
>>          // df.select('otherDetails.getItem("tickerQuotes")(0)) shows the first element
>>          //val lookups = df.filter('priceInfo.getItem("ticker") === tickerWatch &&
>>          //             'priceInfo.getItem("price") > priceWatch)
>>          val lookups = df.filter('priceInfo.getItem("price") > priceWatch)
>>          if (lookups.count > 0) {
>>            println("High value tickers")
>>            lookups.select(
>>              'priceInfo.getItem("timeissued").as("Time issued"),
>>              'priceInfo.getItem("ticker").as("Ticker"),
>>              'priceInfo.getItem("price").cast("Double").as("Latest price")).show
>>
>>            // Now here I want to send the content of the lookups DF to another
>>            // Kafka topic. Note that above I have created a new dstreamOut with
>>            // the new topic topicsOut. How can that be done? (One possible
>>            // approach is sketched after this code block.)
>>          }
>>       }
>>     }
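>>
>> One possible way to publish the lookups DataFrame (a sketch, not tested; it
>> assumes Spark 2.2+ with the spark-sql-kafka-0-10 package on the classpath,
>> and reuses bootstrapServers and topicsValueOut from above; encoding each row
>> as JSON is just an illustrative choice):
>>
>>     import org.apache.spark.sql.functions.{col, struct, to_json}
>>
>>     if (lookups.count > 0) {
>>       lookups
>>         // The Kafka sink expects a string or binary "value" column
>>         .select(to_json(struct(lookups.columns.map(col): _*)).as("value"))
>>         .write
>>         .format("kafka")
>>         .option("kafka.bootstrap.servers", bootstrapServers)
>>         .option("topic", topicsValueOut)
>>         .save()
>>     }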
>>
>> Thanks
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn:
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>
