I am trying to write incoming stream data to database. Following is the example 
program, this code creates a thread to listen to incoming stream of data which 
is csv data. this data needs to be split with delimiter and the array of data 
needs to be pushed to database as separate columns in the TABLE.

object dbwrite {
  case class record(id: Long, time: java.sql.Timestamp, rx: Int, tx: Int, 
total: Int, multi: Double)
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: CustomReceiver <hostname> <port>")
      System.exit(1)
    }

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf()
                        .set(“spark.cassandra.connection.host", "localhost")
                        .setAppName("dbwrite")
                        .set("spark.driver.allowMultipleContexts", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val sc = ssc.sparkContext

    // Create a input stream with the custom receiver on target ip:port and 
count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
    val splitRdd = lines.map(line => line.split(",") )
    //val wordCounts = splitRdd.map(x => (x, 1)).reduceByKey(_ + _)
    // RDD[Array[String]

    val yourRdd = splitRdd.flatMap(arr => {
      val id = arr(0).toLong
      val rx = arr(2).toInt
      val tx = arr(3).toInt
      val total = arr(4).toInt
      val mul = arr(5).toInt
      val parsedDate = new java.util.Date()
      val timestamp = new java.sql.Timestamp(parsedDate.getTime());
      val reco = records(id, timestamp, rx, tx, total, mul);
      Seq(reco)
    })

    yourRdd.foreachRDD { rdd =>
        for(item <- rdd.collect().toArray)
          print(item)
    }
    val rec = sc.parallelize(Seq(yourRdd))
    rec.saveToCassandra("records", "record", SomeColumns(“id”, "time", "rx", 
"tx", "total”, "multi"))

    ssc.start()
    ssc.awaitTermination()
  }
}
but spark does gives following error -
Exception in thread "main" java.lang.IllegalArgumentException: requirement 
failed: Columns not found in 
org.apache.spark.streaming.dstream.DStream[dbwrite.records]: [mdn, time, rx, 
tx, total, multi]
        at scala.Predef$.require(Predef.scala:233)
        at 
com.datastax.spark.connector.mapper.DefaultColumnMapper.columnMapForWriting(DefaultColumnMapper.scala:108)
        at 
com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1.<init>(MappedToGettableDataConverter.scala:29)
        at 
com.datastax.spark.connector.writer.MappedToGettableDataConverter$.apply(MappedToGettableDataConverter.scala:20)
        at 
com.datastax.spark.connector.writer.DefaultRowWriter.<init>(DefaultRowWriter.scala:17)
        at 
com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:31)
        at 
com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:29)
        at 
com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:272)
        at 
com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
        at dbwrite$.main(dbwrite.scala:63)
        at dbwrite.main(dbwrite.scala)
i am using spark-1.6.1 and cassandra 3.5
the TABLE already created on cassandra has same column names. But the column 
display in alphabetical order, but all columns are avaialble.
help me with the error.

thanks.

Reply via email to