Hi,

I am currently encountering the following exception with the code snippet
below:

> Please suggest the correct approach for reading the stream into a SQL
> schema.
> If I add 'tweetSchema' while reading the stream, it fails with a message
> saying that the static schema for Kafka cannot be changed.

-------------------------------------------------------------------------------------------

*exception*

Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve '`location`' given input columns: [topic, timestamp, key, offset, value, timestampType, partition]*;
        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
--------------------------------------------------------------------------------------------------------

*structured streaming code snippet*

String bootstrapServers = "localhost:9092";
String subscribeType = "subscribe";
String topics = "events";

StructType tweetSchema = new StructType()
    .add("tweetId", "string")
    .add("tweetText", "string")
    .add("location", "string")
    .add("timestamp", "string");

SparkSession spark = SparkSession
    .builder()
    .appName("StreamProcessor")
    .config("spark.master", "local")
    .getOrCreate();

Dataset<Tweet> streams = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option(subscribeType, topics)
    .load()
    .as(Encoders.bean(Tweet.class));

streams.createOrReplaceTempView("streamsData");

String sql = "SELECT location, COUNT(*) as count FROM streamsData GROUP BY location";
Dataset<Row> countsByLocation = spark.sql(sql);

StreamingQuery query = countsByLocation.writeStream()
    .outputMode("complete")
    .format("console")
    .start();

query.awaitTermination();
--------------------------------------------------------------------------------------------------

*Tweet*

Tweet.java has a public constructor and getter/setter methods:

public class Tweet implements Serializable {

    private String tweetId;
    private String tweetText;
    private String location;
    private String timestamp;

    public Tweet() {
    }
.............

----------------------------------------------------------------------------------------

*pom.xml*

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
------------------------------------------------------------------------------------


