This code works with Spark 2.3.0 via spark-shell.

scala> case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
PRICE: Float)
defined class columns

scala> import spark.implicits._
import spark.implicits._

scala> var df = Seq(columns("key", "ticker", "timeissued", 1.23f)).toDF
18/09/06 18:02:23 WARN ObjectStore: Failed to get database global_temp,
returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [KEY: string, TICKER: string ... 2
more fields]

scala> df
res0: org.apache.spark.sql.DataFrame = [KEY: string, TICKER: string ... 2
more fields]

Maybe need to know about actual type of key, ticker, timeissued, price from
your variables.

Jungtaek Lim (HeartSaVioR)

2018년 9월 6일 (목) 오후 5:57, Mich Talebzadeh <mich.talebza...@gmail.com>님이 작성:

> I am trying to understand why spark cannot convert a simple comma
> separated columns as DF.
>
> I did a test
>
> I took one line of print and stored it as a one liner csv file as below
>
> var allInOne = key+","+ticker+","+timeissued+","+price
> println(allInOne)
>
> cat crap.csv
> 6e84b11d-cb03-44c0-aab6-37e06e06c996,MRW,2018-09-06T09:35:53,275.45
>
> Then after storing it in HDFS, I read that file as below
>
> import org.apache.spark.sql.functions._
> val location="hdfs://rhes75:9000/tmp/crap.csv"
> val df1 = spark.read.option("header", false).csv(location)
> case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE:
> Double)
> val df2 = df1.map(p => columns(p(0).toString,p(1).toString,
> p(2).toString,p(3).toString.toDouble))
> df2.printSchema
>
> This is the result I get
>
> df1: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 2 more
> fields]
> defined class columns
> df2: org.apache.spark.sql.Dataset[columns] = [KEY: string, TICKER: string
> ... 2 more fields]
> root
>  |-- KEY: string (nullable = true)
>  |-- TICKER: string (nullable = true)
>  |-- TIMEISSUED: string (nullable = true)
>  |-- PRICE: double (nullable = false)
>
> So in my case the only difference is that that comma separated line is
> stored in a String as opposed to csv.
>
> How can I achieve this simple transformation?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 6 Sep 2018 at 03:38, Manu Zhang <owenzhang1...@gmail.com> wrote:
>
>> Have you tried adding Encoder for columns as suggested by Jungtaek Lim ?
>>
>> On Thu, Sep 6, 2018 at 6:24 AM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> I can rebuild the comma separated list as follows:
>>>
>>>
>>>    case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
>>> PRICE: Float)
>>>     val sqlContext= new org.apache.spark.sql.SQLContext(sparkContext)
>>>     import sqlContext.implicits._
>>>
>>>
>>>          for(line <- pricesRDD.collect.toArray)
>>>          {
>>>            var key = line._2.split(',').view(0).toString
>>>            var ticker =  line._2.split(',').view(1).toString
>>>            var timeissued = line._2.split(',').view(2).toString
>>>            var price = line._2.split(',').view(3).toFloat
>>>            var allInOne = key+","+ticker+","+timeissued+","+price
>>>            println(allInOne)
>>>
>>> and the print shows the columns separated by ","
>>>
>>>
>>> 34e07d9f-829a-446a-93ab-8b93aa8eda41,SAP,2018-09-05T23:22:34,56.89
>>>
>>> So I just need to convert that line of rowinto a DataFrame
>>>
>>> I try this conversion to DF to write to MongoDB document with 
>>> MongoSpark.save(df,
>>> writeConfig)
>>>
>>> var df = sparkContext.parallelize(Seq(columns(key, ticker, timeissued,
>>> price))).toDF
>>>
>>> [error]
>>> /data6/hduser/scala/md_streaming_mongoDB/src/main/scala/myPackage/md_streaming_mongoDB.scala:235:
>>> value toDF is not a member of org.apache.spark.rdd.RDD[columns]
>>> [error]             var df = sparkContext.parallelize(Seq(columns(key,
>>> ticker, timeissued, price))).toDF
>>> [
>>>
>>>
>>> frustrating!
>>>
>>>  has anyone come across this?
>>>
>>> thanks
>>>
>>> On Wed, 5 Sep 2018 at 13:30, Mich Talebzadeh <mich.talebza...@gmail.com>
>>> wrote:
>>>
>>>> yep already tried it and it did not work.
>>>>
>>>> thanks
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, 5 Sep 2018 at 10:10, Deepak Sharma <deepakmc...@gmail.com>
>>>> wrote:
>>>>
>>>>> Try this:
>>>>>
>>>>> *import **spark*.implicits._
>>>>>
>>>>> df.toDF()
>>>>>
>>>>>
>>>>> On Wed, Sep 5, 2018 at 2:31 PM Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> With the following
>>>>>>
>>>>>> case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
>>>>>> PRICE: Float)
>>>>>>
>>>>>>  var key = line._2.split(',').view(0).toString
>>>>>>  var ticker =  line._2.split(',').view(1).toString
>>>>>>  var timeissued = line._2.split(',').view(2).toString
>>>>>>  var price = line._2.split(',').view(3).toFloat
>>>>>>
>>>>>>   var df = Seq(columns(key, ticker, timeissued, price))
>>>>>>  println(df)
>>>>>>
>>>>>> I get
>>>>>>
>>>>>>
>>>>>> List(columns(ac11a78d-82df-4b37-bf58-7e3388aa64cd,MKS,2018-09-05T10:10:15,676.5))
>>>>>>
>>>>>> So just need to convert that list to DF
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> LinkedIn * 
>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>
>>>>>>
>>>>>>
>>>>>> http://talebzadehmich.wordpress.com
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>> may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, 5 Sep 2018 at 09:49, Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> The spark  is version 2.3.0
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> LinkedIn * 
>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>>> may
>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>> damages
>>>>>>> arising from such loss, damage or destruction.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, 5 Sep 2018 at 09:41, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>>>
>>>>>>>> You may also find below link useful (though it looks far old),
>>>>>>>> since case class is the thing which Encoder is available, so there may 
>>>>>>>> be
>>>>>>>> another reason which prevent implicit conversion.
>>>>>>>>
>>>>>>>>
>>>>>>>> https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Spark-Scala-Error-value-toDF-is-not-a-member-of-org-apache/m-p/29994/highlight/true#M973
>>>>>>>>
>>>>>>>> And which Spark version do you use?
>>>>>>>>
>>>>>>>>
>>>>>>>> 2018년 9월 5일 (수) 오후 5:32, Jungtaek Lim <kabh...@gmail.com>님이 작성:
>>>>>>>>
>>>>>>>>> Sorry I guess I pasted another method. the code is...
>>>>>>>>>
>>>>>>>>> implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): 
>>>>>>>>> DatasetHolder[T] = {
>>>>>>>>>   DatasetHolder(_sqlContext.createDataset(s))
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2018년 9월 5일 (수) 오후 5:30, Jungtaek Lim <kabh...@gmail.com>님이 작성:
>>>>>>>>>
>>>>>>>>>> I guess you need to have encoder for the type of result for
>>>>>>>>>> columns().
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/spark/blob/2119e518d31331e65415e0f817a6f28ff18d2b42/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L227-L229
>>>>>>>>>>
>>>>>>>>>> implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): 
>>>>>>>>>> DatasetHolder[T] = {
>>>>>>>>>>   DatasetHolder(_sqlContext.createDataset(rdd))
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> You can see lots of Encoder implementations in the scala code. If
>>>>>>>>>> your type doesn't match anything it may not work and you need to 
>>>>>>>>>> provide
>>>>>>>>>> custom Encoder.
>>>>>>>>>>
>>>>>>>>>> -Jungtaek Lim (HeartSaVioR)
>>>>>>>>>>
>>>>>>>>>> 2018년 9월 5일 (수) 오후 5:24, Mich Talebzadeh <
>>>>>>>>>> mich.talebza...@gmail.com>님이 작성:
>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> I already do that as below
>>>>>>>>>>>
>>>>>>>>>>>     val sqlContext= new
>>>>>>>>>>> org.apache.spark.sql.SQLContext(sparkContext)
>>>>>>>>>>>   import sqlContext.implicits._
>>>>>>>>>>>
>>>>>>>>>>> but still getting the error!
>>>>>>>>>>>
>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> LinkedIn * 
>>>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all
>>>>>>>>>>> responsibility for any loss, damage or destruction of data or any 
>>>>>>>>>>> other
>>>>>>>>>>> property which may arise from relying on this email's technical 
>>>>>>>>>>> content is
>>>>>>>>>>> explicitly disclaimed. The author will in no case be liable for any
>>>>>>>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, 5 Sep 2018 at 09:17, Jungtaek Lim <kabh...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> You may need to import implicits from your spark session like
>>>>>>>>>>>> below:
>>>>>>>>>>>> (Below code is borrowed from
>>>>>>>>>>>> https://spark.apache.org/docs/latest/sql-programming-guide.html
>>>>>>>>>>>> )
>>>>>>>>>>>>
>>>>>>>>>>>> import org.apache.spark.sql.SparkSession
>>>>>>>>>>>> val spark = SparkSession
>>>>>>>>>>>>   .builder()
>>>>>>>>>>>>   .appName("Spark SQL basic example")
>>>>>>>>>>>>   .config("spark.some.config.option", "some-value")
>>>>>>>>>>>>   .getOrCreate()
>>>>>>>>>>>> // For implicit conversions like converting RDDs to 
>>>>>>>>>>>> DataFramesimport spark.implicits._
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2018년 9월 5일 (수) 오후 5:11, Mich Talebzadeh <
>>>>>>>>>>>> mich.talebza...@gmail.com>님이 작성:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have spark streaming that send data and I need to put that
>>>>>>>>>>>>> data into MongoDB for test purposes. The easiest way is to create 
>>>>>>>>>>>>> a DF from
>>>>>>>>>>>>> the individual list of columns as below
>>>>>>>>>>>>>
>>>>>>>>>>>>> I loop over individual rows in RDD and perform the following
>>>>>>>>>>>>>
>>>>>>>>>>>>>     case class columns(KEY: String, TICKER: String,
>>>>>>>>>>>>> TIMEISSUED: String, PRICE: Float)
>>>>>>>>>>>>>
>>>>>>>>>>>>>          for(line <- pricesRDD.collect.toArray)
>>>>>>>>>>>>>          {
>>>>>>>>>>>>>             var key = line._2.split(',').view(0).toString
>>>>>>>>>>>>>            var ticker =  line._2.split(',').view(1).toString
>>>>>>>>>>>>>            var timeissued = line._2.split(',').view(2).toString
>>>>>>>>>>>>>            var price = line._2.split(',').view(3).toFloat
>>>>>>>>>>>>>            val priceToString = line._2.split(',').view(3)
>>>>>>>>>>>>>            if (price > 90.0)
>>>>>>>>>>>>>            {
>>>>>>>>>>>>>              println ("price > 90.0, saving to MongoDB
>>>>>>>>>>>>> collection!")
>>>>>>>>>>>>>             // Save prices to mongoDB collection
>>>>>>>>>>>>>            * var df = Seq(columns(key, ticker, timeissued,
>>>>>>>>>>>>> price)).toDF*
>>>>>>>>>>>>>
>>>>>>>>>>>>> but it fails with message
>>>>>>>>>>>>>
>>>>>>>>>>>>>  value toDF is not a member of Seq[columns].
>>>>>>>>>>>>>
>>>>>>>>>>>>> What would be the easiest way of resolving this please?
>>>>>>>>>>>>>
>>>>>>>>>>>>> thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> LinkedIn * 
>>>>>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all
>>>>>>>>>>>>> responsibility for any loss, damage or destruction of data or any 
>>>>>>>>>>>>> other
>>>>>>>>>>>>> property which may arise from relying on this email's technical 
>>>>>>>>>>>>> content is
>>>>>>>>>>>>> explicitly disclaimed. The author will in no case be liable for 
>>>>>>>>>>>>> any
>>>>>>>>>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>
>>>>> --
>>>>> Thanks
>>>>> Deepak
>>>>> www.bigdatabig.com
>>>>> www.keosha.net
>>>>>
>>>>

Reply via email to