I am using protobuf to encode. This may not be related to the new release issue....
Exception in thread "main" scala.ScalaReflectionException: <none> is not a term at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199) at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84) at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811) at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39) at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800) at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39) at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582) at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252) at scala.collection.immutable.List.flatMap(List.scala:344) at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:583) at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61) at org.apache.spark.sql.Encoders$.product(Encoders.scala:274) at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47) at PersonConsumer$.main(PersonConsumer.scala:33) at PersonConsumer.main(PersonConsumer.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) The following is my code ... object PersonConsumer { import org.apache.spark.rdd.RDD import com.trueaccord.scalapb.spark._ import org.apache.spark.sql.{SQLContext, SparkSession} import com.example.protos.demo._ def main(args : Array[String]) { def parseLine(s: String): Person = Person.parseFrom( org.apache.commons.codec.binary.Base64.decodeBase64(s)) val spark = SparkSession.builder. master("local") .appName("spark session example") .getOrCreate() import spark.implicits._ val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load() val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String] val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons") val ds4 = spark.sqlContext.sql("select name from persons") val query = ds4.writeStream .outputMode("append") .format("console") .start() query.awaitTermination() } }