Thanks, Zhu. That was it. It works great now!

On Thu, Nov 17, 2016 at 1:07 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> The problem is `optional Gender gender = 3;`. The generated class `Gender`
> is a trait, and Spark cannot know how to instantiate a trait, so it's not
> supported. You can define your own class that is supported by the SQL
> Encoder, and convert the generated class to the new class in `parseLine`.
>
> On Wed, Nov 16, 2016 at 4:22 PM, shyla deshpande <deshpandesh...@gmail.com> wrote:
>
>> Ryan,
>>
>> I just wanted to provide more info. Here is my .proto file, which is the
>> basis for generating the Person class. Thanks.
>>
>> option java_package = "com.example.protos";
>> // Gender has no 0 value. In proto2 an unset enum field reads as the first
>> // listed value, so an unset `gender` reads as MALE (the generated
>> // Person.getGender falls back to MALE). In the ScalaPB-generated Scala code
>> // Gender becomes a trait, which Spark's product encoder cannot instantiate.
>> enum Gender {
>>     MALE = 1;
>>     FEMALE = 2;
>> }
>> message Address {
>>     optional string street = 1;
>>     optional string city = 2;
>> }
>> // Declaration order matters to the generated code: Person is looked up as
>> // message index 1 (after Address), so regenerate after reordering messages.
>> message Person {
>>     optional string name = 1;
>>     optional int32 age = 2;
>>     optional Gender gender = 3;
>>     repeated string tags = 4;
>>     repeated Address addresses = 5;
>> }
>>
>>
>> On Wed, Nov 16, 2016 at 3:04 PM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> *Thanks for the response. Here is the Person class:*
>>>
>>> // Generated by the Scala Plugin for the Protocol Buffer Compiler.
>>> // Do not edit!
>>> //
>>> // Protofile syntax: PROTO2
>>>
>>> package com.example.protos.demo
>>>
>>>
>>>
>>> /** Immutable Scala representation of the `Person` message, generated by
>>>   * ScalaPB from the proto2 definition.
>>>   *
>>>   * Each proto2 `optional` field is a `scala.Option` defaulting to `None`;
>>>   * each `repeated` field is a `scala.collection.Seq` defaulting to `Nil`.
>>>   *
>>>   * NOTE(review): `gender` is typed with the generated `Gender`, which is a
>>>   * trait. Spark's product encoder (`spark.implicits._`) cannot instantiate a
>>>   * trait, so convert `Person` to an encoder-friendly case class before
>>>   * building a `Dataset` from it.
>>>   *
>>>   * Generated code ("Do not edit!"): change the .proto and regenerate instead.
>>>   */
>>> @SerialVersionUID(0L)
>>> final case class Person(
>>>     name: scala.Option[String] = None,
>>>     age: scala.Option[Int] = None,
>>>     gender: scala.Option[com.example.protos.demo.Gender] = None,
>>>     tags: scala.collection.Seq[String] = Nil,
>>>     addresses: scala.collection.Seq[com.example.protos.demo.Address] = Nil
>>>     ) extends com.trueaccord.scalapb.GeneratedMessage with 
>>> com.trueaccord.scalapb.Message[Person] with 
>>> com.trueaccord.lenses.Updatable[Person] {
>>>     // Memoized result of __computeSerializedValue(); 0 means "not computed yet".
>>>     @transient
>>>     private[this] var __serializedSizeCachedValue: Int = 0
>>>     // Sums the encoded size of every optional field that is set and of
>>>     // every element of the repeated fields.
>>>     private[this] def __computeSerializedValue(): Int = {
>>>       var __size = 0
>>>       if (name.isDefined) { __size += 
>>> com.google.protobuf.CodedOutputStream.computeStringSize(1, name.get) }
>>>       if (age.isDefined) { __size += 
>>> com.google.protobuf.CodedOutputStream.computeInt32Size(2, age.get) }
>>>       if (gender.isDefined) { __size += 
>>> com.google.protobuf.CodedOutputStream.computeEnumSize(3, gender.get.value) }
>>>       tags.foreach(tags => __size += 
>>> com.google.protobuf.CodedOutputStream.computeStringSize(4, tags))
>>>       // Field 5 is a length-delimited nested message: 1 tag byte, a varint
>>>       // length prefix, then the nested message's own bytes.
>>>       addresses.foreach(addresses => __size += 1 + 
>>> com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(addresses.serializedSize)
>>>  + addresses.serializedSize)
>>>       __size
>>>     }
>>>     // Computes the encoded size on first use and caches it. A message whose
>>>     // size really is 0 (no fields set) is recomputed on every call.
>>>     final override def serializedSize: Int = {
>>>       var read = __serializedSizeCachedValue
>>>       if (read == 0) {
>>>         read = __computeSerializedValue()
>>>         __serializedSizeCachedValue = read
>>>       }
>>>       read
>>>     }
>>>     // Writes every set field in field-number order (1..5); repeated fields
>>>     // emit one entry per element. The statement order defines the output bytes.
>>>     def writeTo(`_output__`: com.google.protobuf.CodedOutputStream): Unit = 
>>> {
>>>       name.foreach { __v =>
>>>         _output__.writeString(1, __v)
>>>       };
>>>       age.foreach { __v =>
>>>         _output__.writeInt32(2, __v)
>>>       };
>>>       gender.foreach { __v =>
>>>         _output__.writeEnum(3, __v.value)
>>>       };
>>>       tags.foreach { __v =>
>>>         _output__.writeString(4, __v)
>>>       };
>>>       addresses.foreach { __v =>
>>>         _output__.writeTag(5, 2)
>>>         _output__.writeUInt32NoTag(__v.serializedSize)
>>>         __v.writeTo(_output__)
>>>       };
>>>     }
>>>     // Returns a new Person holding this message's values merged with the
>>>     // fields read from `_input__`. Later optional values overwrite earlier
>>>     // ones; repeated values are appended. Reading stops at tag 0 (end of
>>>     // input). Each case value is a wire tag, (field number << 3) | wire type:
>>>     // 10 = name, 16 = age, 24 = gender, 34 = tags, 42 = addresses.
>>>     // Unrecognized tags are skipped.
>>>     def mergeFrom(`_input__`: com.google.protobuf.CodedInputStream): 
>>> com.example.protos.demo.Person = {
>>>       var __name = this.name
>>>       var __age = this.age
>>>       var __gender = this.gender
>>>       val __tags = (scala.collection.immutable.Vector.newBuilder[String] 
>>> ++= this.tags)
>>>       val __addresses = 
>>> (scala.collection.immutable.Vector.newBuilder[com.example.protos.demo.Address]
>>>  ++= this.addresses)
>>>       var _done__ = false
>>>       while (!_done__) {
>>>         val _tag__ = _input__.readTag()
>>>         _tag__ match {
>>>           case 0 => _done__ = true
>>>           case 10 =>
>>>             __name = Some(_input__.readString())
>>>           case 16 =>
>>>             __age = Some(_input__.readInt32())
>>>           case 24 =>
>>>             __gender = 
>>> Some(com.example.protos.demo.Gender.fromValue(_input__.readEnum()))
>>>           case 34 =>
>>>             __tags += _input__.readString()
>>>           case 42 =>
>>>             __addresses += 
>>> com.trueaccord.scalapb.LiteParser.readMessage(_input__, 
>>> com.example.protos.demo.Address.defaultInstance)
>>>           case tag => _input__.skipField(tag)
>>>         }
>>>       }
>>>       com.example.protos.demo.Person(
>>>           name = __name,
>>>           age = __age,
>>>           gender = __gender,
>>>           tags = __tags.result(),
>>>           addresses = __addresses.result()
>>>       )
>>>     }
>>>     // Optional-field accessors: get* returns the value or its proto2 default
>>>     // ("", 0, MALE); clear* and with* return updated copies.
>>>     def getName: String = name.getOrElse("")
>>>     def clearName: Person = copy(name = None)
>>>     def withName(__v: String): Person = copy(name = Some(__v))
>>>     def getAge: Int = age.getOrElse(0)
>>>     def clearAge: Person = copy(age = None)
>>>     def withAge(__v: Int): Person = copy(age = Some(__v))
>>>     def getGender: com.example.protos.demo.Gender = 
>>> gender.getOrElse(com.example.protos.demo.Gender.MALE)
>>>     def clearGender: Person = copy(gender = None)
>>>     def withGender(__v: com.example.protos.demo.Gender): Person = 
>>> copy(gender = Some(__v))
>>>     // Repeated-field helpers: return copies with the sequence emptied,
>>>     // appended to, or replaced.
>>>     def clearTags = copy(tags = scala.collection.Seq.empty)
>>>     def addTags(__vs: String*): Person = addAllTags(__vs)
>>>     def addAllTags(__vs: TraversableOnce[String]): Person = copy(tags = 
>>> tags ++ __vs)
>>>     def withTags(__v: scala.collection.Seq[String]): Person = copy(tags = 
>>> __v)
>>>     def clearAddresses = copy(addresses = scala.collection.Seq.empty)
>>>     def addAddresses(__vs: com.example.protos.demo.Address*): Person = 
>>> addAllAddresses(__vs)
>>>     def addAllAddresses(__vs: 
>>> TraversableOnce[com.example.protos.demo.Address]): Person = copy(addresses 
>>> = addresses ++ __vs)
>>>     def withAddresses(__v: 
>>> scala.collection.Seq[com.example.protos.demo.Address]): Person = 
>>> copy(addresses = __v)
>>>     // Reflective access by field number. Unset optional fields are reported
>>>     // as null, and the enum as its value descriptor. Any field number other
>>>     // than 1..5 throws a MatchError.
>>>     def getField(__field: com.google.protobuf.Descriptors.FieldDescriptor): 
>>> scala.Any = {
>>>       __field.getNumber match {
>>>         case 1 => name.getOrElse(null)
>>>         case 2 => age.getOrElse(null)
>>>         case 3 => gender.map(_.valueDescriptor).getOrElse(null)
>>>         case 4 => tags
>>>         case 5 => addresses
>>>       }
>>>     }
>>>     // Renders the message with ScalaPB's TextFormat printer.
>>>     override def toString: String = 
>>> com.trueaccord.scalapb.TextFormat.printToUnicodeString(this)
>>>     def companion = com.example.protos.demo.Person
>>> }
>>>
>>> /** Generated ScalaPB companion for `Person`. It provides descriptor lookup,
>>>   * construction from a descriptor-keyed field map, the default instance,
>>>   * lens syntax, and the field-number constants.
>>>   */
>>> object Person extends 
>>> com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person]
>>>  {
>>>   // Exposes this companion as the implicit GeneratedMessageCompanion[Person].
>>>   implicit def messageCompanion: 
>>> com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person]
>>>  = this
>>>   // Builds a Person from descriptor-keyed values. Descriptors are read
>>>   // positionally (getFields order, which follows the .proto declaration).
>>>   // Absent optional fields become None and absent repeated fields become
>>>   // Nil. The enum arrives as an EnumValueDescriptor and is mapped back
>>>   // through Gender.fromValue. Throws IllegalArgumentException (via require)
>>>   // if any key belongs to another message type.
>>>   def fromFieldsMap(__fieldsMap: 
>>> scala.collection.immutable.Map[com.google.protobuf.Descriptors.FieldDescriptor,
>>>  scala.Any]): com.example.protos.demo.Person = {
>>>     require(__fieldsMap.keys.forall(_.getContainingType() == descriptor), 
>>> "FieldDescriptor does not match message type.")
>>>     val __fields = descriptor.getFields
>>>     com.example.protos.demo.Person(
>>>       __fieldsMap.get(__fields.get(0)).asInstanceOf[scala.Option[String]],
>>>       __fieldsMap.get(__fields.get(1)).asInstanceOf[scala.Option[Int]],
>>>       
>>> __fieldsMap.get(__fields.get(2)).asInstanceOf[scala.Option[com.google.protobuf.Descriptors.EnumValueDescriptor]].map(__e
>>>  => com.example.protos.demo.Gender.fromValue(__e.getNumber)),
>>>       __fieldsMap.getOrElse(__fields.get(3), 
>>> Nil).asInstanceOf[scala.collection.Seq[String]],
>>>       __fieldsMap.getOrElse(__fields.get(4), 
>>> Nil).asInstanceOf[scala.collection.Seq[com.example.protos.demo.Address]]
>>>     )
>>>   }
>>>   // Person is the second message type (index 1) in DemoProto's file
>>>   // descriptor, declared after Address.
>>>   def descriptor: com.google.protobuf.Descriptors.Descriptor = 
>>> DemoProto.descriptor.getMessageTypes.get(1)
>>>   // Only field 5 (addresses) holds a nested message; any other field
>>>   // number throws a MatchError.
>>>   def messageCompanionForField(__field: 
>>> com.google.protobuf.Descriptors.FieldDescriptor): 
>>> com.trueaccord.scalapb.GeneratedMessageCompanion[_] = {
>>>     require(__field.getContainingType() == descriptor, "FieldDescriptor 
>>> does not match message type.")
>>>     var __out: com.trueaccord.scalapb.GeneratedMessageCompanion[_] = null
>>>     __field.getNumber match {
>>>       case 5 => __out = com.example.protos.demo.Address
>>>     }
>>>   __out
>>>   }
>>>   // Only field 3 (gender) is an enum; any other field number throws a
>>>   // MatchError.
>>>   def enumCompanionForField(__field: 
>>> com.google.protobuf.Descriptors.FieldDescriptor): 
>>> com.trueaccord.scalapb.GeneratedEnumCompanion[_] = {
>>>     require(__field.getContainingType() == descriptor, "FieldDescriptor 
>>> does not match message type.")
>>>     __field.getNumber match {
>>>       case 3 => com.example.protos.demo.Gender
>>>     }
>>>   }
>>>   // Person with every field unset (all None / Nil defaults).
>>>   lazy val defaultInstance = com.example.protos.demo.Person(
>>>   )
>>>   // Lens syntax: name/age/gender read through the get* defaults and write
>>>   // Some(value); the optional* lenses read and write the raw Option.
>>>   implicit class PersonLens[UpperPB](_l: 
>>> com.trueaccord.lenses.Lens[UpperPB, com.example.protos.demo.Person]) 
>>> extends com.trueaccord.lenses.ObjectLens[UpperPB, 
>>> com.example.protos.demo.Person](_l) {
>>>     def name: com.trueaccord.lenses.Lens[UpperPB, String] = 
>>> field(_.getName)((c_, f_) => c_.copy(name = Some(f_)))
>>>     def optionalName: com.trueaccord.lenses.Lens[UpperPB, 
>>> scala.Option[String]] = field(_.name)((c_, f_) => c_.copy(name = f_))
>>>     def age: com.trueaccord.lenses.Lens[UpperPB, Int] = 
>>> field(_.getAge)((c_, f_) => c_.copy(age = Some(f_)))
>>>     def optionalAge: com.trueaccord.lenses.Lens[UpperPB, scala.Option[Int]] 
>>> = field(_.age)((c_, f_) => c_.copy(age = f_))
>>>     def gender: com.trueaccord.lenses.Lens[UpperPB, 
>>> com.example.protos.demo.Gender] = field(_.getGender)((c_, f_) => 
>>> c_.copy(gender = Some(f_)))
>>>     def optionalGender: com.trueaccord.lenses.Lens[UpperPB, 
>>> scala.Option[com.example.protos.demo.Gender]] = field(_.gender)((c_, f_) => 
>>> c_.copy(gender = f_))
>>>     def tags: com.trueaccord.lenses.Lens[UpperPB, 
>>> scala.collection.Seq[String]] = field(_.tags)((c_, f_) => c_.copy(tags = 
>>> f_))
>>>     def addresses: com.trueaccord.lenses.Lens[UpperPB, 
>>> scala.collection.Seq[com.example.protos.demo.Address]] = 
>>> field(_.addresses)((c_, f_) => c_.copy(addresses = f_))
>>>   }
>>>   // Field numbers as declared in the .proto Person message.
>>>   final val NAME_FIELD_NUMBER = 1
>>>   final val AGE_FIELD_NUMBER = 2
>>>   final val GENDER_FIELD_NUMBER = 3
>>>   final val TAGS_FIELD_NUMBER = 4
>>>   final val ADDRESSES_FIELD_NUMBER = 5
>>> }
>>>
>>>
>>> On Wed, Nov 16, 2016 at 1:28 PM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
>>>> Could you provide the Person class?
>>>>
>>>> On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande <
>>>> deshpandesh...@gmail.com> wrote:
>>>>
>>>>> I am using Scala 2.11.8. Thanks.
>>>>>
>>>>> On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <
>>>>> shixi...@databricks.com> wrote:
>>>>>
>>>>>> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has
>>>>>> some known race conditions in reflection, and the Scala community doesn't
>>>>>> plan to fix them
>>>>>> (http://docs.scala-lang.org/overviews/reflection/thread-safety.html).
>>>>>> AFAIK, the only way to fix it is upgrading to Scala 2.11.
>>>>>>
>>>>>> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>
>>>>>>> I am using protobuf for encoding. This may not be related to the new
>>>>>>> release issue...
>>>>>>>
>>>>>>> Exception in thread "main" scala.ScalaReflectionException: <none> is not a term
>>>>>>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
>>>>>>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
>>>>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>>>>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>>>>>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>>>>>> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
>>>>>>> at scala.collection.immutable.List.flatMap(List.scala:344)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:583)
>>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
>>>>>>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61)
>>>>>>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
>>>>>>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)
>>>>>>> at PersonConsumer$.main(PersonConsumer.scala:33)
>>>>>>> at PersonConsumer.main(PersonConsumer.scala)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>>>>>>
>>>>>>> The following is my code:
>>>>>>>
>>>>>>> /** Structured Streaming job: reads `Person` protobufs from the Kafka topic
>>>>>>>   * "person" and prints each person's name to the console.
>>>>>>>   */
>>>>>>> object PersonConsumer {
>>>>>>>   import org.apache.spark.rdd.RDD
>>>>>>>   import com.trueaccord.scalapb.spark._
>>>>>>>   import org.apache.spark.sql.{SQLContext, SparkSession}
>>>>>>>   import com.example.protos.demo._
>>>>>>>
>>>>>>>   /** Encoder-friendly copy of the generated `Address` message. */
>>>>>>>   case class AddressRow(street: Option[String], city: Option[String])
>>>>>>>
>>>>>>>   /** Encoder-friendly copy of the generated `Person` message.
>>>>>>>     *
>>>>>>>     * Spark's product encoder cannot instantiate the generated `Gender`
>>>>>>>     * trait (deriving `Encoder[Person]` fails with
>>>>>>>     * `ScalaReflectionException: <none> is not a term`), so the enum is
>>>>>>>     * carried as its protobuf value name, e.g. "MALE" or "FEMALE".
>>>>>>>     */
>>>>>>>   case class PersonRow(
>>>>>>>       name: Option[String],
>>>>>>>       age: Option[Int],
>>>>>>>       gender: Option[String],
>>>>>>>       tags: Seq[String],
>>>>>>>       addresses: Seq[AddressRow])
>>>>>>>
>>>>>>>   /** Converts a generated [[Person]] into a [[PersonRow]] that Spark can encode.
>>>>>>>     *
>>>>>>>     * @param p the decoded protobuf message
>>>>>>>     * @return the same data using only encoder-supported types
>>>>>>>     */
>>>>>>>   def toRow(p: Person): PersonRow =
>>>>>>>     PersonRow(
>>>>>>>       name = p.name,
>>>>>>>       age = p.age,
>>>>>>>       gender = p.gender.map(_.valueDescriptor.getName),
>>>>>>>       tags = p.tags.toVector,
>>>>>>>       addresses = p.addresses.map(a => AddressRow(a.street, a.city)).toVector)
>>>>>>>
>>>>>>>   def main(args: Array[String]): Unit = {
>>>>>>>
>>>>>>>     // Assumes each Kafka value is a Base64 string of serialized Person
>>>>>>>     // bytes (the value column is cast to STRING below); confirm with the
>>>>>>>     // producer.
>>>>>>>     def parseLine(s: String): Person =
>>>>>>>       Person.parseFrom(
>>>>>>>         org.apache.commons.codec.binary.Base64.decodeBase64(s))
>>>>>>>
>>>>>>>     val spark = SparkSession.builder
>>>>>>>       .master("local")
>>>>>>>       .appName("spark session example")
>>>>>>>       .getOrCreate()
>>>>>>>
>>>>>>>     import spark.implicits._
>>>>>>>
>>>>>>>     val ds1 = spark.readStream
>>>>>>>       .format("kafka")
>>>>>>>       .option("kafka.bootstrap.servers", "localhost:9092")
>>>>>>>       .option("subscribe", "person")
>>>>>>>       .load()
>>>>>>>
>>>>>>>     val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>>>>>>>
>>>>>>>     // Map to PersonRow rather than Person: no Encoder[Person] can be
>>>>>>>     // derived because of the generated Gender trait.
>>>>>>>     val ds3 = ds2.map(str => toRow(parseLine(str)))
>>>>>>>
>>>>>>>     // createOrReplaceTempView returns Unit, so register the view as a
>>>>>>>     // statement of its own instead of binding its result to a val.
>>>>>>>     ds3.createOrReplaceTempView("persons")
>>>>>>>
>>>>>>>     val ds4 = spark.sqlContext.sql("select name from persons")
>>>>>>>
>>>>>>>     val query = ds4.writeStream
>>>>>>>       .outputMode("append")
>>>>>>>       .format("console")
>>>>>>>       .start()
>>>>>>>     query.awaitTermination()
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to