Hi Michael,

Thanks much for the suggestion. I was wondering - what's the best way to
deserialize the 'value' field?

On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <mich...@databricks.com> wrote:

> Encoders can only map data into an object if those columns already exist.
> When we are reading from Kafka, we just get a binary blob and you'll need
> to help Spark parse that first. Assuming your data is stored in JSON it
> should be pretty straightforward.
>
>     streams = spark
>         .readStream()
>         .format("kafka")
>         .option("kafka.bootstrap.servers", bootstrapServers)
>         .option(subscribeType, topics)
>         .load()
>         // cast the binary value to a string and parse it as json
>         .withColumn("message", from_json(col("value").cast("string"), tweetSchema))
>         // unnest the json
>         .select("message.*")
>         // only required if you want to use lambda functions on the data using this class
>         .as(Encoders.bean(Tweet.class))
>
> Here is some more info on working with JSON and other semi-structured formats:
> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
>
> On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com> wrote:
>
>> Hi,
>>
>> Currently, I am encountering the following exception while working with the
>> below-mentioned code snippet.
>>
>> Please suggest the correct approach for reading the stream into a SQL
>> schema. If I add 'tweetSchema' while reading the stream, it errors out
>> with the message - we can not change static schema for kafka.
>> -------------------------------------------------------------------------
>>
>> *exception*
>>
>> Caused by: org.apache.spark.sql.AnalysisException: cannot resolve
>> '`location`' given input columns: [topic, timestamp, key, offset, value,
>> timestampType, partition];
>>     at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>> -------------------------------------------------------------------------
>>
>> *structured streaming code snippet*
>>
>>     String bootstrapServers = "localhost:9092";
>>     String subscribeType = "subscribe";
>>     String topics = "events";
>>
>>     StructType tweetSchema = new StructType()
>>         .add("tweetId", "string")
>>         .add("tweetText", "string")
>>         .add("location", "string")
>>         .add("timestamp", "string");
>>
>>     SparkSession spark = SparkSession
>>         .builder()
>>         .appName("StreamProcessor")
>>         .config("spark.master", "local")
>>         .getOrCreate();
>>
>>     Dataset<Tweet> streams = spark
>>         .readStream()
>>         .format("kafka")
>>         .option("kafka.bootstrap.servers", bootstrapServers)
>>         .option(subscribeType, topics)
>>         .load()
>>         .as(Encoders.bean(Tweet.class));
>>
>>     streams.createOrReplaceTempView("streamsData");
>>
>>     String sql = "SELECT location, COUNT(*) as count FROM streamsData GROUP BY location";
>>     Dataset<Row> countsByLocation = spark.sql(sql);
>>
>>     StreamingQuery query = countsByLocation.writeStream()
>>         .outputMode("complete")
>>         .format("console")
>>         .start();
>>
>>     query.awaitTermination();
>> -------------------------------------------------------------------------
>>
>> *Tweet*
>>
>> Tweet.java - has a public constructor and getter/setter methods
>>
>>     public class Tweet implements Serializable {
>>
>>         private String tweetId;
>>         private String tweetText;
>>         private String location;
>>         private String timestamp;
>>
>>         public Tweet() {
>>         }
>>         .............
>>
>> -------------------------------------------------------------------------
>>
>> *pom.xml*
>>
>>     <dependency>
>>         <groupId>org.apache.spark</groupId>
>>         <artifactId>spark-core_2.10</artifactId>
>>         <version>2.1.0</version>
>>     </dependency>
>>     <dependency>
>>         <groupId>org.apache.spark</groupId>
>>         <artifactId>spark-streaming_2.10</artifactId>
>>         <version>2.1.0</version>
>>     </dependency>
>>     <dependency>
>>         <groupId>org.apache.spark</groupId>
>>         <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
>>         <version>2.1.0</version>
>>     </dependency>
>>     <dependency>
>>         <groupId>org.apache.spark</groupId>
>>         <artifactId>spark-sql_2.10</artifactId>
>>         <version>2.1.0</version>
>>     </dependency>
>>     <dependency>
>>         <groupId>org.apache.spark</groupId>
>>         <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
>>         <version>2.1.0</version>
>>     </dependency>
>> -------------------------------------------------------------------------
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
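
P.S. To make the suggestion concrete for myself, here is a minimal,
Spark-free Java sketch of the field extraction that from_json performs once
the binary Kafka 'value' column has been cast to a string. This is only an
illustration: the regex-based extract() helper is a toy stand-in for Spark's
real JSON parser (it handles flat, quoted string fields only), and the class
and method names here are made up for the sketch.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FromJsonSketch {

    // Toy extractor: pulls one flat, quoted string field out of a JSON
    // record. Spark's from_json does this properly, driven by the
    // supplied StructType (tweetSchema in the thread above).
    static String extract(String json, String field) {
        Pattern p = Pattern.compile("\"" + field + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher m = p.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // What the Kafka 'value' column looks like after cast("string"):
        String value = "{\"tweetId\":\"1\",\"tweetText\":\"hello\","
                + "\"location\":\"NYC\",\"timestamp\":\"2017-03-24\"}";

        // Once each schema field is its own column (select("message.*")),
        // Encoders.bean(Tweet.class) can map columns to bean setters by name.
        System.out.println(extract(value, "location"));  // prints NYC
        System.out.println(extract(value, "timestamp")); // prints 2017-03-24
    }
}
```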