Hello everyone,
The following code works:

import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {

  val spark = SparkSession.builder
    .master("local")
    .appName("spark session example")
    .getOrCreate()

  import spark.implicits._

  // Read the raw Kafka stream; each record's payload arrives in the "value" column.
  val ds1 = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "student")
    .load()

  // Extract the value bytes and parse each message into a Student.
  val ds2 = ds1
    .map(row => row.getAs[Array[Byte]]("value"))
    .map(Student.parseFrom(_))

  // Write parsed records to the console as they arrive.
  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()

  query.awaitTermination()

}
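
One thing to watch: for the .map(Student.parseFrom(_)) step to compile, Spark
needs an implicit Encoder[Student] in scope. If Student is a ScalaPB-generated
case class, spark.implicits._ can usually derive one; if it is not a plain
case class, Encoders.kryo is the standard fallback. A minimal sketch, assuming
Student is the protobuf-generated class used above:

import org.apache.spark.sql.{Encoder, Encoders}

// Fallback encoder for types that spark.implicits._ cannot derive.
implicit val studentEncoder: Encoder[Student] = Encoders.kryo[Student]

val ds2 = ds1
  .map(_.getAs[Array[Byte]]("value")) // raw protobuf payload
  .map(Student.parseFrom(_))          // parse the bytes into a Student

Note that with Encoders.kryo the Dataset is stored as a single binary column,
so the console sink prints serialized bytes rather than named fields; mapping
to a plain case class first is a common workaround when readable output
matters.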


On Thu, Nov 17, 2016 at 11:30 AM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> val spark = SparkSession.builder
>   .master("local")
>   .appName("spark session example")
>   .getOrCreate()
>
> import spark.implicits._
>
> val dframe1 = spark.readStream.format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "student")
>   .load()
>
> How do I deserialize the value column from dframe1, which is
> Array[Byte], to a Student object using Student.parseFrom?
>
> Please help.
>
> Thanks.
>
> // Stream of votes from Kafka as bytes
> val votesAsBytes = KafkaUtils.createDirectStream[String, Array[Byte]](
>   ssc, LocationStrategies.PreferConsistent,
>   ConsumerStrategies.Subscribe[String, Array[Byte]](Array("votes"), kafkaParams))
>
> // Parse them into Vote case class.
> val votes: DStream[Vote] = votesAsBytes.map {
>   (cr: ConsumerRecord[String, Array[Byte]]) =>
>     Vote.parseFrom(cr.value())
> }
>
>
