I guess you need an Encoder for the type that columns() produces. See:
https://github.com/apache/spark/blob/2119e518d31331e65415e0f817a6f28ff18d2b42/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L227-L229

  implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
    DatasetHolder(_sqlContext.createDataset(rdd))
  }

You can see many Encoder implementations in the Scala code. If your type
doesn't match any of them, it may not work, and you will need to provide a
custom Encoder.

-Jungtaek Lim (HeartSaVioR)

On Wed, 5 Sep 2018 at 5:24 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Thanks
>
> I already do that as below
>
>   val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
>   import sqlContext.implicits._
>
> but still getting the error!
>
> Dr Mich Talebzadeh
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On Wed, 5 Sep 2018 at 09:17, Jungtaek Lim <kabh...@gmail.com> wrote:
>
>> You may need to import implicits from your Spark session, like below:
>> (Below code is borrowed from
>> https://spark.apache.org/docs/latest/sql-programming-guide.html)
>>
>>   import org.apache.spark.sql.SparkSession
>>
>>   val spark = SparkSession
>>     .builder()
>>     .appName("Spark SQL basic example")
>>     .config("spark.some.config.option", "some-value")
>>     .getOrCreate()
>>
>>   // For implicit conversions like converting RDDs to DataFrames
>>   import spark.implicits._
>>
>> On Wed, 5 Sep 2018 at 5:11 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a Spark Streaming job that sends data, and I need to put that
>>> data into MongoDB for test purposes.
>>> The easiest way is to create a DF from the
>>> individual list of columns as below
>>>
>>> I loop over individual rows in RDD and perform the following
>>>
>>>   case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
>>>                      PRICE: Float)
>>>
>>>   for(line <- pricesRDD.collect.toArray)
>>>   {
>>>     var key = line._2.split(',').view(0).toString
>>>     var ticker = line._2.split(',').view(1).toString
>>>     var timeissued = line._2.split(',').view(2).toString
>>>     var price = line._2.split(',').view(3).toFloat
>>>     val priceToString = line._2.split(',').view(3)
>>>     if (price > 90.0)
>>>     {
>>>       println ("price > 90.0, saving to MongoDB collection!")
>>>       // Save prices to mongoDB collection
>>>       var df = Seq(columns(key, ticker, timeissued, price)).toDF
>>>
>>> but it fails with message
>>>
>>>   value toDF is not a member of Seq[columns].
>>>
>>> What would be the easiest way of resolving this please?
>>>
>>> thanks
>>>
>>> Dr Mich Talebzadeh
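
For reference, here is a minimal sketch of the setup the replies above point toward, assuming Spark 2.x and a local master. It declares the case class at top level and imports spark.implicits._ from the session instance before calling toDF. A commonly reported cause of "value toDF is not a member of Seq[...]" is a case class declared inside the method that calls toDF, which can leave Spark unable to derive an Encoder for it; whether that applies to the code above is an assumption. The names Columns, PriceToDF and parse, and the sample lines, are hypothetical and not from the original code.

```scala
import org.apache.spark.sql.SparkSession

// Declared at top level (not inside a method) so that Spark can derive
// a product Encoder for it. Hypothetical rename of `columns` above.
case class Columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

object PriceToDF {
  // Split one "key,ticker,timeissued,price" line once and build a record.
  def parse(line: String): Columns = {
    val f = line.split(',')
    Columns(f(0), f(1), f(2), f(3).toFloat)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("toDF sketch")
      .master("local[*]") // assumption: local run for testing
      .getOrCreate()

    // Must come after `spark` exists; brings toDF and the Encoders into scope.
    import spark.implicits._

    // Made-up sample lines standing in for the values in pricesRDD.
    val sample = Seq(
      "k1,IBM,2018-09-05 09:00:00,95.5",
      "k2,MSFT,2018-09-05 09:00:01,80.0"
    )

    // Keep only prices above 90.0, as in the original loop, then convert.
    val df = sample.map(parse).filter(_.PRICE > 90.0f).toDF()
    df.show()

    spark.stop()
  }
}
```

Building one DataFrame from all qualifying rows, rather than one DataFrame per row inside the loop, is a design choice of this sketch and not something the thread prescribes.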