Sorry, I don't think I understand the question.  Value is just a
binary blob that we get from Kafka and pass to you.  If it's stored as JSON,
I think the code I provided is a good option, but if you are using a
different encoding you may need to write a UDF.
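
For the non-JSON case, here is a rough sketch of the kind of decoding logic
such a UDF might wrap.  It assumes a made-up encoding (UTF-8 text with four
tab-separated fields in the order tweetId, tweetText, location, timestamp).
The class name TweetValueDecoder, the decode method, and the stand-in Tweet
type are hypothetical, and the Spark wiring is left out so the snippet runs
with the JDK alone.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes a Kafka 'value' blob that is NOT JSON.
final class TweetValueDecoder {

    // Minimal stand-in for the Tweet bean from this thread.
    public static final class Tweet {
        public String tweetId;
        public String tweetText;
        public String location;
        public String timestamp;
    }

    // Assumed wire format (not from the thread): UTF-8 text, four
    // tab-separated fields: tweetId, tweetText, location, timestamp.
    public static Tweet decode(byte[] value) {
        if (value == null) {
            return null; // Kafka record values can be null
        }
        String text = new String(value, StandardCharsets.UTF_8);
        String[] fields = text.split("\t", -1); // -1 keeps trailing empty fields
        if (fields.length != 4) {
            return null; // malformed record; a real job might log or count these
        }
        Tweet tweet = new Tweet();
        tweet.tweetId = fields[0];
        tweet.tweetText = fields[1];
        tweet.location = fields[2];
        tweet.timestamp = fields[3];
        return tweet;
    }
}
```

In a job you would call something like decode from a UDF applied to the
binary value column and register it with spark.udf().register(...); how you
expose the decoded fields (one UDF per field, or a struct) is a design choice
that depends on your encoding.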

On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <kaniska.man...@gmail.com>
wrote:

> Hi Michael,
>
> Thanks much for the suggestion.
>
> I was wondering - what's the best way to deserialize the 'value' field?
>
>
> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <mich...@databricks.com
> > wrote:
>
>> Encoders can only map data into an object if those columns already
>> exist.  When we are reading from Kafka, we just get a binary blob and
>> you'll need to help Spark parse that first.  Assuming your data is stored
>> in JSON, it should be pretty straightforward.
>>
>> // needs static imports of col and from_json from org.apache.spark.sql.functions
>> Dataset<Tweet> streams = spark
>>   .readStream()
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>   .option(subscribeType, topics)
>>   .load()
>>   // cast the binary value to a string and parse it as JSON
>>   .withColumn("message", from_json(col("value").cast("string"), tweetSchema))
>>   // unnest the JSON
>>   .select("message.*")
>>   // only required if you want to use lambda functions on the data using this class
>>   .as(Encoders.bean(Tweet.class));
>>
>> Here is some more info on working with JSON and other semi-structured
>> formats
>> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
>> .
>>
>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am currently encountering the following exception while working with
>>> the code snippet below.
>>>
>>> Please suggest the correct approach for reading the stream into a SQL
>>> schema.
>>> If I add 'tweetSchema' while reading the stream, it errors out with the
>>> message: we can not change static schema for kafka.
>>>
>>> ------------------------------------------------------------
>>> -------------------------------
>>>
>>> *exception*
>>>
>>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
>>> '`location`' given input columns: [topic, timestamp, key, offset, value,
>>> timestampType, partition]*;
>>>         at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>>         at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>>> ------------------------------------------------------------
>>> --------------------------------------------
>>>
>>> *structured streaming code snippet*
>>>
>>> String bootstrapServers = "localhost:9092";
>>>             String subscribeType = "subscribe";
>>>             String topics = "events";
>>>
>>>             StructType tweetSchema = new StructType()
>>>                 .add("tweetId", "string")
>>>                 .add("tweetText", "string")
>>>                 .add("location", "string")
>>>                 .add("timestamp", "string");
>>>
>>>            SparkSession spark = SparkSession
>>>                               .builder()
>>>                               .appName("StreamProcessor")
>>>                               .config("spark.master", "local")
>>>                               .getOrCreate();
>>>
>>>           Dataset<Tweet> streams = spark
>>>                                       .readStream()
>>>                                       .format("kafka")
>>>                                       .option("kafka.bootstrap.servers",
>>> bootstrapServers)
>>>                                       .option(subscribeType, topics)
>>>                                       .load()
>>>                                       .as(Encoders.bean(Tweet.class));
>>>
>>>          streams.createOrReplaceTempView("streamsData");
>>>
>>>                    String sql = "SELECT location, COUNT(*) AS count "
>>>                        + "FROM streamsData "
>>>                        + "GROUP BY location";
>>>                    Dataset<Row> countsByLocation = spark.sql(sql);
>>>
>>>                     StreamingQuery query = countsByLocation.writeStream()
>>>                       .outputMode("complete")
>>>                       .format("console")
>>>                       .start();
>>>
>>>                     query.awaitTermination();
>>> ------------------------------------------------------------
>>> --------------------------------------
>>>
>>> *Tweet *
>>>
>>> Tweet.java - has public constructor and getter / setter methods
>>>
>>> public class Tweet implements Serializable{
>>>
>>>         private String tweetId;
>>>         private String tweetText;
>>>         private String location;
>>>         private String timestamp;
>>>
>>>         public Tweet(){
>>>
>>>         }
>>> .............
>>>
>>> ------------------------------------------------------------
>>> ----------------------------
>>>
>>> *pom.xml *
>>>
>>>
>>>                 <dependency>
>>>                         <groupId>org.apache.spark</groupId>
>>>                         <artifactId>spark-core_2.10</artifactId>
>>>                         <version>2.1.0</version>
>>>                 </dependency>
>>>                 <dependency>
>>>                         <groupId>org.apache.spark</groupId>
>>>                         <artifactId>spark-streaming_2.10</artifactId>
>>>                         <version>2.1.0</version>
>>>                 </dependency>
>>>                 <dependency>
>>>                         <groupId>org.apache.spark</groupId>
>>>                         <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
>>>                         <version>2.1.0</version>
>>>                 </dependency>
>>>                 <dependency>
>>>                         <groupId>org.apache.spark</groupId>
>>>                         <artifactId>spark-sql_2.10</artifactId>
>>>                         <version>2.1.0</version>
>>>                 </dependency>
>>>                 <dependency>
>>>                 <groupId>org.apache.spark</groupId>
>>>                 <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
>>>                 <version>2.1.0</version>
>>>                 </dependency>
>>> ------------------------------------------------------------
>>> ------------------------
>>>
>>>
>>>
>>>
>>
>
