Hi all,
       I'm using Spark 2.4.4 to read streaming data from Kafka and want to write
it to Kudu 1.7.0. My code looks like this:

    val kuduContext = new KuduContext("master:7051", spark.sparkContext)

    val console = cnew.select("*").as[CstoreNew]
      .writeStream
      .option("checkpointLocation", "/tmp/t3/")
      .trigger(Trigger.Once())
      .foreach(new ForeachWriter[CstoreNew] {
        override def open(partitionId: Long, version: Long): Boolean = {
          true
        }
        override def process(value: CstoreNew): Unit = {
          val spark = SparkSessionSingleton.getInstance(sparkConf)
          val valueDF = Seq(value).toDF()   // NPE is thrown here
          kuduContext.upsertRows(valueDF, "impala::test.cstore_bury_event_data")
        }
        override def close(errorOrNull: Throwable): Unit = {
        }
      })
    val query = console.start()
    query.awaitTermination()

When execution reaches val valueDF = Seq(value).toDF() I get this error:
Caused by: java.lang.NullPointerException
 at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:228)
 at com.gaojihealth.spark.kafkaconsumer.CstoreNew2KUDU$$anon$1.process(CstoreNew2KUDU.scala:122)
...

SQLImplicits.scala:228 is:

227:   implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
228:     DatasetHolder(_sqlContext.createDataset(s))
229:   }
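For context, my understanding is that toDF() depends on the SQLContext captured by import spark.implicits._ on the driver, and that reference is null once the ForeachWriter has been serialized to an executor, which would explain the NPE at line 228. What I'm effectively trying to do could also be written with foreachBatch (available since Spark 2.4), which hands each micro-batch to the driver as a Dataset so no toDF() is needed inside a serialized writer. This is only a sketch reusing the names from my code above (cnew, CstoreNew, kuduContext); I have not verified it end to end:

```scala
// Sketch: write each micro-batch to Kudu with foreachBatch instead of
// a ForeachWriter. cnew, CstoreNew and kuduContext are as defined above.
val query = cnew.select("*").as[CstoreNew]
  .writeStream
  .option("checkpointLocation", "/tmp/t3/")
  .trigger(Trigger.Once())
  .foreachBatch { (batchDS: Dataset[CstoreNew], batchId: Long) =>
    // batchDS lives on the driver here, so converting to a DataFrame is safe
    kuduContext.upsertRows(batchDS.toDF(), "impala::test.cstore_bury_event_data")
  }
  .start()
query.awaitTermination()
```

With Trigger.Once() this would upsert one micro-batch into the Kudu table and then stop.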

Can anyone give me some help?
2019-11-25


lk_spark 
