The problem is `optional Gender gender = 3;`. The generated class `Gender` is a trait, and Spark does not know how to create an instance of a trait, so it is not supported. You can define your own class that is supported by the SQL Encoder, and convert the generated class to that new class in `parseLine`.
On Wed, Nov 16, 2016 at 4:22 PM, shyla deshpande <deshpandesh...@gmail.com> wrote: > Ryan, > > I just wanted to provide more info. Here is my .proto file which is the > basis for generating the Person class. Thanks. > > option java_package = "com.example.protos"; > enum Gender { > MALE = 1; > FEMALE = 2; > } > message Address { > optional string street = 1; > optional string city = 2; > } > message Person { > optional string name = 1; > optional int32 age = 2; > optional Gender gender = 3; > repeated string tags = 4; > repeated Address addresses = 5; > } > > > On Wed, Nov 16, 2016 at 3:04 PM, shyla deshpande <deshpandesh...@gmail.com > > wrote: > >> *Thanks for the response. Following is the Person class..* >> >> // Generated by the Scala Plugin for the Protocol Buffer Compiler. >> // Do not edit! >> // >> // Protofile syntax: PROTO2 >> >> package com.example.protos.demo >> >> >> >> @SerialVersionUID(0L) >> final case class Person( >> name: scala.Option[String] = None, >> age: scala.Option[Int] = None, >> gender: scala.Option[com.example.protos.demo.Gender] = None, >> tags: scala.collection.Seq[String] = Nil, >> addresses: scala.collection.Seq[com.example.protos.demo.Address] = Nil >> ) extends com.trueaccord.scalapb.GeneratedMessage with >> com.trueaccord.scalapb.Message[Person] with >> com.trueaccord.lenses.Updatable[Person] { >> @transient >> private[this] var __serializedSizeCachedValue: Int = 0 >> private[this] def __computeSerializedValue(): Int = { >> var __size = 0 >> if (name.isDefined) { __size += >> com.google.protobuf.CodedOutputStream.computeStringSize(1, name.get) } >> if (age.isDefined) { __size += >> com.google.protobuf.CodedOutputStream.computeInt32Size(2, age.get) } >> if (gender.isDefined) { __size += >> com.google.protobuf.CodedOutputStream.computeEnumSize(3, gender.get.value) } >> tags.foreach(tags => __size += >> com.google.protobuf.CodedOutputStream.computeStringSize(4, tags)) >> addresses.foreach(addresses => __size += 1 + >> 
com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(addresses.serializedSize) >> + addresses.serializedSize) >> __size >> } >> final override def serializedSize: Int = { >> var read = __serializedSizeCachedValue >> if (read == 0) { >> read = __computeSerializedValue() >> __serializedSizeCachedValue = read >> } >> read >> } >> def writeTo(`_output__`: com.google.protobuf.CodedOutputStream): Unit = { >> name.foreach { __v => >> _output__.writeString(1, __v) >> }; >> age.foreach { __v => >> _output__.writeInt32(2, __v) >> }; >> gender.foreach { __v => >> _output__.writeEnum(3, __v.value) >> }; >> tags.foreach { __v => >> _output__.writeString(4, __v) >> }; >> addresses.foreach { __v => >> _output__.writeTag(5, 2) >> _output__.writeUInt32NoTag(__v.serializedSize) >> __v.writeTo(_output__) >> }; >> } >> def mergeFrom(`_input__`: com.google.protobuf.CodedInputStream): >> com.example.protos.demo.Person = { >> var __name = this.name >> var __age = this.age >> var __gender = this.gender >> val __tags = (scala.collection.immutable.Vector.newBuilder[String] ++= >> this.tags) >> val __addresses = >> (scala.collection.immutable.Vector.newBuilder[com.example.protos.demo.Address] >> ++= this.addresses) >> var _done__ = false >> while (!_done__) { >> val _tag__ = _input__.readTag() >> _tag__ match { >> case 0 => _done__ = true >> case 10 => >> __name = Some(_input__.readString()) >> case 16 => >> __age = Some(_input__.readInt32()) >> case 24 => >> __gender = >> Some(com.example.protos.demo.Gender.fromValue(_input__.readEnum())) >> case 34 => >> __tags += _input__.readString() >> case 42 => >> __addresses += >> com.trueaccord.scalapb.LiteParser.readMessage(_input__, >> com.example.protos.demo.Address.defaultInstance) >> case tag => _input__.skipField(tag) >> } >> } >> com.example.protos.demo.Person( >> name = __name, >> age = __age, >> gender = __gender, >> tags = __tags.result(), >> addresses = __addresses.result() >> ) >> } >> def getName: String = name.getOrElse("") >> 
def clearName: Person = copy(name = None) >> def withName(__v: String): Person = copy(name = Some(__v)) >> def getAge: Int = age.getOrElse(0) >> def clearAge: Person = copy(age = None) >> def withAge(__v: Int): Person = copy(age = Some(__v)) >> def getGender: com.example.protos.demo.Gender = >> gender.getOrElse(com.example.protos.demo.Gender.MALE) >> def clearGender: Person = copy(gender = None) >> def withGender(__v: com.example.protos.demo.Gender): Person = >> copy(gender = Some(__v)) >> def clearTags = copy(tags = scala.collection.Seq.empty) >> def addTags(__vs: String*): Person = addAllTags(__vs) >> def addAllTags(__vs: TraversableOnce[String]): Person = copy(tags = tags >> ++ __vs) >> def withTags(__v: scala.collection.Seq[String]): Person = copy(tags = >> __v) >> def clearAddresses = copy(addresses = scala.collection.Seq.empty) >> def addAddresses(__vs: com.example.protos.demo.Address*): Person = >> addAllAddresses(__vs) >> def addAllAddresses(__vs: >> TraversableOnce[com.example.protos.demo.Address]): Person = copy(addresses = >> addresses ++ __vs) >> def withAddresses(__v: >> scala.collection.Seq[com.example.protos.demo.Address]): Person = >> copy(addresses = __v) >> def getField(__field: com.google.protobuf.Descriptors.FieldDescriptor): >> scala.Any = { >> __field.getNumber match { >> case 1 => name.getOrElse(null) >> case 2 => age.getOrElse(null) >> case 3 => gender.map(_.valueDescriptor).getOrElse(null) >> case 4 => tags >> case 5 => addresses >> } >> } >> override def toString: String = >> com.trueaccord.scalapb.TextFormat.printToUnicodeString(this) >> def companion = com.example.protos.demo.Person >> } >> >> object Person extends >> com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person] >> { >> implicit def messageCompanion: >> com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person] >> = this >> def fromFieldsMap(__fieldsMap: >> 
scala.collection.immutable.Map[com.google.protobuf.Descriptors.FieldDescriptor, >> scala.Any]): com.example.protos.demo.Person = { >> require(__fieldsMap.keys.forall(_.getContainingType() == descriptor), >> "FieldDescriptor does not match message type.") >> val __fields = descriptor.getFields >> com.example.protos.demo.Person( >> __fieldsMap.get(__fields.get(0)).asInstanceOf[scala.Option[String]], >> __fieldsMap.get(__fields.get(1)).asInstanceOf[scala.Option[Int]], >> >> __fieldsMap.get(__fields.get(2)).asInstanceOf[scala.Option[com.google.protobuf.Descriptors.EnumValueDescriptor]].map(__e >> => com.example.protos.demo.Gender.fromValue(__e.getNumber)), >> __fieldsMap.getOrElse(__fields.get(3), >> Nil).asInstanceOf[scala.collection.Seq[String]], >> __fieldsMap.getOrElse(__fields.get(4), >> Nil).asInstanceOf[scala.collection.Seq[com.example.protos.demo.Address]] >> ) >> } >> def descriptor: com.google.protobuf.Descriptors.Descriptor = >> DemoProto.descriptor.getMessageTypes.get(1) >> def messageCompanionForField(__field: >> com.google.protobuf.Descriptors.FieldDescriptor): >> com.trueaccord.scalapb.GeneratedMessageCompanion[_] = { >> require(__field.getContainingType() == descriptor, "FieldDescriptor does >> not match message type.") >> var __out: com.trueaccord.scalapb.GeneratedMessageCompanion[_] = null >> __field.getNumber match { >> case 5 => __out = com.example.protos.demo.Address >> } >> __out >> } >> def enumCompanionForField(__field: >> com.google.protobuf.Descriptors.FieldDescriptor): >> com.trueaccord.scalapb.GeneratedEnumCompanion[_] = { >> require(__field.getContainingType() == descriptor, "FieldDescriptor does >> not match message type.") >> __field.getNumber match { >> case 3 => com.example.protos.demo.Gender >> } >> } >> lazy val defaultInstance = com.example.protos.demo.Person( >> ) >> implicit class PersonLens[UpperPB](_l: com.trueaccord.lenses.Lens[UpperPB, >> com.example.protos.demo.Person]) extends >> com.trueaccord.lenses.ObjectLens[UpperPB, >> 
com.example.protos.demo.Person](_l) { >> def name: com.trueaccord.lenses.Lens[UpperPB, String] = >> field(_.getName)((c_, f_) => c_.copy(name = Some(f_))) >> def optionalName: com.trueaccord.lenses.Lens[UpperPB, >> scala.Option[String]] = field(_.name)((c_, f_) => c_.copy(name = f_)) >> def age: com.trueaccord.lenses.Lens[UpperPB, Int] = field(_.getAge)((c_, >> f_) => c_.copy(age = Some(f_))) >> def optionalAge: com.trueaccord.lenses.Lens[UpperPB, scala.Option[Int]] >> = field(_.age)((c_, f_) => c_.copy(age = f_)) >> def gender: com.trueaccord.lenses.Lens[UpperPB, >> com.example.protos.demo.Gender] = field(_.getGender)((c_, f_) => >> c_.copy(gender = Some(f_))) >> def optionalGender: com.trueaccord.lenses.Lens[UpperPB, >> scala.Option[com.example.protos.demo.Gender]] = field(_.gender)((c_, f_) => >> c_.copy(gender = f_)) >> def tags: com.trueaccord.lenses.Lens[UpperPB, >> scala.collection.Seq[String]] = field(_.tags)((c_, f_) => c_.copy(tags = f_)) >> def addresses: com.trueaccord.lenses.Lens[UpperPB, >> scala.collection.Seq[com.example.protos.demo.Address]] = >> field(_.addresses)((c_, f_) => c_.copy(addresses = f_)) >> } >> final val NAME_FIELD_NUMBER = 1 >> final val AGE_FIELD_NUMBER = 2 >> final val GENDER_FIELD_NUMBER = 3 >> final val TAGS_FIELD_NUMBER = 4 >> final val ADDRESSES_FIELD_NUMBER = 5 >> } >> >> >> On Wed, Nov 16, 2016 at 1:28 PM, Shixiong(Ryan) Zhu < >> shixi...@databricks.com> wrote: >> >>> Could you provide the Person class? >>> >>> On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande < >>> deshpandesh...@gmail.com> wrote: >>> >>>> I am using 2.11.8. Thanks >>>> >>>> On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu < >>>> shixi...@databricks.com> wrote: >>>> >>>>> Which Scala version are you using? Is it Scala 2.10? 
Scala 2.10 has >>>>> some known race conditions in reflection and the Scala community doesn't >>>>> have plan to fix it (http://docs.scala-lang.org/ov >>>>> erviews/reflection/thread-safety.html) AFAIK, the only way to fix it >>>>> is upgrading to Scala 2.11. >>>>> >>>>> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande < >>>>> deshpandesh...@gmail.com> wrote: >>>>> >>>>>> I am using protobuf to encode. This may not be related to the new >>>>>> release issue.... >>>>>> >>>>>> Exception in thread "main" scala.ScalaReflectionException: <none> is >>>>>> not a term >>>>>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.sca >>>>>> la:199) >>>>>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(S >>>>>> ymbols.scala:84) >>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.construc >>>>>> tParams(ScalaReflection.scala:811) >>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructPara >>>>>> ms(ScalaReflection.scala:39) >>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConst >>>>>> ructorParameters(ScalaReflection.scala:800) >>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructo >>>>>> rParameters(ScalaReflection.scala:39) >>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp >>>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect >>>>>> ion.scala:582) >>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp >>>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect >>>>>> ion.scala:460) >>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app >>>>>> ly(ScalaReflection.scala:592) >>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app >>>>>> ly(ScalaReflection.scala:583) >>>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr >>>>>> aversableLike.scala:252) >>>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr >>>>>> aversableLike.scala:252) >>>>>> at 
scala.collection.immutable.List.foreach(List.scala:381) >>>>>> at scala.collection.TraversableLike$class.flatMap(TraversableLi >>>>>> ke.scala:252) >>>>>> at scala.collection.immutable.List.flatMap(List.scala:344) >>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp >>>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect >>>>>> ion.scala:583) >>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor >>>>>> (ScalaReflection.scala:425) >>>>>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.ap >>>>>> ply(ExpressionEncoder.scala:61) >>>>>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274) >>>>>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImpli >>>>>> cits.scala:47) >>>>>> at PersonConsumer$.main(PersonConsumer.scala:33) >>>>>> at PersonConsumer.main(PersonConsumer.scala) >>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce >>>>>> ssorImpl.java:62) >>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe >>>>>> thodAccessorImpl.java:43) >>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.j >>>>>> ava:147) >>>>>> >>>>>> The following is my code ... >>>>>> >>>>>> object PersonConsumer { >>>>>> import org.apache.spark.rdd.RDD >>>>>> import com.trueaccord.scalapb.spark._ >>>>>> import org.apache.spark.sql.{SQLContext, SparkSession} >>>>>> import com.example.protos.demo._ >>>>>> >>>>>> def main(args : Array[String]) { >>>>>> >>>>>> def parseLine(s: String): Person = >>>>>> Person.parseFrom( >>>>>> org.apache.commons.codec.binary.Base64.decodeBase64(s)) >>>>>> >>>>>> val spark = SparkSession.builder. 
>>>>>> master("local") >>>>>> .appName("spark session example") >>>>>> .getOrCreate() >>>>>> >>>>>> import spark.implicits._ >>>>>> >>>>>> val ds1 = >>>>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load() >>>>>> >>>>>> val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String] >>>>>> >>>>>> val ds3 = ds2.map(str => >>>>>> parseLine(str)).createOrReplaceTempView("persons") >>>>>> >>>>>> val ds4 = spark.sqlContext.sql("select name from persons") >>>>>> >>>>>> val query = ds4.writeStream >>>>>> .outputMode("append") >>>>>> .format("console") >>>>>> .start() >>>>>> query.awaitTermination() >>>>>> } >>>>>> } >>>>>> >>>>>> >>>>> >>>> >>> >> >