I want to parse the struct of the data dynamically and then write the data to Delta Lake; I think Delta Lake can merge the schema automatically.
2019-09-17 lk_spark

From: Tathagata Das <tathagata.das1...@gmail.com>
Sent: 2019-09-17 16:13
Subject: Re: how can I dynamic parse json in kafka when using Structured Streaming
To: "lk_spark" <lk_sp...@163.com>
Cc: "user.spark" <user@spark.apache.org>

You can use the built-in from_json SQL function to parse JSON.
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column-

On Mon, Sep 16, 2019 at 7:39 PM lk_spark <lk_sp...@163.com> wrote:

Hi all,

I'm using Structured Streaming to read from Kafka. The data is a JSON string, and I want to parse it and convert it to a DataFrame, but my code doesn't compile and I don't know how to fix it:

    val lines = messages.selectExpr("CAST(value AS STRING) as value").as[String]
    val words = lines.map(line => {
      var json: JValue = null
      try {
        json = parse(line)
      } catch {
        case ex: Exception => {
          println(ex.getMessage + " " + line)
        }
      }
      //var result: scala.collection.mutable.Map[String,String] = scala.collection.mutable.Map()
      val jsonObj = json.values.asInstanceOf[Map[String, _]]
      val valuse = jsonObj.values.toArray
      val schema = StructType(List())
      for ((k, v) <- jsonObj){
        //result += (k -> jsonObj.get(k).toString())
        if(v.isInstanceOf[String]){
          schema.add(k,StringType)
        }else if (v.isInstanceOf[Int]){
          schema.add(k,IntegerType)
        }/*else if (v.isInstanceOf[Array[String]]){
          schema.add(k,ArrayType(StringType))
        }else if (v.isInstanceOf[Map[String,String]]){
          schema.add(k,MapType(StringType,StringType))
        }*/
      }
      val row = new GenericRowWithSchema(valuse,schema)
      row
    })

Error:(45, 26) Unable to find encoder for type org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema. An implicit Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema] is needed to store org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
    val words = lines.map(line => {

Error:(45, 26) not enough arguments for method map: (implicit evidence$6: org.apache.spark.sql.Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema])org.apache.spark.sql.Dataset[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema]. Unspecified value parameter evidence$6.
    val words = lines.map(line => {
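A minimal sketch of the from_json approach suggested in the reply, assuming Spark 2.2 or later (where spark.read.json accepts a Dataset[String]). The object name FromJsonSketch, the sample messages, and the parse helper are hypothetical. The sketch infers one schema from sample JSON strings, then parses a string column named value with from_json. It stays in the untyped DataFrame API, so no Encoder for GenericRowWithSchema is required. Separately, StructType is immutable: schema.add(...) returns a new StructType, so the loop in the quoted code leaves schema empty unless each result is reassigned.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

object FromJsonSketch {
  // Local session for the sketch; a real job would reuse its existing SparkSession.
  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("from_json-sketch")
    .getOrCreate()
  import spark.implicits._

  // Hypothetical sample messages standing in for Kafka `value` payloads.
  val sample: Seq[String] = Seq(
    """{"name":"a","count":1}""",
    """{"name":"b","count":2,"tags":["x","y"]}"""
  )

  // Infer a single schema from a batch sample of JSON strings.
  val schema: StructType = spark.read.json(sample.toDS()).schema

  // Hypothetical helper: parse the string column `value` with from_json,
  // then flatten the resulting struct into top-level columns.
  def parse(raw: DataFrame): DataFrame =
    raw.select(from_json(col("value"), schema).as("data")).select("data.*")

  // Apply it to a static DataFrame so the result can be inspected directly.
  val parsed: DataFrame = parse(sample.toDF("value"))
}
```

In the streaming job, the same parse could be applied to messages.selectExpr("CAST(value AS STRING) AS value"). The schema is fixed when the query starts, so keys that appear only in later messages are ignored until the schema is re-inferred and the query is restarted. For the Delta Lake sink, Delta enforces the table schema on write by default; adding new columns can be enabled by setting the writer option mergeSchema to "true".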