Hi Michael,

Could you please check whether I am using the correct version of the
spark-streaming library, as specified in my pom (included in the quoted
email below)?

col("value").cast("string") fails to compile with the error 'cannot find
symbol method col(java.lang.String)'.
I tried $"value", which results in a similar compilation error.

Thanks
Kaniska
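
On the `col` compile error above: in Java, `col` and `from_json` are static methods of `org.apache.spark.sql.functions`, so a "cannot find symbol" error usually points to a missing static import rather than a wrong library version, and `$"value"` is Scala-only syntax that Java cannot compile. The sketch below is an assumption-laden illustration, not the poster's code: the class name `ColImportSketch` and the helper `parseValue` are hypothetical, and a small in-memory Dataset stands in for the Kafka source so the parsing step can be tried without a broker. It assumes the `spark-sql` 2.1.0 dependency from the quoted pom is on the classpath.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

// Hypothetical class name, used only for this illustration.
class ColImportSketch {

    // Casts the "value" column to a string, parses it as JSON with the given
    // schema, and unnests the parsed struct into top-level columns.
    static Dataset<Row> parseValue(Dataset<Row> raw, StructType schema) {
        return raw
            .withColumn("message", from_json(col("value").cast("string"), schema))
            .select("message.*");
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("ColImportSketch")
            .master("local[1]")
            .getOrCreate();

        // Same schema as in the quoted thread.
        StructType tweetSchema = new StructType()
            .add("tweetId", "string")
            .add("tweetText", "string")
            .add("location", "string")
            .add("timestamp", "string");

        // Stand-in for the Kafka source: one column named "value" holding JSON text.
        Dataset<Row> raw = spark.createDataset(
                Arrays.asList("{\"tweetId\":\"1\",\"tweetText\":\"hi\","
                    + "\"location\":\"NYC\",\"timestamp\":\"0\"}"),
                Encoders.STRING())
            .toDF("value");

        parseValue(raw, tweetSchema).show();
        spark.stop();
    }
}
```

With a real Kafka stream, the same `parseValue` call would presumably be applied to the result of `.load()` before `.as(Encoders.bean(Tweet.class))`, as in the quoted suggestion.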



On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Sorry, I don't think that I understand the question.  Value is just a
> binary blob that we get from Kafka and pass to you.  If it's stored as JSON,
> I think the code I provided is a good option, but if you are using a
> different encoding you may need to write a UDF.
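
On the UDF remark above: whatever the encoding, the decoding step itself is ordinary Java that turns the `byte[]` from Kafka into fields, and it can be written and checked on its own before being wrapped in a Spark UDF. The sketch below assumes a purely hypothetical payload format, UTF-8 text with pipe-delimited fields in the order tweetId|tweetText|location|timestamp; neither that format nor the class name `PipeDelimitedTweetDecoder` comes from the thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical decoder for an assumed pipe-delimited UTF-8 payload.
class PipeDelimitedTweetDecoder {

    // Field order assumed for this illustration only.
    private static final String[] FIELDS = {"tweetId", "tweetText", "location", "timestamp"};

    // Decodes the raw Kafka value into a field-name -> value map.
    // Returns null for a null payload and rejects payloads with the wrong field count.
    static Map<String, String> decode(byte[] value) {
        if (value == null) {
            return null;
        }
        String text = new String(value, StandardCharsets.UTF_8);
        // The -1 limit keeps trailing empty fields instead of dropping them.
        String[] parts = text.split("\\|", -1);
        if (parts.length != FIELDS.length) {
            throw new IllegalArgumentException(
                "expected " + FIELDS.length + " fields but got " + parts.length);
        }
        Map<String, String> tweet = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            tweet.put(FIELDS[i], parts[i]);
        }
        return tweet;
    }

    public static void main(String[] args) {
        byte[] payload = "42|hello spark|Seattle|2017-03-24T10:49:00Z"
            .getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(payload).get("location")); // prints Seattle
    }
}
```

Keeping the decoder free of Spark types is a design choice for this sketch: it makes the logic easy to unit test, and the same method could later be called from inside a UDF registered on the session.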
>
> On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <kaniska.man...@gmail.com>
> wrote:
>
>> Hi Michael,
>>
>> Thanks much for the suggestion.
>>
>> I was wondering: what's the best way to deserialize the 'value' field?
>>
>>
>> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> Encoders can only map data into an object if those columns already
>>> exist.  When we are reading from Kafka, we just get a binary blob and
>>> you'll need to help Spark parse that first.  Assuming your data is stored
>>> as JSON, it should be pretty straightforward.
>>>
>>> streams = spark
>>>   .readStream()
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>>   .option(subscribeType, topics)
>>>   .load()
>>>   // cast the binary value to a string and parse it as json
>>>   .withColumn("message", from_json(col("value").cast("string"), tweetSchema))
>>>   .select("message.*") // unnest the json
>>>   // only required if you want to use lambda functions on the data using this class
>>>   .as(Encoders.bean(Tweet.class))
>>>
>>> Here is some more info on working with JSON and other semi-structured
>>> formats:
>>> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
>>>
>>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am currently encountering the following exception with the code
>>>> snippet below.
>>>>
>>>> Please suggest the correct approach for reading the stream into a SQL
>>>> schema.
>>>> If I add 'tweetSchema' while reading the stream, it errors out with the
>>>> message: we can not change static schema for kafka.
>>>>
>>>> ------------------------------------------------------------
>>>>
>>>> *exception*
>>>>
>>>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
>>>> '`location`' given input columns: [topic, timestamp, key, offset, value,
>>>> timestampType, partition]*;
>>>>         at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>>>         at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>>>> ------------------------------------------------------------
>>>>
>>>> *structured streaming code snippet*
>>>>
>>>> String bootstrapServers = "localhost:9092";
>>>> String subscribeType = "subscribe";
>>>> String topics = "events";
>>>>
>>>> StructType tweetSchema = new StructType()
>>>>     .add("tweetId", "string")
>>>>     .add("tweetText", "string")
>>>>     .add("location", "string")
>>>>     .add("timestamp", "string");
>>>>
>>>> SparkSession spark = SparkSession
>>>>     .builder()
>>>>     .appName("StreamProcessor")
>>>>     .config("spark.master", "local")
>>>>     .getOrCreate();
>>>>
>>>> Dataset<Tweet> streams = spark
>>>>     .readStream()
>>>>     .format("kafka")
>>>>     .option("kafka.bootstrap.servers", bootstrapServers)
>>>>     .option(subscribeType, topics)
>>>>     .load()
>>>>     .as(Encoders.bean(Tweet.class));
>>>>
>>>> streams.createOrReplaceTempView("streamsData");
>>>>
>>>> String sql = "SELECT location, COUNT(*) as count FROM streamsData GROUP BY location";
>>>> Dataset<Row> countsByLocation = spark.sql(sql);
>>>>
>>>> StreamingQuery query = countsByLocation.writeStream()
>>>>     .outputMode("complete")
>>>>     .format("console")
>>>>     .start();
>>>>
>>>> query.awaitTermination();
>>>> ------------------------------------------------------------
>>>>
>>>> *Tweet *
>>>>
>>>> Tweet.java - has public constructor and getter / setter methods
>>>>
>>>> public class Tweet implements Serializable{
>>>>
>>>>         private String tweetId;
>>>>         private String tweetText;
>>>>         private String location;
>>>>         private String timestamp;
>>>>
>>>>         public Tweet(){
>>>>
>>>>         }
>>>> .............
>>>>
>>>> ------------------------------------------------------------
>>>>
>>>> *pom.xml *
>>>>
>>>>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-core_2.10</artifactId>
>>>>     <version>2.1.0</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-streaming_2.10</artifactId>
>>>>     <version>2.1.0</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
>>>>     <version>2.1.0</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-sql_2.10</artifactId>
>>>>     <version>2.1.0</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
>>>>     <version>2.1.0</version>
>>>> </dependency>
>>>> ------------------------------------------------------------
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>>
>>>>
>>>
>>
>
