This code works with Spark 2.3.0 via spark-shell:

scala> case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)
defined class columns

scala> import spark.implicits._
import spark.implicits._

scala> var df = Seq(columns("key", "ticker", "timeissued", 1.23f)).toDF
18/09/06 18:02:23 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [KEY: string, TICKER: string ... 2 more fields]

scala> df
res0: org.apache.spark.sql.DataFrame = [KEY: string, TICKER: string ... 2 more fields]

It may help to know the actual types of your key, ticker, timeissued and price variables.

-Jungtaek Lim (HeartSaVioR)
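For reference, the same conversion as it would look in a compiled application rather than the shell — a minimal sketch assuming Spark 2.3.x, with the case class declared at the top level of the file (declaring it inside the method that uses it is one commonly cited cause of the toDF error; the object name here is illustrative):

    import org.apache.spark.sql.SparkSession

    // Declared at top level, outside any method, so the implicit Encoder
    // derivation behind toDF can find a TypeTag for it.
    case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

    object ToDfExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("ToDfExample").getOrCreate()
        import spark.implicits._  // brings toDF into scope for a Seq/RDD of case classes

        val df = Seq(columns("key", "ticker", "timeissued", 1.23f)).toDF()
        df.printSchema()
        spark.stop()
      }
    }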
On Thu, 6 Sep 2018 at 17:57, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> I am trying to understand why Spark cannot convert a simple
> comma-separated line of columns into a DF.
>
> As a test, I took one line of the printed output and stored it as a
> one-line csv file:
>
> var allInOne = key+","+ticker+","+timeissued+","+price
> println(allInOne)
>
> cat crap.csv
> 6e84b11d-cb03-44c0-aab6-37e06e06c996,MRW,2018-09-06T09:35:53,275.45
>
> Then, after storing it in HDFS, I read that file back as follows:
>
> import org.apache.spark.sql.functions._
> val location = "hdfs://rhes75:9000/tmp/crap.csv"
> val df1 = spark.read.option("header", false).csv(location)
> case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Double)
> val df2 = df1.map(p => columns(p(0).toString, p(1).toString,
>                                p(2).toString, p(3).toString.toDouble))
> df2.printSchema
>
> This is the result I get:
>
> df1: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 2 more fields]
> defined class columns
> df2: org.apache.spark.sql.Dataset[columns] = [KEY: string, TICKER: string ... 2 more fields]
> root
>  |-- KEY: string (nullable = true)
>  |-- TICKER: string (nullable = true)
>  |-- TIMEISSUED: string (nullable = true)
>  |-- PRICE: double (nullable = false)
>
> So in my case the only difference is that the comma-separated line is
> held in a String rather than in a csv file.
>
> How can I achieve this simple transformation?
>
> Thanks
>
> Dr Mich Talebzadeh
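That transformation can be done without the HDFS round trip by splitting the in-memory String and mapping it onto the case class — a minimal sketch, assuming the same case class as in the quoted test and an active SparkSession named spark:

    import spark.implicits._

    case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Double)

    val allInOne = "6e84b11d-cb03-44c0-aab6-37e06e06c996,MRW,2018-09-06T09:35:53,275.45"

    // Split the comma-separated line and build the DataFrame directly,
    // letting the implicit Encoder for the case class supply the schema.
    val df = Seq(allInOne)
      .map(_.split(','))
      .map(a => columns(a(0), a(1), a(2), a(3).toDouble))
      .toDF()

    df.printSchema()  // same schema as df2 in the quoted test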
> On Thu, 6 Sep 2018 at 03:38, Manu Zhang <owenzhang1...@gmail.com> wrote:
>
>> Have you tried adding an Encoder for columns, as suggested by Jungtaek Lim?
>>
>> On Thu, Sep 6, 2018 at 6:24 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> I can rebuild the comma-separated list as follows:
>>>
>>> case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)
>>> val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
>>> import sqlContext.implicits._
>>>
>>> for(line <- pricesRDD.collect.toArray)
>>> {
>>>   var key = line._2.split(',').view(0).toString
>>>   var ticker = line._2.split(',').view(1).toString
>>>   var timeissued = line._2.split(',').view(2).toString
>>>   var price = line._2.split(',').view(3).toFloat
>>>   var allInOne = key+","+ticker+","+timeissued+","+price
>>>   println(allInOne)
>>> }
>>>
>>> and the print shows the columns separated by ",":
>>>
>>> 34e07d9f-829a-446a-93ab-8b93aa8eda41,SAP,2018-09-05T23:22:34,56.89
>>>
>>> So I just need to convert that line of row data into a DataFrame.
>>>
>>> I tried this conversion to a DF in order to write to a MongoDB document
>>> with MongoSpark.save(df, writeConfig):
>>>
>>> var df = sparkContext.parallelize(Seq(columns(key, ticker, timeissued, price))).toDF
>>>
>>> [error] /data6/hduser/scala/md_streaming_mongoDB/src/main/scala/myPackage/md_streaming_mongoDB.scala:235: value toDF is not a member of org.apache.spark.rdd.RDD[columns]
>>> [error]     var df = sparkContext.parallelize(Seq(columns(key, ticker, timeissued, price))).toDF
>>>
>>> Frustrating! Has anyone come across this?
>>>
>>> Thanks
>>>
>>> On Wed, 5 Sep 2018 at 13:30, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>>> Yep, already tried it and it did not work.
>>>>
>>>> Thanks
>>>>
>>>> On Wed, 5 Sep 2018 at 10:10, Deepak Sharma <deepakmc...@gmail.com> wrote:
>>>>
>>>>> Try this:
>>>>>
>>>>> import spark.implicits._
>>>>> df.toDF()
>>>>>
>>>>> On Wed, Sep 5, 2018 at 2:31 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> With the following
>>>>>>
>>>>>> case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)
>>>>>>
>>>>>> var key = line._2.split(',').view(0).toString
>>>>>> var ticker = line._2.split(',').view(1).toString
>>>>>> var timeissued = line._2.split(',').view(2).toString
>>>>>> var price = line._2.split(',').view(3).toFloat
>>>>>>
>>>>>> var df = Seq(columns(key, ticker, timeissued, price))
>>>>>> println(df)
>>>>>>
>>>>>> I get
>>>>>>
>>>>>> List(columns(ac11a78d-82df-4b37-bf58-7e3388aa64cd,MKS,2018-09-05T10:10:15,676.5))
>>>>>>
>>>>>> So I just need to convert that list to a DF.
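As an aside, that list can also be converted without relying on the implicit toDF at all: SparkSession.createDataFrame accepts a Seq of case-class instances directly. A minimal sketch, assuming an active SparkSession named spark and the key/ticker/timeissued/price variables from the quoted code:

    // No import spark.implicits._ needed for this form; the schema is
    // derived from the case class fields via reflection.
    val df = spark.createDataFrame(Seq(columns(key, ticker, timeissued, price)))
    df.printSchema()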
>>>>>> On Wed, 5 Sep 2018 at 09:49, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> The Spark version is 2.3.0.
>>>>>>>
>>>>>>> On Wed, 5 Sep 2018 at 09:41, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>>>
>>>>>>>> You may also find the link below useful (though it is fairly old). A
>>>>>>>> case class is a type for which an Encoder is available, so there may
>>>>>>>> be another reason preventing the implicit conversion:
>>>>>>>>
>>>>>>>> https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Spark-Scala-Error-value-toDF-is-not-a-member-of-org-apache/m-p/29994/highlight/true#M973
>>>>>>>>
>>>>>>>> And which Spark version do you use?
>>>>>>>>
>>>>>>>> On Wed, 5 Sep 2018 at 17:32, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Sorry, I pasted another method. The code is:
>>>>>>>>>
>>>>>>>>> implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
>>>>>>>>>   DatasetHolder(_sqlContext.createDataset(s))
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Wed, 5 Sep 2018 at 17:30, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I guess you need to have an Encoder for the result type of columns():
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/spark/blob/2119e518d31331e65415e0f817a6f28ff18d2b42/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L227-L229
>>>>>>>>>>
>>>>>>>>>> implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
>>>>>>>>>>   DatasetHolder(_sqlContext.createDataset(rdd))
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> You can see lots of Encoder implementations in the Scala code. If
>>>>>>>>>> your type doesn't match any of them it will not work, and you need
>>>>>>>>>> to provide a custom Encoder.
>>>>>>>>>>
>>>>>>>>>> -Jungtaek Lim (HeartSaVioR)
>>>>>>>>>>
>>>>>>>>>> On Wed, 5 Sep 2018 at 17:24, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks.
>>>>>>>>>>>
>>>>>>>>>>> I already do that, as below:
>>>>>>>>>>>
>>>>>>>>>>> val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
>>>>>>>>>>> import sqlContext.implicits._
>>>>>>>>>>>
>>>>>>>>>>> but I am still getting the error!
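A sketch of the Encoder route that Manu and Jungtaek are pointing at — supplying the Encoder explicitly instead of relying on the imported implicits. Encoders.product derives one for any case class; variable names follow the quoted code, and an active SparkSession named spark is assumed:

    import org.apache.spark.sql.{Encoder, Encoders}

    // Explicit Encoder for the case class; if this line compiles but toDF
    // still fails, the implicits import itself is the likelier culprit.
    implicit val columnsEncoder: Encoder[columns] = Encoders.product[columns]

    val ds = spark.createDataset(Seq(columns(key, ticker, timeissued, price)))
    val df = ds.toDF()  // toDF on a Dataset needs no implicit conversion

The Cloudera thread linked above also suggests checking where the case class is declared: moving it out of the enclosing method to the top level of the file is a commonly reported fix for exactly this error.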
>>>>>>>>>>> On Wed, 5 Sep 2018 at 09:17, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> You may need to import the implicits from your spark session, like
>>>>>>>>>>>> below (borrowed from
>>>>>>>>>>>> https://spark.apache.org/docs/latest/sql-programming-guide.html):
>>>>>>>>>>>>
>>>>>>>>>>>> import org.apache.spark.sql.SparkSession
>>>>>>>>>>>>
>>>>>>>>>>>> val spark = SparkSession
>>>>>>>>>>>>   .builder()
>>>>>>>>>>>>   .appName("Spark SQL basic example")
>>>>>>>>>>>>   .config("spark.some.config.option", "some-value")
>>>>>>>>>>>>   .getOrCreate()
>>>>>>>>>>>>
>>>>>>>>>>>> // For implicit conversions like converting RDDs to DataFrames
>>>>>>>>>>>> import spark.implicits._
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 5 Sep 2018 at 17:11, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have Spark Streaming sending data, and I need to put that data
>>>>>>>>>>>>> into MongoDB for test purposes. The easiest way is to create a DF
>>>>>>>>>>>>> from the individual column values, as below.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I loop over the individual rows in the RDD and perform the following:
>>>>>>>>>>>>>
>>>>>>>>>>>>> case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)
>>>>>>>>>>>>>
>>>>>>>>>>>>> for(line <- pricesRDD.collect.toArray)
>>>>>>>>>>>>> {
>>>>>>>>>>>>>   var key = line._2.split(',').view(0).toString
>>>>>>>>>>>>>   var ticker = line._2.split(',').view(1).toString
>>>>>>>>>>>>>   var timeissued = line._2.split(',').view(2).toString
>>>>>>>>>>>>>   var price = line._2.split(',').view(3).toFloat
>>>>>>>>>>>>>   val priceToString = line._2.split(',').view(3)
>>>>>>>>>>>>>   if (price > 90.0)
>>>>>>>>>>>>>   {
>>>>>>>>>>>>>     println("price > 90.0, saving to MongoDB collection!")
>>>>>>>>>>>>>     // Save prices to mongoDB collection
>>>>>>>>>>>>>     var df = Seq(columns(key, ticker, timeissued, price)).toDF
>>>>>>>>>>>>>   }
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> but it fails with the message:
>>>>>>>>>>>>>
>>>>>>>>>>>>> value toDF is not a member of Seq[columns]
>>>>>>>>>>>>>
>>>>>>>>>>>>> What would be the easiest way of resolving this, please?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
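Pulling the thread together, a minimal sketch of this save step that builds one DataFrame per batch instead of one per row. The MongoSpark.save(df, writeConfig) call is the one mentioned earlier in the thread; the URI, database and collection names are illustrative, and the case class plus import spark.implicits._ are assumed to be in scope:

    import com.mongodb.spark.MongoSpark
    import com.mongodb.spark.config.WriteConfig

    // Illustrative connection settings -- substitute real values.
    val writeConfig = WriteConfig(Map(
      "uri"        -> "mongodb://localhost:27017",
      "database"   -> "trading",
      "collection" -> "prices"))

    // Parse the whole batch in one pass (no collect(), no per-row DF),
    // keep only prices above 90.0, and save the batch in a single call.
    val df = pricesRDD
      .map(_._2.split(','))
      .map(a => columns(a(0), a(1), a(2), a(3).toFloat))
      .toDF()

    MongoSpark.save(df.filter($"PRICE" > 90.0), writeConfig)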
>>>>> --
>>>>> Thanks
>>>>> Deepak
>>>>> www.bigdatabig.com
>>>>> www.keosha.net