Thanks Zhu — that was it. It now works great! On Thu, Nov 17, 2016 at 1:07 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com > wrote:
> The problem is "optional Gender gender = 3;". The generated class "Gender" > is a trait, and Spark cannot know how to create a trait so it's not > supported. You can define your class which is supported by SQL Encoder, and > convert this generated class to the new class in `parseLine`. > > On Wed, Nov 16, 2016 at 4:22 PM, shyla deshpande <deshpandesh...@gmail.com > > wrote: > >> Ryan, >> >> I just wanted to provide more info. Here is my .proto file which is the >> basis for generating the Person class. Thanks. >> >> option java_package = "com.example.protos"; >> enum Gender { >> MALE = 1; >> FEMALE = 2; >> } >> message Address { >> optional string street = 1; >> optional string city = 2; >> } >> message Person { >> optional string name = 1; >> optional int32 age = 2; >> optional Gender gender = 3; >> repeated string tags = 4; >> repeated Address addresses = 5; >> } >> >> >> On Wed, Nov 16, 2016 at 3:04 PM, shyla deshpande < >> deshpandesh...@gmail.com> wrote: >> >>> *Thanks for the response. Following is the Person class..* >>> >>> // Generated by the Scala Plugin for the Protocol Buffer Compiler. >>> // Do not edit! 
>>> // >>> // Protofile syntax: PROTO2 >>> >>> package com.example.protos.demo >>> >>> >>> >>> @SerialVersionUID(0L) >>> final case class Person( >>> name: scala.Option[String] = None, >>> age: scala.Option[Int] = None, >>> gender: scala.Option[com.example.protos.demo.Gender] = None, >>> tags: scala.collection.Seq[String] = Nil, >>> addresses: scala.collection.Seq[com.example.protos.demo.Address] = Nil >>> ) extends com.trueaccord.scalapb.GeneratedMessage with >>> com.trueaccord.scalapb.Message[Person] with >>> com.trueaccord.lenses.Updatable[Person] { >>> @transient >>> private[this] var __serializedSizeCachedValue: Int = 0 >>> private[this] def __computeSerializedValue(): Int = { >>> var __size = 0 >>> if (name.isDefined) { __size += >>> com.google.protobuf.CodedOutputStream.computeStringSize(1, name.get) } >>> if (age.isDefined) { __size += >>> com.google.protobuf.CodedOutputStream.computeInt32Size(2, age.get) } >>> if (gender.isDefined) { __size += >>> com.google.protobuf.CodedOutputStream.computeEnumSize(3, gender.get.value) } >>> tags.foreach(tags => __size += >>> com.google.protobuf.CodedOutputStream.computeStringSize(4, tags)) >>> addresses.foreach(addresses => __size += 1 + >>> com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(addresses.serializedSize) >>> + addresses.serializedSize) >>> __size >>> } >>> final override def serializedSize: Int = { >>> var read = __serializedSizeCachedValue >>> if (read == 0) { >>> read = __computeSerializedValue() >>> __serializedSizeCachedValue = read >>> } >>> read >>> } >>> def writeTo(`_output__`: com.google.protobuf.CodedOutputStream): Unit = >>> { >>> name.foreach { __v => >>> _output__.writeString(1, __v) >>> }; >>> age.foreach { __v => >>> _output__.writeInt32(2, __v) >>> }; >>> gender.foreach { __v => >>> _output__.writeEnum(3, __v.value) >>> }; >>> tags.foreach { __v => >>> _output__.writeString(4, __v) >>> }; >>> addresses.foreach { __v => >>> _output__.writeTag(5, 2) >>> 
_output__.writeUInt32NoTag(__v.serializedSize) >>> __v.writeTo(_output__) >>> }; >>> } >>> def mergeFrom(`_input__`: com.google.protobuf.CodedInputStream): >>> com.example.protos.demo.Person = { >>> var __name = this.name >>> var __age = this.age >>> var __gender = this.gender >>> val __tags = (scala.collection.immutable.Vector.newBuilder[String] >>> ++= this.tags) >>> val __addresses = >>> (scala.collection.immutable.Vector.newBuilder[com.example.protos.demo.Address] >>> ++= this.addresses) >>> var _done__ = false >>> while (!_done__) { >>> val _tag__ = _input__.readTag() >>> _tag__ match { >>> case 0 => _done__ = true >>> case 10 => >>> __name = Some(_input__.readString()) >>> case 16 => >>> __age = Some(_input__.readInt32()) >>> case 24 => >>> __gender = >>> Some(com.example.protos.demo.Gender.fromValue(_input__.readEnum())) >>> case 34 => >>> __tags += _input__.readString() >>> case 42 => >>> __addresses += >>> com.trueaccord.scalapb.LiteParser.readMessage(_input__, >>> com.example.protos.demo.Address.defaultInstance) >>> case tag => _input__.skipField(tag) >>> } >>> } >>> com.example.protos.demo.Person( >>> name = __name, >>> age = __age, >>> gender = __gender, >>> tags = __tags.result(), >>> addresses = __addresses.result() >>> ) >>> } >>> def getName: String = name.getOrElse("") >>> def clearName: Person = copy(name = None) >>> def withName(__v: String): Person = copy(name = Some(__v)) >>> def getAge: Int = age.getOrElse(0) >>> def clearAge: Person = copy(age = None) >>> def withAge(__v: Int): Person = copy(age = Some(__v)) >>> def getGender: com.example.protos.demo.Gender = >>> gender.getOrElse(com.example.protos.demo.Gender.MALE) >>> def clearGender: Person = copy(gender = None) >>> def withGender(__v: com.example.protos.demo.Gender): Person = >>> copy(gender = Some(__v)) >>> def clearTags = copy(tags = scala.collection.Seq.empty) >>> def addTags(__vs: String*): Person = addAllTags(__vs) >>> def addAllTags(__vs: TraversableOnce[String]): Person = copy(tags 
= >>> tags ++ __vs) >>> def withTags(__v: scala.collection.Seq[String]): Person = copy(tags = >>> __v) >>> def clearAddresses = copy(addresses = scala.collection.Seq.empty) >>> def addAddresses(__vs: com.example.protos.demo.Address*): Person = >>> addAllAddresses(__vs) >>> def addAllAddresses(__vs: >>> TraversableOnce[com.example.protos.demo.Address]): Person = copy(addresses >>> = addresses ++ __vs) >>> def withAddresses(__v: >>> scala.collection.Seq[com.example.protos.demo.Address]): Person = >>> copy(addresses = __v) >>> def getField(__field: com.google.protobuf.Descriptors.FieldDescriptor): >>> scala.Any = { >>> __field.getNumber match { >>> case 1 => name.getOrElse(null) >>> case 2 => age.getOrElse(null) >>> case 3 => gender.map(_.valueDescriptor).getOrElse(null) >>> case 4 => tags >>> case 5 => addresses >>> } >>> } >>> override def toString: String = >>> com.trueaccord.scalapb.TextFormat.printToUnicodeString(this) >>> def companion = com.example.protos.demo.Person >>> } >>> >>> object Person extends >>> com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person] >>> { >>> implicit def messageCompanion: >>> com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person] >>> = this >>> def fromFieldsMap(__fieldsMap: >>> scala.collection.immutable.Map[com.google.protobuf.Descriptors.FieldDescriptor, >>> scala.Any]): com.example.protos.demo.Person = { >>> require(__fieldsMap.keys.forall(_.getContainingType() == descriptor), >>> "FieldDescriptor does not match message type.") >>> val __fields = descriptor.getFields >>> com.example.protos.demo.Person( >>> __fieldsMap.get(__fields.get(0)).asInstanceOf[scala.Option[String]], >>> __fieldsMap.get(__fields.get(1)).asInstanceOf[scala.Option[Int]], >>> >>> __fieldsMap.get(__fields.get(2)).asInstanceOf[scala.Option[com.google.protobuf.Descriptors.EnumValueDescriptor]].map(__e >>> => com.example.protos.demo.Gender.fromValue(__e.getNumber)), >>> __fieldsMap.getOrElse(__fields.get(3), 
>>> Nil).asInstanceOf[scala.collection.Seq[String]], >>> __fieldsMap.getOrElse(__fields.get(4), >>> Nil).asInstanceOf[scala.collection.Seq[com.example.protos.demo.Address]] >>> ) >>> } >>> def descriptor: com.google.protobuf.Descriptors.Descriptor = >>> DemoProto.descriptor.getMessageTypes.get(1) >>> def messageCompanionForField(__field: >>> com.google.protobuf.Descriptors.FieldDescriptor): >>> com.trueaccord.scalapb.GeneratedMessageCompanion[_] = { >>> require(__field.getContainingType() == descriptor, "FieldDescriptor >>> does not match message type.") >>> var __out: com.trueaccord.scalapb.GeneratedMessageCompanion[_] = null >>> __field.getNumber match { >>> case 5 => __out = com.example.protos.demo.Address >>> } >>> __out >>> } >>> def enumCompanionForField(__field: >>> com.google.protobuf.Descriptors.FieldDescriptor): >>> com.trueaccord.scalapb.GeneratedEnumCompanion[_] = { >>> require(__field.getContainingType() == descriptor, "FieldDescriptor >>> does not match message type.") >>> __field.getNumber match { >>> case 3 => com.example.protos.demo.Gender >>> } >>> } >>> lazy val defaultInstance = com.example.protos.demo.Person( >>> ) >>> implicit class PersonLens[UpperPB](_l: >>> com.trueaccord.lenses.Lens[UpperPB, com.example.protos.demo.Person]) >>> extends com.trueaccord.lenses.ObjectLens[UpperPB, >>> com.example.protos.demo.Person](_l) { >>> def name: com.trueaccord.lenses.Lens[UpperPB, String] = >>> field(_.getName)((c_, f_) => c_.copy(name = Some(f_))) >>> def optionalName: com.trueaccord.lenses.Lens[UpperPB, >>> scala.Option[String]] = field(_.name)((c_, f_) => c_.copy(name = f_)) >>> def age: com.trueaccord.lenses.Lens[UpperPB, Int] = >>> field(_.getAge)((c_, f_) => c_.copy(age = Some(f_))) >>> def optionalAge: com.trueaccord.lenses.Lens[UpperPB, scala.Option[Int]] >>> = field(_.age)((c_, f_) => c_.copy(age = f_)) >>> def gender: com.trueaccord.lenses.Lens[UpperPB, >>> com.example.protos.demo.Gender] = field(_.getGender)((c_, f_) => >>> c_.copy(gender = 
Some(f_))) >>> def optionalGender: com.trueaccord.lenses.Lens[UpperPB, >>> scala.Option[com.example.protos.demo.Gender]] = field(_.gender)((c_, f_) => >>> c_.copy(gender = f_)) >>> def tags: com.trueaccord.lenses.Lens[UpperPB, >>> scala.collection.Seq[String]] = field(_.tags)((c_, f_) => c_.copy(tags = >>> f_)) >>> def addresses: com.trueaccord.lenses.Lens[UpperPB, >>> scala.collection.Seq[com.example.protos.demo.Address]] = >>> field(_.addresses)((c_, f_) => c_.copy(addresses = f_)) >>> } >>> final val NAME_FIELD_NUMBER = 1 >>> final val AGE_FIELD_NUMBER = 2 >>> final val GENDER_FIELD_NUMBER = 3 >>> final val TAGS_FIELD_NUMBER = 4 >>> final val ADDRESSES_FIELD_NUMBER = 5 >>> } >>> >>> >>> On Wed, Nov 16, 2016 at 1:28 PM, Shixiong(Ryan) Zhu < >>> shixi...@databricks.com> wrote: >>> >>>> Could you provide the Person class? >>>> >>>> On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande < >>>> deshpandesh...@gmail.com> wrote: >>>> >>>>> I am using 2.11.8. Thanks >>>>> >>>>> On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu < >>>>> shixi...@databricks.com> wrote: >>>>> >>>>>> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has >>>>>> some known race conditions in reflection and the Scala community doesn't >>>>>> have plan to fix it (http://docs.scala-lang.org/ov >>>>>> erviews/reflection/thread-safety.html) AFAIK, the only way to fix it >>>>>> is upgrading to Scala 2.11. >>>>>> >>>>>> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande < >>>>>> deshpandesh...@gmail.com> wrote: >>>>>> >>>>>>> I am using protobuf to encode. This may not be related to the new >>>>>>> release issue.... 
>>>>>>> >>>>>>> Exception in thread "main" scala.ScalaReflectionException: <none> >>>>>>> is not a term >>>>>>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.sca >>>>>>> la:199) >>>>>>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(S >>>>>>> ymbols.scala:84) >>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.construc >>>>>>> tParams(ScalaReflection.scala:811) >>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructPara >>>>>>> ms(ScalaReflection.scala:39) >>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConst >>>>>>> ructorParameters(ScalaReflection.scala:800) >>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructo >>>>>>> rParameters(ScalaReflection.scala:39) >>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp >>>>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect >>>>>>> ion.scala:582) >>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp >>>>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect >>>>>>> ion.scala:460) >>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app >>>>>>> ly(ScalaReflection.scala:592) >>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app >>>>>>> ly(ScalaReflection.scala:583) >>>>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr >>>>>>> aversableLike.scala:252) >>>>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr >>>>>>> aversableLike.scala:252) >>>>>>> at scala.collection.immutable.List.foreach(List.scala:381) >>>>>>> at scala.collection.TraversableLike$class.flatMap(TraversableLi >>>>>>> ke.scala:252) >>>>>>> at scala.collection.immutable.List.flatMap(List.scala:344) >>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp >>>>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect >>>>>>> ion.scala:583) >>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor >>>>>>> 
(ScalaReflection.scala:425) >>>>>>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.ap >>>>>>> ply(ExpressionEncoder.scala:61) >>>>>>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274) >>>>>>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImpli >>>>>>> cits.scala:47) >>>>>>> at PersonConsumer$.main(PersonConsumer.scala:33) >>>>>>> at PersonConsumer.main(PersonConsumer.scala) >>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce >>>>>>> ssorImpl.java:62) >>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe >>>>>>> thodAccessorImpl.java:43) >>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.j >>>>>>> ava:147) >>>>>>> >>>>>>> The following is my code ... >>>>>>> >>>>>>> object PersonConsumer { >>>>>>> import org.apache.spark.rdd.RDD >>>>>>> import com.trueaccord.scalapb.spark._ >>>>>>> import org.apache.spark.sql.{SQLContext, SparkSession} >>>>>>> import com.example.protos.demo._ >>>>>>> >>>>>>> def main(args : Array[String]) { >>>>>>> >>>>>>> def parseLine(s: String): Person = >>>>>>> Person.parseFrom( >>>>>>> org.apache.commons.codec.binary.Base64.decodeBase64(s)) >>>>>>> >>>>>>> val spark = SparkSession.builder. 
>>>>>>> master("local") >>>>>>> .appName("spark session example") >>>>>>> .getOrCreate() >>>>>>> >>>>>>> import spark.implicits._ >>>>>>> >>>>>>> val ds1 = >>>>>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load() >>>>>>> >>>>>>> val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String] >>>>>>> >>>>>>> val ds3 = ds2.map(str => >>>>>>> parseLine(str)).createOrReplaceTempView("persons") >>>>>>> >>>>>>> val ds4 = spark.sqlContext.sql("select name from persons") >>>>>>> >>>>>>> val query = ds4.writeStream >>>>>>> .outputMode("append") >>>>>>> .format("console") >>>>>>> .start() >>>>>>> query.awaitTermination() >>>>>>> } >>>>>>> } >>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> >