Hi Sotiris,

can you upload any sample data and then I will write the code and send it
across to you?

Do you have any particular used case for Influx DB?


Regards,
Gourav Sengupta

On Thu, Jun 29, 2017 at 5:42 PM, Sotiris Beis <sot.b...@gmail.com> wrote:

> Hi Gourav,
>
> Do you have any suggestions of how the use of dataframes will solve my
> problem?
>
> Cheers,
> Sotiris
>
> On Thu, 29 Jun 2017 at 17:37 Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I still do not understand why people do not use data frames.
>>
>> It makes you smile, take a sip of fine coffee, and feel good about life
>> and its all courtesy@SPARK. :)
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Thu, Jun 29, 2017 at 12:18 PM, Ryan <ryan.hd....@gmail.com> wrote:
>>
>>> I think it creates a new connection on each worker, whenever the
>>> Processor references Resource, it got initialized.
>>> There's no need for the driver connect to the db in this case.
>>>
>>> On Thu, Jun 29, 2017 at 5:52 PM, salvador <sot.b...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am writing a spark job from which at some point I want to send some
>>>> metrics to InfluxDB. Here is some sample code of how I am doing it at
>>>> the
>>>> moment.
>>>>
>>>> I have a Resources object class which contains all the details for the
>>>> db
>>>> connection:
>>>>
>>>> object Resources { def forceInit: () => Unit = () => ()
>>>>   val influxHost: String = Config.influxHost.getOrElse("localhost")
>>>>   val influxUdpPort: Int = Config.influxUdpPort.getOrElse(30089)
>>>>
>>>>   val influxDB = new MetricsClient(influxHost, influxUdpPort, "spark")
>>>>
>>>> }
>>>>
>>>> This is how my code on the driver looks like:
>>>>
>>>> object ProcessStuff extends App {
>>>>   val spark = SparkSession .builder() .config(sparkConfig)
>>>> .getOrCreate()
>>>>   val df = spark .read .parquet(Config.input)
>>>>
>>>>   Resources.forceInit
>>>>
>>>>   val annotatedSentences = df.rdd
>>>>     .map {
>>>>       case (Row(a: String, b: String)) => Processor.process(a,b)
>>>>     }
>>>>     .cache()
>>>> }
>>>>
>>>> I am sending all the metrics I want from the process() method which
>>>> uses the
>>>> client I initialised on the driver code. Currently this works and I am
>>>> able
>>>> to send millions of data point. I was just wandering how it works
>>>> internally. Does it share the db connection or creates a new connection
>>>> every time?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context: http://apache-spark-user-list.
>>>> 1001560.n3.nabble.com/Understanding-how-spark-share-
>>>> db-connections-created-on-driver-tp28806.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>>
>>

Reply via email to