Hello everyone,

The following code works ...

import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder
    .master("local")
    .appName("spark session example")
    .getOrCreate()

  import spark.implicits._

  // Read the "student" topic from Kafka as a streaming DataFrame.
  val ds1 = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "student")
    .load()

  // Take the value column (Array[Byte]) and parse each record into a Student.
  val ds2 = ds1.map(row => row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))

  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()

  query.awaitTermination()
}

On Thu, Nov 17, 2016 at 11:30 AM, shyla deshpande <deshpandesh...@gmail.com> wrote:

> val spark = SparkSession.builder
>   .master("local")
>   .appName("spark session example")
>   .getOrCreate()
>
> import spark.implicits._
>
> val dframe1 = spark.readStream.format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "student")
>   .load()
>
> *How do I deserialize the value column from dframe1, which is
> Array[Byte], to a Student object using Student.parseFrom?*
>
> *Please help.*
>
> *Thanks.*
>
> // Stream of votes from Kafka as bytes
> val votesAsBytes = KafkaUtils.createDirectStream[String, Array[Byte]](
>   ssc, LocationStrategies.PreferConsistent,
>   ConsumerStrategies.Subscribe[String, Array[Byte]](Array("votes"), kafkaParams))
>
> // Parse them into Vote case class.
> val votes: DStream[Vote] = votesAsBytes.map {
>   (cr: ConsumerRecord[String, Array[Byte]]) => Vote.parseFrom(cr.value())
> }