I want to parse the structure of the data dynamically and then write the data
to Delta Lake; I think it can merge the schema automatically.

2019-09-17 

lk_spark 



From: Tathagata Das <tathagata.das1...@gmail.com>
Sent: 2019-09-17 16:13
Subject: Re: how can I dynamic parse json in kafka when using Structured Streaming
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

You can use the built-in SQL function from_json to parse JSON.
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column-
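For illustration, here is a minimal sketch of the from_json approach, assuming
the schema is known up front. The field names ("name", "age"), the object name
FromJsonSketch, and the in-memory stand-in for the Kafka source are hypothetical
and not from the original message; in the real job, `messages` would come from
spark.readStream.format("kafka").

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object FromJsonSketch {
  // Hypothetical schema; StructType.add returns a new StructType,
  // so the calls are chained rather than used for their side effects.
  val schema: StructType = new StructType()
    .add("name", StringType)
    .add("age", IntegerType)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("from-json-sketch")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the Kafka source: one string column named "value".
    val messages = Seq("""{"name":"a","age":1}""", "not json").toDF("value")

    // Parse the JSON string into a struct column, then flatten it
    // into top-level columns. No custom Encoder is needed.
    val parsed = messages
      .select(from_json(col("value").cast("string"), schema).as("data"))
      .select("data.*")

    parsed.show()
    spark.stop()
  }
}
```

Because the parsing happens in a column expression instead of a typed map, the
result is an ordinary DataFrame, and with the default parser mode a malformed
record yields nulls rather than an exception.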



On Mon, Sep 16, 2019 at 7:39 PM lk_spark <lk_sp...@163.com> wrote:

Hi all,
    I'm using Structured Streaming to read from Kafka. The data is a JSON
string; I want to parse it and convert it to a DataFrame. My code does not
compile, and I don't know how to fix it:

val lines = messages.selectExpr("CAST(value AS STRING) as value").as[String]

val words = lines.map(line => {
  var json: JValue = null
  try {
    json = parse(line)
  } catch {
    case ex: Exception => { println(ex.getMessage + " " + line) }
  }
  //var result: scala.collection.mutable.Map[String,String] = scala.collection.mutable.Map()
  val jsonObj = json.values.asInstanceOf[Map[String, _]]
  val valuse = jsonObj.values.toArray
  val schema = StructType(List())
  for ((k, v) <- jsonObj){
    //result += (k -> jsonObj.get(k).toString())

    if(v.isInstanceOf[String]){
      schema.add(k,StringType)
    }else if (v.isInstanceOf[Int]){
      schema.add(k,IntegerType)
    }/*else if (v.isInstanceOf[Array[String]]){
      schema.add(k,ArrayType(StringType))
    }else if (v.isInstanceOf[Map[String,String]]){
      schema.add(k,MapType(StringType,StringType))
    }*/
  }
  val row =  new GenericRowWithSchema(valuse,schema)
  row
})

Error:(45, 26) Unable to find encoder for type 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema. An implicit 
Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema] is 
needed to store org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 
instances in a Dataset. Primitive types (Int, String, etc) and Product types 
(case classes) are supported by importing spark.implicits._  Support for 
serializing other types will be added in future releases.
    val words = lines.map(line => {

Error:(45, 26) not enough arguments for method map: (implicit evidence$6: 
org.apache.spark.sql.Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema])org.apache.spark.sql.Dataset[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema].
Unspecified value parameter evidence$6.
    val words = lines.map(line => {



2019-09-17


lk_spark 
