OK it turned up pretty simple once I established where to read the conf
file in yarn mode and get the values for different keys

  val dbHost = conf.getString("dbHost")
  val dbPort = conf.getString("dbPort")
  val dbConnection = conf.getString("dbConnection")
  val namespace = conf.getString("namespace")
  val dbSetRead = conf.getString("dbSetRead")
  val dbSetWrite = conf.getString("dbSetWrite")
  val dbPassword = conf.getString("dbPassword")
  val bootstrapServers = conf.getString("bootstrapServers")
  val schemaRegistryURL = conf.getString("schemaRegistryURL")
  val zookeeperConnect = conf.getString("zookeeperConnect")
  val zookeeperConnectionTimeoutMs =
conf.getString("zookeeperConnectionTimeoutMs")
  val rebalanceBackoffMS = conf.getString("zookeeperConnectionTimeoutMs")
  val zookeeperSessionTimeOutMs =
conf.getString("zookeeperSessionTimeOutMs")
  val autoCommitIntervalMS = conf.getString("autoCommitIntervalMS")
  val topicsValue =  conf.getString("topicsValue")
  val memorySet = conf.getString("memorySet")
  val enableHiveSupportValue = conf.getString("enableHiveSupportValue")
  val sparkStreamingReceiverMaxRateValue =
conf.getString("sparkStreamingReceiverMaxRateValue")
  val checkpointdir = conf.getString("checkpointdir")
  val hbaseHost = conf.getString("hbaseHost")
  val zookeeperHost = conf.getString("zookeeperHost")
  val zooKeeperClientPort = conf.getString("zooKeeperClientPort")
  val batchInterval = conf.getInt("batchInterval")
  val tickerWatch = conf.getString("tickerWatch")
  val priceWatch= conf.getDouble("priceWatch")
  val op_type = conf.getInt("op_type")
  val currency = conf.getString("currency")
  val tickerType = conf.getString("tickerType")
  val tickerClass = conf.getString("tickerClass")
  val tickerStatus = conf.getString("tickerStatus")
  val confidenceLevel = conf.getDouble("confidenceLevel")
  var hvTicker = conf.getInt("hvTicker")



Thanks again.


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 4 Aug 2019 at 22:32, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Many thanks Chris.
>
> In my Spark streaming I would like to use the config file to read the
> parameters in. Taking your example, I have
>
>        val globalConfig = ConfigFactory.load()
>     val conf       = globalConfig.getConfig(sparkAppName)  // extract out
> top level key from top level namespace
>    val printEntry = new
> java.util.function.Consumer[java.util.Map.Entry[String,com.typesafe.config.ConfigValue]]
> {
>     override def accept(a:
> java.util.Map.Entry[String,com.typesafe.config.ConfigValue]): Unit = {
>       val key = a.getKey
>       val value = a.getValue.unwrapped
>       //val value2 = conf.getAnyRef(a.getKey)
>        println( s"$key = $value")
>     }
>   }
> conf.entrySet.iterator.forEachRemaining (printEntry)
>
>
> And my config file for spark is like below
>
>  cat md_AerospikeAerospike.conf
> md_AerospikeAerospike {
>   dbDatabase = "trading"
>   dbPassword = "mongodb"
>   dbUsername = "trading_user_RW"
>   bootstrapServers = "rhes75:9092, rhes75:9093, rhes75:9094, rhes564:9092,
> rhes564:9093, rhes564:9094, rhes76:9092, rhes76:9093, rhes76:9094"
>   schemaRegistryURL = "http://rhes75:8081";
>   zookeeperConnect = "rhes75:2181, rhes564:2181, rhes76:2181"
>   zookeeperConnectionTimeoutMs = "10000"
>   rebalanceBackoffMS = "15000"
>   zookeeperSessionTimeOutMs = "15000"
>   autoCommitIntervalMS = "12000"
>   topicsValue = "md"
>   memorySet = "F"
>   enableHiveSupport = null
>   enableHiveSupportValue = "true"
>   sparkStreamingReceiverMaxRateValue = "0"
>   checkpointdir = "/checkpoint"
>   mongodbHost = "rhes75"
>   mongodbPort = "60100"
>   zookeeperHost = "rhes75"
>   zooKeeperClientPort = "2181"
>   batchInterval = 2
>   tickerWatch = "VOD"
>   priceWatch = 300.0
>   op_type = 1
>   currency = "GBP"
>   tickerType = "short"
>   tickerClass = "asset"
>   tickerStatus = "valid"
> }
>
> So I want them to be imported into Spark program.
>
> Using the above println( s"$key = $value"), I get
>
> zookeeperHost = rhes75
> zookeeperSessionTimeOutMs = 15000
> memorySet = F
> topicsValue = md
> currency = GBP
> rebalanceBackoffMS = 15000
> tickerStatus = valid
> enableHiveSupportValue = true
> autoCommitIntervalMS = 12000
> mongodbPort = 60100
> bootstrapServers = rhes75:9092, rhes75:9093, rhes75:9094, rhes564:9092,
> rhes564:9093, rhes564:9094, rhes76:9092, rhes76:9093, rhes76:9094
> zookeeperConnect = rhes75:2181, rhes564:2181, rhes76:2181
> zookeeperConnectionTimeoutMs = 10000
> dbUsername = trading_user_RW
> dbPassword = mongodb
> tickerWatch = VOD
> tickerClass = asset
> checkpointdir = /checkpoint
> mongodbHost = rhes75
> schemaRegistryURL = http://rhes75:8081
> tickerType = short
> zooKeeperClientPort = 2181
> priceWatch = 300
> batchInterval = 2
> op_type = 1
> sparkStreamingReceiverMaxRateValue = 0
> dbDatabase = trading
>
> Two things please. They are read in in a different order and secondly *the
> String values are not quoted.** like currency = GBP as opposed to
> currency = "GBP"*
>
> What would be the easiest way of resolving the above please
>
> Regards,
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 4 Aug 2019 at 01:55, Chris Teoh <chris.t...@gmail.com> wrote:
>
>> This seems to work-
>>
>> val printEntry = new
>> java.util.function.Consumer[java.util.Map.Entry[String,com.typesafe.config.ConfigValue]]
>> {
>>
>>     override def accept(a:
>> java.util.Map.Entry[String,com.typesafe.config.ConfigValue]): Unit = {
>>
>>       println(a.getKey)
>>
>>     }
>>
>>   }
>>
>>
>>
>> conf.entrySet.iterator.forEachRemaining (printEntry)
>>
>>
>>
>> // returns
>>
>> scala> conf.entrySet.iterator.forEachRemaining (printEntry)
>>
>> dbUsername
>>
>> dbPassword
>>
>> bootstrapServers
>>
>> dbDatabase
>>
>>
>> I hope that helps.
>>
>> On Sun, 4 Aug 2019 at 05:29, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a config file application.conf that I am trying to read.
>>>
>>> The skeleton code is as follows:
>>>
>>> ```
>>> import com.typesafe.config.ConfigFactory
>>> import scala.collection.JavaConverters
>>>   def main(args: Array[String]): Unit = {
>>>     val globalConfig = ConfigFactory.load()  // pass in filename
>>> (without extension) to load additional config file in src/main/resources or
>>> CLASSPATH
>>>     val conf       = globalConfig.getConfig("database")  // extract out
>>> top level key from top level namespace
>>>     conf.entrySet().iterator().forEachRemaining { entry =>
>>>       val key:    String = entry.getKey
>>>       val value:  Any    = entry.getValue.unwrapped()  // access via
>>> entry
>>>       val value2: Any    = conf.getAnyRef(key)         // access via
>>> hash lookup from config
>>>       println( s"$key : $value | $value2" )              // string
>>> interpolation
>>>     }
>>>   }
>>> ```
>>>
>>> But I am getting the following error
>>>
>>> ```
>>> [info] Compiling 1 Scala source to
>>> /data6/hduser/scala/testconf/target/scala-2.11/classes...
>>> [error]
>>> /data6/hduser/scala/testconf/src/main/scala/myPackage/testconf.scala:10:
>>> missing parameter type
>>> [error]     conf.entrySet().iterator().forEachRemaining { entry =>
>>> [error]                                                   ^
>>> [error] one error found
>>> [error] (compile:compileIncremental) Compilation failed
>>> ```
>>> The application.conf has the following layout
>>>
>>> database = {
>>>   dbDatabase = "trading"
>>>   dbPassword = "mongodb"
>>>   dbUsername = "trading_user_RW"
>>>   bootstrapServers = "rhes75:9092"
>>> }
>>>
>>> I appreciate any hint
>>>
>>> Thanks,
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>
>>
>> --
>> Chris
>>
>

Reply via email to