Could you provide the Person class? On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande <deshpandesh...@gmail.com> wrote:
> I am using 2.11.8. Thanks > > On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu < > shixi...@databricks.com> wrote: > >> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some >> known race conditions in reflection and the Scala community doesn't have >> plan to fix it (http://docs.scala-lang.org/ov >> erviews/reflection/thread-safety.html) AFAIK, the only way to fix it is >> upgrading to Scala 2.11. >> >> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande < >> deshpandesh...@gmail.com> wrote: >> >>> I am using protobuf to encode. This may not be related to the new >>> release issue.... >>> >>> Exception in thread "main" scala.ScalaReflectionException: <none> is >>> not a term >>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199) >>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(S >>> ymbols.scala:84) >>> at org.apache.spark.sql.catalyst.ScalaReflection$class.construc >>> tParams(ScalaReflection.scala:811) >>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructPara >>> ms(ScalaReflection.scala:39) >>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConst >>> ructorParameters(ScalaReflection.scala:800) >>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructo >>> rParameters(ScalaReflection.scala:39) >>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp >>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect >>> ion.scala:582) >>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp >>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect >>> ion.scala:460) >>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app >>> ly(ScalaReflection.scala:592) >>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app >>> ly(ScalaReflection.scala:583) >>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr >>> aversableLike.scala:252) >>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr >>> aversableLike.scala:252) >>> at scala.collection.immutable.List.foreach(List.scala:381) >>> at scala.collection.TraversableLike$class.flatMap(TraversableLi >>> ke.scala:252) >>> at scala.collection.immutable.List.flatMap(List.scala:344) >>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp >>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect >>> ion.scala:583) >>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor >>> (ScalaReflection.scala:425) >>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.ap >>> ply(ExpressionEncoder.scala:61) >>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274) >>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImpli >>> cits.scala:47) >>> at PersonConsumer$.main(PersonConsumer.scala:33) >>> at PersonConsumer.main(PersonConsumer.scala) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce >>> ssorImpl.java:62) >>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe >>> thodAccessorImpl.java:43) >>> at java.lang.reflect.Method.invoke(Method.java:498) >>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) >>> >>> The following is my code ... >>> >>> object PersonConsumer { >>> import org.apache.spark.rdd.RDD >>> import com.trueaccord.scalapb.spark._ >>> import org.apache.spark.sql.{SQLContext, SparkSession} >>> import com.example.protos.demo._ >>> >>> def main(args : Array[String]) { >>> >>> def parseLine(s: String): Person = >>> Person.parseFrom( >>> org.apache.commons.codec.binary.Base64.decodeBase64(s)) >>> >>> val spark = SparkSession.builder. >>> master("local") >>> .appName("spark session example") >>> .getOrCreate() >>> >>> import spark.implicits._ >>> >>> val ds1 = >>> spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load() >>> >>> val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String] >>> >>> val ds3 = ds2.map(str => >>> parseLine(str)).createOrReplaceTempView("persons") >>> >>> val ds4 = spark.sqlContext.sql("select name from persons") >>> >>> val query = ds4.writeStream >>> .outputMode("append") >>> .format("console") >>> .start() >>> query.awaitTermination() >>> } >>> } >>> >>> >> >