I guess you need to have encoder for the type of result for columns().

https://github.com/apache/spark/blob/2119e518d31331e65415e0f817a6f28ff18d2b42/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L227-L229

implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
  DatasetHolder(_sqlContext.createDataset(rdd))
}

You can see lots of Encoder implementations in the scala code. If your type
doesn't match anything it may not work and you need to provide custom
Encoder.

-Jungtaek Lim (HeartSaVioR)

2018년 9월 5일 (수) 오후 5:24, Mich Talebzadeh <mich.talebza...@gmail.com>님이 작성:

> Thanks
>
> I already do that as below
>
>     val sqlContext= new org.apache.spark.sql.SQLContext(sparkContext)
>   import sqlContext.implicits._
>
> but still getting the error!
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 5 Sep 2018 at 09:17, Jungtaek Lim <kabh...@gmail.com> wrote:
>
>> You may need to import implicits from your spark session like below:
>> (Below code is borrowed from
>> https://spark.apache.org/docs/latest/sql-programming-guide.html)
>>
>> import org.apache.spark.sql.SparkSession
>> val spark = SparkSession
>>   .builder()
>>   .appName("Spark SQL basic example")
>>   .config("spark.some.config.option", "some-value")
>>   .getOrCreate()
>> // For implicit conversions like converting RDDs to DataFramesimport 
>> spark.implicits._
>>
>>
>> 2018년 9월 5일 (수) 오후 5:11, Mich Talebzadeh <mich.talebza...@gmail.com>님이
>> 작성:
>>
>>> Hi,
>>>
>>> I have spark streaming that send data and I need to put that data into
>>> MongoDB for test purposes. The easiest way is to create a DF from the
>>> individual list of columns as below
>>>
>>> I loop over individual rows in RDD and perform the following
>>>
>>>     case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
>>> PRICE: Float)
>>>
>>>          for(line <- pricesRDD.collect.toArray)
>>>          {
>>>             var key = line._2.split(',').view(0).toString
>>>            var ticker =  line._2.split(',').view(1).toString
>>>            var timeissued = line._2.split(',').view(2).toString
>>>            var price = line._2.split(',').view(3).toFloat
>>>            val priceToString = line._2.split(',').view(3)
>>>            if (price > 90.0)
>>>            {
>>>              println ("price > 90.0, saving to MongoDB collection!")
>>>             // Save prices to mongoDB collection
>>>            * var df = Seq(columns(key, ticker, timeissued, price)).toDF*
>>>
>>> but it fails with message
>>>
>>>  value toDF is not a member of Seq[columns].
>>>
>>> What would be the easiest way of resolving this please?
>>>
>>> thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>

Reply via email to