Hi Michael,

Can you please check whether I am using the correct version of the
spark-streaming library, as specified in the pom in my earlier email?

col("value").cast("string") throws a compilation error: 'cannot find symbol
method'. I tried $"value", which results in a similar compilation error.
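A likely cause of the 'cannot find symbol' error, assuming the class does not already import it: in Java, `col` and `from_json` are static methods of `org.apache.spark.sql.functions`, which ships in the spark-sql artifact (already in the pom), not in spark-streaming. They must be statically imported or called as `functions.col(...)`. `$"value"` is Scala string-interpolation syntax enabled by `import spark.implicits._` and does not compile in Java. Structured Streaming itself also lives in spark-sql; spark-streaming is the older DStream API. The sketch below uses a hypothetical class name, `ValueColumns`, to show both forms, which build the same expression.

```java
// Hypothetical helper class; only needs spark-sql on the classpath (no SparkSession required).
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.functions;

public class ValueColumns {

    // Relies on the static import of functions.col above.
    public static Column valueAsString() {
        return col("value").cast("string");
    }

    // Equivalent fully qualified call, no static import needed.
    public static Column valueAsStringQualified() {
        return functions.col("value").cast("string");
    }

    public static void main(String[] args) {
        // Column.equals compares the underlying expressions, so both forms should match.
        System.out.println(valueAsString().equals(valueAsStringQualified()));
    }
}
```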


> Sorry, I don't think that I understand the question.  Value is just a
> binary blob that we get from Kafka and pass to you.  If it's stored in JSON,
> I think the code I provided is a good option, but if you are using a
> different encoding you may need to write a UDF.
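For a non-JSON payload, a UDF can decode the raw bytes of the binary `value` column. This is a minimal sketch, assuming a hypothetical pipe-delimited UTF-8 payload of the form `tweetId|tweetText|location|timestamp`. The class `LocationUdf` and the UDF name `locationFromValue` are illustrative, not part of Spark. A tweet text that itself contains `|` would break this naive split.

```java
// Hypothetical UDF sketch; assumes a pipe-delimited UTF-8 payload, not JSON.
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

import java.nio.charset.StandardCharsets;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class LocationUdf {

    // Hypothetical format "tweetId|tweetText|location|timestamp".
    // Returns the third field, or null when the payload is missing or malformed.
    public static String locationFromValue(byte[] value) {
        if (value == null) {
            return null;
        }
        String[] fields = new String(value, StandardCharsets.UTF_8).split("\\|", -1);
        return fields.length == 4 ? fields[2] : null;
    }

    // Registers the decoder; BinaryType columns arrive in Java UDFs as byte[].
    public static void register(SparkSession spark) {
        spark.udf().register("locationFromValue",
                (UDF1<byte[], String>) LocationUdf::locationFromValue,
                DataTypes.StringType);
    }

    // Column expression to use after register(spark) has been called.
    public static Column locationColumn() {
        return callUDF("locationFromValue", col("value"));
    }
}
```

After `LocationUdf.register(spark)`, a call such as `df.withColumn("location", LocationUdf.locationColumn())` would add the decoded field next to the raw Kafka columns.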
>> Hi Michael,
>> Thanks much for the suggestion.
>> I was wondering - what's the best way to deserialize the 'value' field
>>> Encoders can only map data into an object if those columns already
>>> exist.  When we are reading from Kafka, we just get a binary blob and
>>> you'll need to help Spark parse that first.  Assuming your data is stored
>>> in JSON, it should be pretty straightforward.
>>> import static org.apache.spark.sql.functions.col;
>>> import static org.apache.spark.sql.functions.from_json;
>>>
>>> Dataset<Tweet> streams = spark
>>>   .readStream()
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>>   .option(subscribeType, topics)
>>>   .load()
>>>   // cast the binary value to a string and parse it as JSON
>>>   .withColumn("message", from_json(col("value").cast("string"), tweetSchema))
>>>   .select("message.*") // unnest the JSON
>>>   // only required if you want to use lambda functions on the data using this class
>>>   .as(Encoders.bean(Tweet.class));
>>> Here is some more info on working with JSON and other semi-structured
>>> formats:
>>> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
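The same parsing steps can be tried on a batch Dataset without a Kafka broker. This is a minimal sketch, assuming spark-sql 2.1+ on the classpath. The class name `ParseValueSketch`, the helper `fakeKafkaRows` and the sample JSON values are hypothetical, and the nested `Tweet` bean is an illustrative stand-in for the Tweet.java from the original message. Each row's binary `value` column plays the role of the Kafka payload.

```java
// Hypothetical sketch: assumes spark-sql 2.1+ on the classpath; no Kafka broker needed.
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ParseValueSketch {

    // Illustrative stand-in for the Tweet bean from the thread.
    public static class Tweet implements Serializable {
        private String tweetId;
        private String tweetText;
        private String location;
        private String timestamp;

        public Tweet() { }

        public String getTweetId() { return tweetId; }
        public void setTweetId(String tweetId) { this.tweetId = tweetId; }
        public String getTweetText() { return tweetText; }
        public void setTweetText(String tweetText) { this.tweetText = tweetText; }
        public String getLocation() { return location; }
        public void setLocation(String location) { this.location = location; }
        public String getTimestamp() { return timestamp; }
        public void setTimestamp(String timestamp) { this.timestamp = timestamp; }
    }

    // Same fields as tweetSchema in the original message.
    public static final StructType TWEET_SCHEMA = new StructType()
            .add("tweetId", "string")
            .add("tweetText", "string")
            .add("location", "string")
            .add("timestamp", "string");

    // Binary "value" -> string -> parsed struct -> top-level columns -> Tweet beans.
    public static Dataset<Tweet> parse(Dataset<Row> raw) {
        return raw
                .withColumn("message", from_json(col("value").cast("string"), TWEET_SCHEMA))
                .select("message.*")
                .as(Encoders.bean(Tweet.class));
    }

    // Hypothetical helper: a batch Dataset whose only column is a binary "value",
    // standing in for the Kafka source's value column.
    public static Dataset<Row> fakeKafkaRows(SparkSession spark, String... jsonValues) {
        StructType kafkaLike = new StructType().add("value", DataTypes.BinaryType);
        List<Row> rows = new ArrayList<>();
        for (String json : jsonValues) {
            rows.add(RowFactory.create((Object) json.getBytes(StandardCharsets.UTF_8)));
        }
        return spark.createDataFrame(rows, kafkaLike);
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("ParseValueSketch")
                .master("local[1]")
                .getOrCreate();
        Dataset<Tweet> tweets = parse(fakeKafkaRows(spark,
                "{\"tweetId\":\"1\",\"tweetText\":\"hi\",\"location\":\"NYC\",\"timestamp\":\"t1\"}",
                "{\"tweetId\":\"2\",\"tweetText\":\"yo\",\"location\":\"NYC\",\"timestamp\":\"t2\"}"));
        // The parsed Dataset has a "location" column, so the GROUP BY query can resolve it.
        tweets.createOrReplaceTempView("streamsData");
        spark.sql("SELECT location, COUNT(*) AS count FROM streamsData GROUP BY location").show();
        spark.stop();
    }
}
```

Because `value` is parsed before `.as(...)` is applied, the bean encoder finds `location` and the other fields as real columns. Applying it directly to the raw Kafka columns is what produces the "cannot resolve" error.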
>>>> Hi,
>>>> I am currently encountering the following exception while working with
>>>> the code snippet below.
>>>> Please suggest the correct approach for reading the stream into a SQL
>>>> schema.
>>>> If I add 'tweetSchema' while reading the stream, it errors out with the
>>>> message: we can not change static schema for kafka.
>>>> ------------------------------------------------------------------------
>>>> *exception*
>>>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve '`location`' given input columns: [topic, timestamp, key, offset, value, timestampType, partition]*;
>>>>         at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>>>         at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>>>> ------------------------------------------------------------------------
>>>> *structured streaming code snippet*
>>>> String bootstrapServers = "localhost:9092";
>>>> String subscribeType = "subscribe";
>>>> String topics = "events";
>>>> StructType tweetSchema = new StructType()
>>>>     .add("tweetId", "string")
>>>>     .add("tweetText", "string")
>>>>     .add("location", "string")
>>>>     .add("timestamp", "string");
>>>> SparkSession spark = SparkSession
>>>>     .builder()
>>>>     .appName("StreamProcessor")
>>>>     .config("spark.master", "local")
>>>>     .getOrCreate();
>>>> Dataset<Tweet> streams = spark
>>>>     .readStream()
>>>>     .format("kafka")
>>>>     .option("kafka.bootstrap.servers", bootstrapServers)
>>>>     .option(subscribeType, topics)
>>>>     .load()
>>>>     .as(Encoders.bean(Tweet.class));
>>>> streams.createOrReplaceTempView("streamsData");
>>>> String sql = "SELECT location, COUNT(*) as count FROM streamsData GROUP BY location";
>>>> Dataset<Row> countsByLocation = spark.sql(sql);
>>>> StreamingQuery query = countsByLocation.writeStream()
>>>>     .outputMode("complete")
>>>>     .format("console")
>>>>     .start();
>>>> query.awaitTermination();
>>>> ------------------------------------------------------------------------
>>>> *Tweet*
>>>> Tweet.java has a public no-argument constructor and getter/setter methods:
>>>> public class Tweet implements Serializable{
>>>>         private String tweetId;
>>>>         private String tweetText;
>>>>         private String location;
>>>>         private String timestamp;
>>>>         public Tweet(){
>>>>         }
>>>> .............
>>>> ------------------------------------------------------------------------
>>>> *pom.xml*
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-core_2.10</artifactId>
>>>>     <version>2.1.0</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-streaming_2.10</artifactId>
>>>>     <version>2.1.0</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
>>>>     <version>2.1.0</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-sql_2.10</artifactId>
>>>>     <version>2.1.0</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
>>>>     <version>2.1.0</version>
>>>> </dependency>
>>>> ------------------------------------------------------------------------
