Sorry, I don't think I understand the question. The value is just a binary blob that we get from Kafka and pass to you. If it's stored as JSON, I think the code I provided is a good option, but if you are using a different encoding you may need to write a UDF.
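The core of such a UDF is just a plain function from the raw bytes to something Spark SQL can work with. A minimal sketch, assuming the payload is UTF-8 text (the `ValueDecoder` class, method name, and sample bytes are illustrative, not from this thread):

```java
import java.nio.charset.StandardCharsets;

public class ValueDecoder {

    // The Kafka 'value' column arrives as a binary blob; a UDF body
    // simply maps those bytes to a type Spark SQL understands.
    public static String decodeValue(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] blob = "{\"tweetId\":\"42\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeValue(blob)); // prints {"tweetId":"42"}
    }
}
```

In a Spark job this would be registered and applied roughly like (hypothetical wiring, using the Java UDF API):
`spark.udf().register("decodeValue", (UDF1<byte[], String>) ValueDecoder::decodeValue, DataTypes.StringType)` and then `df.withColumn("decoded", callUDF("decodeValue", col("value")))`.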
On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <kaniska.man...@gmail.com> wrote:

> Hi Michael,
>
> Thanks much for the suggestion.
>
> I was wondering - what's the best way to deserialize the 'value' field?
>
> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <mich...@databricks.com> wrote:
>
>> Encoders can only map data into an object if those columns already
>> exist. When we are reading from Kafka, we just get a binary blob and
>> you'll need to help Spark parse that first. Assuming your data is
>> stored in JSON, it should be pretty straightforward.
>>
>> streams = spark
>>     .readStream()
>>     .format("kafka")
>>     .option("kafka.bootstrap.servers", bootstrapServers)
>>     .option(subscribeType, topics)
>>     .load()
>>     // cast the binary value to a string and parse it as JSON
>>     .withColumn("message", from_json(col("value").cast("string"), tweetSchema))
>>     // unnest the JSON
>>     .select("message.*")
>>     // only required if you want to use lambda functions on the data using this class
>>     .as(Encoders.bean(Tweet.class))
>>
>> Here is some more info on working with JSON and other semi-structured formats:
>> https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
>>
>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Currently encountering the following exception while working with the
>>> below-mentioned code snippet.
>>>
>>> Please suggest the correct approach for reading the stream into a SQL
>>> schema. If I add 'tweetSchema' while reading the stream, it errors out
>>> with the message - we cannot change the static schema for Kafka.
>>>
>>> ---------------------------------------------------------------------
>>>
>>> *exception*
>>>
>>> Caused by: org.apache.spark.sql.AnalysisException: cannot resolve
>>> '`location`' given input columns: [topic, timestamp, key, offset,
>>> value, timestampType, partition];
>>>     at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>>>
>>> ---------------------------------------------------------------------
>>>
>>> *structured streaming code snippet*
>>>
>>> String bootstrapServers = "localhost:9092";
>>> String subscribeType = "subscribe";
>>> String topics = "events";
>>>
>>> StructType tweetSchema = new StructType()
>>>     .add("tweetId", "string")
>>>     .add("tweetText", "string")
>>>     .add("location", "string")
>>>     .add("timestamp", "string");
>>>
>>> SparkSession spark = SparkSession
>>>     .builder()
>>>     .appName("StreamProcessor")
>>>     .config("spark.master", "local")
>>>     .getOrCreate();
>>>
>>> Dataset<Tweet> streams = spark
>>>     .readStream()
>>>     .format("kafka")
>>>     .option("kafka.bootstrap.servers", bootstrapServers)
>>>     .option(subscribeType, topics)
>>>     .load()
>>>     .as(Encoders.bean(Tweet.class));
>>>
>>> streams.createOrReplaceTempView("streamsData");
>>>
>>> String sql = "SELECT location, COUNT(*) as count FROM streamsData GROUP BY location";
>>> Dataset<Row> countsByLocation = spark.sql(sql);
>>>
>>> StreamingQuery query = countsByLocation.writeStream()
>>>     .outputMode("complete")
>>>     .format("console")
>>>     .start();
>>>
>>> query.awaitTermination();
>>>
>>> ---------------------------------------------------------------------
>>>
>>> *Tweet*
>>>
>>> Tweet.java - has public constructor and getter / setter methods
>>>
>>> public class Tweet implements Serializable {
>>>
>>>     private String tweetId;
>>>     private String tweetText;
>>>     private String location;
>>>     private String timestamp;
>>>
>>>     public Tweet() {
>>>     }
>>>     .............
>>>
>>> ---------------------------------------------------------------------
>>>
>>> *pom.xml*
>>>
>>> <dependency>
>>>     <groupId>org.apache.spark</groupId>
>>>     <artifactId>spark-core_2.10</artifactId>
>>>     <version>2.1.0</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.spark</groupId>
>>>     <artifactId>spark-streaming_2.10</artifactId>
>>>     <version>2.1.0</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.spark</groupId>
>>>     <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
>>>     <version>2.1.0</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.spark</groupId>
>>>     <artifactId>spark-sql_2.10</artifactId>
>>>     <version>2.1.0</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.spark</groupId>
>>>     <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
>>>     <version>2.1.0</version>
>>> </dependency>
>>>
>>> ---------------------------------------------------------------------
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org