The problem is "optional Gender gender = 3;". The generated class "Gender"
is a trait, and Spark does not know how to instantiate a trait, so it is not
supported. You can define your own class that is supported by the SQL Encoder,
and convert the generated class to that new class in `parseLine`.

On Wed, Nov 16, 2016 at 4:22 PM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> Ryan,
>
> I just wanted to provide more info. Here is my .proto file which is the
> basis for generating the Person class. Thanks.
>
> option java_package = "com.example.protos";
> enum Gender {
>     MALE = 1;
>     FEMALE = 2;
> }
> message Address {
>     optional string street = 1;
>     optional string city = 2;
> }
> message Person {
>     optional string name = 1;
>     optional int32 age = 2;
>     optional Gender gender = 3;
>     repeated string tags = 4;
>     repeated Address addresses = 5;
> }
>
>
> On Wed, Nov 16, 2016 at 3:04 PM, shyla deshpande <deshpandesh...@gmail.com
> > wrote:
>
>> *Thanks for the response. The following is the Person class:*
>>
>> // Generated by the Scala Plugin for the Protocol Buffer Compiler.
>> // Do not edit!
>> //
>> // Protofile syntax: PROTO2
>>
>> package com.example.protos.demo
>>
>>
>>
>> @SerialVersionUID(0L)
>> final case class Person(
>>     name: scala.Option[String] = None,
>>     age: scala.Option[Int] = None,
>>     gender: scala.Option[com.example.protos.demo.Gender] = None,
>>     tags: scala.collection.Seq[String] = Nil,
>>     addresses: scala.collection.Seq[com.example.protos.demo.Address] = Nil
>>     ) extends com.trueaccord.scalapb.GeneratedMessage with 
>> com.trueaccord.scalapb.Message[Person] with 
>> com.trueaccord.lenses.Updatable[Person] {
>>     @transient
>>     private[this] var __serializedSizeCachedValue: Int = 0
>>     private[this] def __computeSerializedValue(): Int = {
>>       var __size = 0
>>       if (name.isDefined) { __size += 
>> com.google.protobuf.CodedOutputStream.computeStringSize(1, name.get) }
>>       if (age.isDefined) { __size += 
>> com.google.protobuf.CodedOutputStream.computeInt32Size(2, age.get) }
>>       if (gender.isDefined) { __size += 
>> com.google.protobuf.CodedOutputStream.computeEnumSize(3, gender.get.value) }
>>       tags.foreach(tags => __size += 
>> com.google.protobuf.CodedOutputStream.computeStringSize(4, tags))
>>       addresses.foreach(addresses => __size += 1 + 
>> com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(addresses.serializedSize)
>>  + addresses.serializedSize)
>>       __size
>>     }
>>     final override def serializedSize: Int = {
>>       var read = __serializedSizeCachedValue
>>       if (read == 0) {
>>         read = __computeSerializedValue()
>>         __serializedSizeCachedValue = read
>>       }
>>       read
>>     }
>>     def writeTo(`_output__`: com.google.protobuf.CodedOutputStream): Unit = {
>>       name.foreach { __v =>
>>         _output__.writeString(1, __v)
>>       };
>>       age.foreach { __v =>
>>         _output__.writeInt32(2, __v)
>>       };
>>       gender.foreach { __v =>
>>         _output__.writeEnum(3, __v.value)
>>       };
>>       tags.foreach { __v =>
>>         _output__.writeString(4, __v)
>>       };
>>       addresses.foreach { __v =>
>>         _output__.writeTag(5, 2)
>>         _output__.writeUInt32NoTag(__v.serializedSize)
>>         __v.writeTo(_output__)
>>       };
>>     }
>>     def mergeFrom(`_input__`: com.google.protobuf.CodedInputStream): 
>> com.example.protos.demo.Person = {
>>       var __name = this.name
>>       var __age = this.age
>>       var __gender = this.gender
>>       val __tags = (scala.collection.immutable.Vector.newBuilder[String] ++= 
>> this.tags)
>>       val __addresses = 
>> (scala.collection.immutable.Vector.newBuilder[com.example.protos.demo.Address]
>>  ++= this.addresses)
>>       var _done__ = false
>>       while (!_done__) {
>>         val _tag__ = _input__.readTag()
>>         _tag__ match {
>>           case 0 => _done__ = true
>>           case 10 =>
>>             __name = Some(_input__.readString())
>>           case 16 =>
>>             __age = Some(_input__.readInt32())
>>           case 24 =>
>>             __gender = 
>> Some(com.example.protos.demo.Gender.fromValue(_input__.readEnum()))
>>           case 34 =>
>>             __tags += _input__.readString()
>>           case 42 =>
>>             __addresses += 
>> com.trueaccord.scalapb.LiteParser.readMessage(_input__, 
>> com.example.protos.demo.Address.defaultInstance)
>>           case tag => _input__.skipField(tag)
>>         }
>>       }
>>       com.example.protos.demo.Person(
>>           name = __name,
>>           age = __age,
>>           gender = __gender,
>>           tags = __tags.result(),
>>           addresses = __addresses.result()
>>       )
>>     }
>>     def getName: String = name.getOrElse("")
>>     def clearName: Person = copy(name = None)
>>     def withName(__v: String): Person = copy(name = Some(__v))
>>     def getAge: Int = age.getOrElse(0)
>>     def clearAge: Person = copy(age = None)
>>     def withAge(__v: Int): Person = copy(age = Some(__v))
>>     def getGender: com.example.protos.demo.Gender = 
>> gender.getOrElse(com.example.protos.demo.Gender.MALE)
>>     def clearGender: Person = copy(gender = None)
>>     def withGender(__v: com.example.protos.demo.Gender): Person = 
>> copy(gender = Some(__v))
>>     def clearTags = copy(tags = scala.collection.Seq.empty)
>>     def addTags(__vs: String*): Person = addAllTags(__vs)
>>     def addAllTags(__vs: TraversableOnce[String]): Person = copy(tags = tags 
>> ++ __vs)
>>     def withTags(__v: scala.collection.Seq[String]): Person = copy(tags = 
>> __v)
>>     def clearAddresses = copy(addresses = scala.collection.Seq.empty)
>>     def addAddresses(__vs: com.example.protos.demo.Address*): Person = 
>> addAllAddresses(__vs)
>>     def addAllAddresses(__vs: 
>> TraversableOnce[com.example.protos.demo.Address]): Person = copy(addresses = 
>> addresses ++ __vs)
>>     def withAddresses(__v: 
>> scala.collection.Seq[com.example.protos.demo.Address]): Person = 
>> copy(addresses = __v)
>>     def getField(__field: com.google.protobuf.Descriptors.FieldDescriptor): 
>> scala.Any = {
>>       __field.getNumber match {
>>         case 1 => name.getOrElse(null)
>>         case 2 => age.getOrElse(null)
>>         case 3 => gender.map(_.valueDescriptor).getOrElse(null)
>>         case 4 => tags
>>         case 5 => addresses
>>       }
>>     }
>>     override def toString: String = 
>> com.trueaccord.scalapb.TextFormat.printToUnicodeString(this)
>>     def companion = com.example.protos.demo.Person
>> }
>>
>> object Person extends 
>> com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person]
>>  {
>>   implicit def messageCompanion: 
>> com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person]
>>  = this
>>   def fromFieldsMap(__fieldsMap: 
>> scala.collection.immutable.Map[com.google.protobuf.Descriptors.FieldDescriptor,
>>  scala.Any]): com.example.protos.demo.Person = {
>>     require(__fieldsMap.keys.forall(_.getContainingType() == descriptor), 
>> "FieldDescriptor does not match message type.")
>>     val __fields = descriptor.getFields
>>     com.example.protos.demo.Person(
>>       __fieldsMap.get(__fields.get(0)).asInstanceOf[scala.Option[String]],
>>       __fieldsMap.get(__fields.get(1)).asInstanceOf[scala.Option[Int]],
>>       
>> __fieldsMap.get(__fields.get(2)).asInstanceOf[scala.Option[com.google.protobuf.Descriptors.EnumValueDescriptor]].map(__e
>>  => com.example.protos.demo.Gender.fromValue(__e.getNumber)),
>>       __fieldsMap.getOrElse(__fields.get(3), 
>> Nil).asInstanceOf[scala.collection.Seq[String]],
>>       __fieldsMap.getOrElse(__fields.get(4), 
>> Nil).asInstanceOf[scala.collection.Seq[com.example.protos.demo.Address]]
>>     )
>>   }
>>   def descriptor: com.google.protobuf.Descriptors.Descriptor = 
>> DemoProto.descriptor.getMessageTypes.get(1)
>>   def messageCompanionForField(__field: 
>> com.google.protobuf.Descriptors.FieldDescriptor): 
>> com.trueaccord.scalapb.GeneratedMessageCompanion[_] = {
>>     require(__field.getContainingType() == descriptor, "FieldDescriptor does 
>> not match message type.")
>>     var __out: com.trueaccord.scalapb.GeneratedMessageCompanion[_] = null
>>     __field.getNumber match {
>>       case 5 => __out = com.example.protos.demo.Address
>>     }
>>   __out
>>   }
>>   def enumCompanionForField(__field: 
>> com.google.protobuf.Descriptors.FieldDescriptor): 
>> com.trueaccord.scalapb.GeneratedEnumCompanion[_] = {
>>     require(__field.getContainingType() == descriptor, "FieldDescriptor does 
>> not match message type.")
>>     __field.getNumber match {
>>       case 3 => com.example.protos.demo.Gender
>>     }
>>   }
>>   lazy val defaultInstance = com.example.protos.demo.Person(
>>   )
>>   implicit class PersonLens[UpperPB](_l: com.trueaccord.lenses.Lens[UpperPB, 
>> com.example.protos.demo.Person]) extends 
>> com.trueaccord.lenses.ObjectLens[UpperPB, 
>> com.example.protos.demo.Person](_l) {
>>     def name: com.trueaccord.lenses.Lens[UpperPB, String] = 
>> field(_.getName)((c_, f_) => c_.copy(name = Some(f_)))
>>     def optionalName: com.trueaccord.lenses.Lens[UpperPB, 
>> scala.Option[String]] = field(_.name)((c_, f_) => c_.copy(name = f_))
>>     def age: com.trueaccord.lenses.Lens[UpperPB, Int] = field(_.getAge)((c_, 
>> f_) => c_.copy(age = Some(f_)))
>>     def optionalAge: com.trueaccord.lenses.Lens[UpperPB, scala.Option[Int]] 
>> = field(_.age)((c_, f_) => c_.copy(age = f_))
>>     def gender: com.trueaccord.lenses.Lens[UpperPB, 
>> com.example.protos.demo.Gender] = field(_.getGender)((c_, f_) => 
>> c_.copy(gender = Some(f_)))
>>     def optionalGender: com.trueaccord.lenses.Lens[UpperPB, 
>> scala.Option[com.example.protos.demo.Gender]] = field(_.gender)((c_, f_) => 
>> c_.copy(gender = f_))
>>     def tags: com.trueaccord.lenses.Lens[UpperPB, 
>> scala.collection.Seq[String]] = field(_.tags)((c_, f_) => c_.copy(tags = f_))
>>     def addresses: com.trueaccord.lenses.Lens[UpperPB, 
>> scala.collection.Seq[com.example.protos.demo.Address]] = 
>> field(_.addresses)((c_, f_) => c_.copy(addresses = f_))
>>   }
>>   final val NAME_FIELD_NUMBER = 1
>>   final val AGE_FIELD_NUMBER = 2
>>   final val GENDER_FIELD_NUMBER = 3
>>   final val TAGS_FIELD_NUMBER = 4
>>   final val ADDRESSES_FIELD_NUMBER = 5
>> }
>>
>>
>> On Wed, Nov 16, 2016 at 1:28 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Could you provide the Person class?
>>>
>>> On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> I am using 2.11.8. Thanks
>>>>
>>>> On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <
>>>> shixi...@databricks.com> wrote:
>>>>
>>>>> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has
>>>>> some known race conditions in reflection, and the Scala community has
>>>>> no plans to fix them
>>>>> (http://docs.scala-lang.org/overviews/reflection/thread-safety.html).
>>>>> AFAIK, the only way to fix it is upgrading to Scala 2.11.
>>>>>
>>>>> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
>>>>> deshpandesh...@gmail.com> wrote:
>>>>>
>>>>>> I am using protobuf to encode. This may not be related to the new
>>>>>> release issue....
>>>>>>
>>>>>> Exception in thread "main" scala.ScalaReflectionException: <none> is not a term
>>>>>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
>>>>>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
>>>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>>>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>>>>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>>>>> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
>>>>>> at scala.collection.immutable.List.flatMap(List.scala:344)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:583)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
>>>>>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61)
>>>>>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
>>>>>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)
>>>>>> at PersonConsumer$.main(PersonConsumer.scala:33)
>>>>>> at PersonConsumer.main(PersonConsumer.scala)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>>>>>
>>>>>> The following is my code ...
>>>>>>
>>>>>> object PersonConsumer {
>>>>>>   import org.apache.spark.rdd.RDD
>>>>>>   import com.trueaccord.scalapb.spark._
>>>>>>   import org.apache.spark.sql.{SQLContext, SparkSession}
>>>>>>   import com.example.protos.demo._
>>>>>>
>>>>>>   def main(args : Array[String]) {
>>>>>>
>>>>>>     def parseLine(s: String): Person =
>>>>>>       Person.parseFrom(
>>>>>>         org.apache.commons.codec.binary.Base64.decodeBase64(s))
>>>>>>
>>>>>>     val spark = SparkSession.builder.
>>>>>>       master("local")
>>>>>>       .appName("spark session example")
>>>>>>       .getOrCreate()
>>>>>>
>>>>>>     import spark.implicits._
>>>>>>
>>>>>>     val ds1 = 
>>>>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
>>>>>>
>>>>>>     val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>>>>>>
>>>>>>     val ds3 = ds2.map(str => 
>>>>>> parseLine(str)).createOrReplaceTempView("persons")
>>>>>>
>>>>>>     val ds4 = spark.sqlContext.sql("select name from persons")
>>>>>>
>>>>>>     val query = ds4.writeStream
>>>>>>       .outputMode("append")
>>>>>>       .format("console")
>>>>>>       .start()
>>>>>>     query.awaitTermination()
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to