Hi Michael,

Thanks much for the suggestion.

I was wondering: what's the best way to deserialize the 'value' field?
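To make the question concrete: I understand the `value` column comes off Kafka as binary, so it has to be cast to a string and parsed before it can be mapped onto the Tweet bean. Purely as an illustration of what that parse produces (not the Spark API itself; in Spark it is `from_json`, as in your snippet below), here is a dependency-free Java sketch. The `extract` helper and the sample payload are hypothetical:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ValueParseSketch {

    // Hypothetical helper: pull one string field out of a flat JSON object.
    // This is conceptually what from_json(col("value").cast("string"), schema)
    // does for every field at once; a real application should use from_json
    // or a JSON library, not a regex.
    static String extract(String json, String field) {
        Matcher m = Pattern
            .compile("\"" + field + "\"\\s*:\\s*\"([^\"]*)\"")
            .matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // A hypothetical Kafka 'value' payload, already cast from binary to string.
        String value = "{\"tweetId\":\"1\",\"tweetText\":\"hello\","
                     + "\"location\":\"NYC\",\"timestamp\":\"2017-03-24\"}";

        System.out.println(extract(value, "location"));  // prints "NYC"
        System.out.println(extract(value, "tweetText")); // prints "hello"
    }
}
```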


On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> Encoders can only map data into an object if those columns already exist.
> When we are reading from Kafka, we just get a binary blob, and you'll need
> to help Spark parse that first.  Assuming your data is stored as JSON, it
> should be pretty straightforward.
>
> streams = spark
>   .readStream()
>   .format("kafka")
>   .option("kafka.bootstrap.servers", bootstrapServers)
>   .option(subscribeType, topics)
>   .load()
>   // cast the binary value to a string and parse it as JSON
>   .withColumn("message", from_json(col("value").cast("string"), tweetSchema))
>   .select("message.*") // unnest the JSON
>   // only required if you want to use lambda functions on the data via this class
>   .as(Encoders.bean(Tweet.class))
>
> Here is some more info on working with JSON and other semi-structured
> formats
> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
> .
>
> On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Currently I'm encountering the following exception with the code snippet
>> below:
>>
>> > Please suggest the correct approach for reading the stream into a SQL
>> > schema.
>> > If I add 'tweetSchema' while reading the stream, it errors out with the
>> > message: we cannot change the static schema for Kafka.
>>
>> -------------------------------------------------------------------------------
>>
>> *exception*
>>
>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
>> '`location`' given input columns: [topic, timestamp, key, offset, value,
>> timestampType, partition]*;
>>         at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>         at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>> -------------------------------------------------------------------------------
>>
>> *structured streaming code snippet*
>>
>> String bootstrapServers = "localhost:9092";
>> String subscribeType = "subscribe";
>> String topics = "events";
>>
>> StructType tweetSchema = new StructType()
>>     .add("tweetId", "string")
>>     .add("tweetText", "string")
>>     .add("location", "string")
>>     .add("timestamp", "string");
>>
>> SparkSession spark = SparkSession
>>     .builder()
>>     .appName("StreamProcessor")
>>     .config("spark.master", "local")
>>     .getOrCreate();
>>
>> Dataset<Tweet> streams = spark
>>     .readStream()
>>     .format("kafka")
>>     .option("kafka.bootstrap.servers", bootstrapServers)
>>     .option(subscribeType, topics)
>>     .load()
>>     .as(Encoders.bean(Tweet.class));
>>
>> streams.createOrReplaceTempView("streamsData");
>>
>> String sql = "SELECT location, COUNT(*) AS count FROM streamsData GROUP BY location";
>> Dataset<Row> countsByLocation = spark.sql(sql);
>>
>> StreamingQuery query = countsByLocation.writeStream()
>>     .outputMode("complete")
>>     .format("console")
>>     .start();
>>
>> query.awaitTermination();
>> -------------------------------------------------------------------------------
>>
>> *Tweet *
>>
>> Tweet.java has a public no-arg constructor and getter/setter methods:
>>
>> public class Tweet implements Serializable {
>>
>>         private String tweetId;
>>         private String tweetText;
>>         private String location;
>>         private String timestamp;
>>
>>         public Tweet() {
>>         }
>> .............
>>
>> ------------------------------------------------------------
>> ----------------------------
>>
>> *pom.xml *
>>
>>
>>                 <dependency>
>>                         <groupId>org.apache.spark</groupId>
>>                         <artifactId>spark-core_2.10</artifactId>
>>                         <version>2.1.0</version>
>>                 </dependency>
>>                 <dependency>
>>                         <groupId>org.apache.spark</groupId>
>>                         <artifactId>spark-streaming_2.10</artifactId>
>>                         <version>2.1.0</version>
>>                 </dependency>
>>                 <dependency>
>>                         <groupId>org.apache.spark</groupId>
>>                         <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
>>                         <version>2.1.0</version>
>>                 </dependency>
>>                 <dependency>
>>                         <groupId>org.apache.spark</groupId>
>>                         <artifactId>spark-sql_2.10</artifactId>
>>                         <version>2.1.0</version>
>>                 </dependency>
>>                 <dependency>
>>                         <groupId>org.apache.spark</groupId>
>>                         <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
>>                         <version>2.1.0</version>
>>                 </dependency>
>> ------------------------------------------------------------
>> ------------------------
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
