Hello everyone,
The following code works. It reads the value column from Kafka as Array[Byte] and parses each record with Student.parseFrom:
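import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder
    .master("local")
    .appName("spark session example")
    .getOrCreate()
  import spark.implicits._

  // Read the "student" topic; the Kafka source exposes "value" as binary.
  val ds1 = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "student")
    .load()

  // Pull the raw bytes out of each row and parse them into a Student.
  val ds2 = ds1
    .map(row => row.getAs[Array[Byte]]("value"))
    .map(Student.parseFrom(_))

  // Print each parsed record to the console as it arrives.
  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()
  query.awaitTermination()
}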
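
For anyone who wants to try the decoding step without Kafka or the generated protobuf class, here is a minimal sketch of the same idea on a batch Dataset. Everything specific to it is an assumption made for illustration: the Student case class below is a hypothetical stand-in that parses "name,age" text rather than protobuf bytes, and DecodeValueSketch is just a made-up object name. It uses select("value").as[Array[Byte]] in place of row.getAs, which should behave the same way on the streaming Dataset above.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical stand-in for the protobuf-generated Student class. The real
// one would parse protobuf bytes, not the "name,age" text used here.
case class Student(name: String, age: Int)

object Student {
  def parseFrom(bytes: Array[Byte]): Student = {
    val Array(name, age) = new String(bytes, UTF_8).split(",", 2)
    Student(name, age.trim.toInt)
  }
}

object DecodeValueSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("decode value sketch")
      .getOrCreate()
    import spark.implicits._

    // Batch stand-in for the Kafka source: one binary column named "value".
    val raw = Seq("alice,20", "bob,22").map(_.getBytes(UTF_8)).toDF("value")

    // Narrow to the value column as a typed Dataset, then parse each record.
    val students: Dataset[Student] =
      raw.select("value").as[Array[Byte]].map(bytes => Student.parseFrom(bytes))

    students.show()
    spark.stop()
  }
}
```

With the real generated class, only the stand-in Student definition would need to go; the select/as/map chain stays the same.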
On Thu, Nov 17, 2016 at 11:30 AM, shyla deshpande <[email protected]>
wrote:
> val spark = SparkSession.builder.
> master("local")
> .appName("spark session example")
> .getOrCreate()
>
> import spark.implicits._
>
> val dframe1 = spark.readStream.format("kafka").
> option("kafka.bootstrap.servers","localhost:9092").
> option("subscribe","student").load()
>
> How do I deserialize the value column from dframe1,
> which is Array[Byte], to a Student object using Student.parseFrom?
>
> Please help.
>
> Thanks.
>
>
>
> // Stream of votes from Kafka as bytes
> val votesAsBytes =
>   KafkaUtils.createDirectStream[String, Array[Byte]](
>     ssc, LocationStrategies.PreferConsistent,
>     ConsumerStrategies.Subscribe[String, Array[Byte]](Array("votes"), kafkaParams))
>
> // Parse them into Vote case class.
> val votes: DStream[Vote] =
>   votesAsBytes.map {
>     (cr: ConsumerRecord[String, Array[Byte]]) => Vote.parseFrom(cr.value())
>   }