Could you provide the Person class?
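For what it's worth, Spark's product encoder (spark.implicits / Encoders.product) expects a class whose primary constructor Scala reflection can see, i.e. a plain case class. A purely hypothetical sketch of that shape (the fields of your generated class will differ):

// Hypothetical stand-in for the ScalaPB-generated Person; not the actual class from this thread.
case class Person(name: String, age: Option[Int])

// With a case class like this in scope, the implicit encoder resolves, e.g.:
//   import spark.implicits._
//   val ds = Seq(Person("alice", Some(30))).toDS()

The "<none> is not a term" error usually means ScalaReflection could not find a primary constructor on the class it was asked to encode.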

On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> I am using 2.11.8. Thanks
>
> On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some
>> known race conditions in reflection, and the Scala community has no plans
>> to fix them (http://docs.scala-lang.org/overviews/reflection/thread-safety.html).
>> AFAIK, the only way to avoid them is to upgrade to Scala 2.11.
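>>
>> Assuming an sbt build (not shown in this thread), pinning the Scala version would look roughly like:
>>
>> // build.sbt (sketch): pin Scala 2.11 so Spark's reflection-based
>> // encoders don't hit the 2.10 thread-safety issues
>> scalaVersion := "2.11.8"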
>>
>> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> I am using protobuf to encode. This may not be related to the new
>>> release issue....
>>>
>>> Exception in thread "main" scala.ScalaReflectionException: <none> is not a term
>>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
>>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
>>> at scala.collection.immutable.List.flatMap(List.scala:344)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:583)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
>>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61)
>>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
>>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)
>>> at PersonConsumer$.main(PersonConsumer.scala:33)
>>> at PersonConsumer.main(PersonConsumer.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>>
>>> The following is my code ...
>>>
>>> object PersonConsumer {
>>>   import org.apache.spark.rdd.RDD
>>>   import com.trueaccord.scalapb.spark._
>>>   import org.apache.spark.sql.{SQLContext, SparkSession}
>>>   import com.example.protos.demo._
>>>
>>>   def main(args : Array[String]) {
>>>
>>>     def parseLine(s: String): Person =
>>>       Person.parseFrom(
>>>         org.apache.commons.codec.binary.Base64.decodeBase64(s))
>>>
>>>     val spark = SparkSession.builder
>>>       .master("local")
>>>       .appName("spark session example")
>>>       .getOrCreate()
>>>
>>>     import spark.implicits._
>>>
>>>     val ds1 = spark.readStream
>>>       .format("kafka")
>>>       .option("kafka.bootstrap.servers", "localhost:9092")
>>>       .option("subscribe", "person")
>>>       .load()
>>>
>>>     val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>>>
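>>>     // note: the implicit Encoder[Person] is resolved for this map call
>>>     // (likely the PersonConsumer.scala:33 frame in the stack trace above)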
>>>     val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
>>>
>>>     val ds4 = spark.sqlContext.sql("select name from persons")
>>>
>>>     val query = ds4.writeStream
>>>       .outputMode("append")
>>>       .format("console")
>>>       .start()
>>>     query.awaitTermination()
>>>   }
>>> }
>>>
>>>
>>
>
