I am trying to write incoming stream data to a database. Below is my example program: it creates a thread that listens to an incoming stream of CSV data. Each line needs to be split on the delimiter, and the resulting array of fields needs to be pushed to the database as separate columns of the TABLE.
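For illustration, one incoming line is parsed roughly like this (the sample values below are made up, and the field at index 1 is ignored):

```scala
// Hypothetical input line: id, <unused>, rx, tx, total, multi
val line = "1001,unused,10,20,30,1.5"
val arr  = line.split(",")

val id    = arr(0).toLong    // -> column "id"
val rx    = arr(2).toInt     // -> column "rx"
val tx    = arr(3).toInt     // -> column "tx"
val total = arr(4).toInt     // -> column "total"
val multi = arr(5).toDouble  // -> column "multi"
```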
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.datastax.spark.connector._

object dbwrite {

  case class records(id: Long, time: java.sql.Timestamp,
                     rx: Int, tx: Int, total: Int, multi: Double)

  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: CustomReceiver <hostname> <port>")
      System.exit(1)
    }

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf()
      .set("spark.cassandra.connection.host", "localhost")
      .setAppName("dbwrite")
      .set("spark.driver.allowMultipleContexts", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val sc = ssc.sparkContext

    // Create an input stream with the custom receiver on the target ip:port;
    // the input is \n-delimited text (e.g. generated by 'nc')
    val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))

    // DStream[Array[String]]: one array of fields per CSV line
    val splitRdd = lines.map(line => line.split(","))
    //val wordCounts = splitRdd.map(x => (x, 1)).reduceByKey(_ + _)

    // Parse each field array into a records instance; the field at index 1 is unused
    val yourRdd = splitRdd.flatMap(arr => {
      val id = arr(0).toLong
      val rx = arr(2).toInt
      val tx = arr(3).toInt
      val total = arr(4).toInt
      val mul = arr(5).toDouble
      val parsedDate = new java.util.Date()
      val timestamp = new java.sql.Timestamp(parsedDate.getTime())
      val reco = records(id, timestamp, rx, tx, total, mul)
      Seq(reco)
    })

    // Debug: print every parsed record
    yourRdd.foreachRDD { rdd =>
      for (item <- rdd.collect())
        print(item)
    }

    val rec = sc.parallelize(Seq(yourRdd))
    rec.saveToCassandra("records", "record",
      SomeColumns("id", "time", "rx", "tx", "total", "multi"))

    ssc.start()
    ssc.awaitTermination()
  }
}
```

But Spark gives the following error:

```
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Columns not found in org.apache.spark.streaming.dstream.DStream[dbwrite.records]: [mdn, time, rx, tx, total, multi]
	at scala.Predef$.require(Predef.scala:233)
	at com.datastax.spark.connector.mapper.DefaultColumnMapper.columnMapForWriting(DefaultColumnMapper.scala:108)
	at com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1.<init>(MappedToGettableDataConverter.scala:29)
	at com.datastax.spark.connector.writer.MappedToGettableDataConverter$.apply(MappedToGettableDataConverter.scala:20)
	at com.datastax.spark.connector.writer.DefaultRowWriter.<init>(DefaultRowWriter.scala:17)
	at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:31)
	at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:29)
	at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:272)
	at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
	at dbwrite$.main(dbwrite.scala:63)
	at dbwrite.main(dbwrite.scala)
```

I am using spark-1.6.1 and Cassandra 3.5. The TABLE already created on Cassandra has the same column names; they display in alphabetical order, but all columns are available.

Please help me with the error. Thanks.
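For completeness, this is the non-streaming version of the write I am trying to end up with (a minimal sketch using the same keyspace, table, and columns as above, with made-up values):

```scala
import com.datastax.spark.connector._

// One hand-built row, only to show the intended case class -> column mapping
val oneRow = sc.parallelize(Seq(
  records(1L, new java.sql.Timestamp(System.currentTimeMillis()), 10, 20, 30, 1.5)
))
oneRow.saveToCassandra("records", "record",
  SomeColumns("id", "time", "rx", "tx", "total", "multi"))
```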