Hi,

I am currently encountering the following exception while working with the code snippet below:
Please suggest the correct approach for reading the stream into a SQL schema. If I add 'tweetSchema' while reading the stream, it errors out with a message saying that the static schema of the Kafka source cannot be changed.

-------------------------------------------------------------------------------------------
*exception*

Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve '`location`' given input columns: [topic, timestamp, key, offset, value, timestampType, partition]*;
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)

--------------------------------------------------------------------------------------------------------
*structured streaming code snippet*

String bootstrapServers = "localhost:9092";
String subscribeType = "subscribe";
String topics = "events";

StructType tweetSchema = new StructType()
    .add("tweetId", "string")
    .add("tweetText", "string")
    .add("location", "string")
    .add("timestamp", "string");

SparkSession spark = SparkSession
    .builder()
    .appName("StreamProcessor")
    .config("spark.master", "local")
    .getOrCreate();

Dataset<Tweet> streams = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option(subscribeType, topics)
    .load()
    .as(Encoders.bean(Tweet.class));

streams.createOrReplaceTempView("streamsData");

String sql = "SELECT location, COUNT(*) as count FROM streamsData GROUP BY location";
Dataset<Row> countsByLocation = spark.sql(sql);

StreamingQuery query = countsByLocation.writeStream()
    .outputMode("complete")
    .format("console")
    .start();

query.awaitTermination();

--------------------------------------------------------------------------------------------------
*Tweet*

Tweet.java (has a public constructor and getter/setter methods):

public class Tweet implements Serializable {
    private String tweetId;
    private String tweetText;
    private String location;
    private String timestamp;

    public Tweet() {
    }
    .............

----------------------------------------------------------------------------------------
*pom.xml*

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
    <version>2.1.0</version>
</dependency>

------------------------------------------------------------------------------------
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
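The exception lists the only columns the Kafka source exposes (key, value, topic, partition, offset, timestamp, timestampType), so `location` has to be extracted from the message payload rather than declared as the reader's schema. Below is a minimal sketch of that idea, assuming each Kafka message `value` is a UTF-8 JSON object carrying the four fields of tweetSchema. It casts `value` to a string, parses it with `from_json` (available since Spark 2.1.0), and flattens the resulting struct before registering the view. The class name `TweetStream` and the helper `parseTweets` are hypothetical; the broker address and topic are copied from the snippet in the post.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

// Hypothetical class name; a sketch, not a drop-in implementation.
public class TweetStream {

    // ASSUMPTION: each Kafka message value is a JSON object with these fields.
    public static final StructType TWEET_SCHEMA = new StructType()
            .add("tweetId", DataTypes.StringType)
            .add("tweetText", DataTypes.StringType)
            .add("location", DataTypes.StringType)
            .add("timestamp", DataTypes.StringType);

    // Hypothetical helper: replaces the fixed Kafka columns with one column
    // per tweet field by parsing the `value` payload as JSON.
    public static Dataset<Row> parseTweets(Dataset<Row> kafkaRows) {
        return kafkaRows
                .selectExpr("CAST(value AS STRING) AS json")               // Kafka delivers value as bytes
                .select(from_json(col("json"), TWEET_SCHEMA).alias("tweet")) // unparseable input becomes null
                .select("tweet.*");                                        // flatten the struct into columns
    }

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("StreamProcessor")
                .master("local[*]")
                .getOrCreate();

        // No .schema(...) here: the Kafka source's schema is fixed.
        Dataset<Row> raw = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "events")
                .load();

        // Only the parsed tweet columns are kept, so the JSON "timestamp"
        // does not collide with Kafka's own timestamp column.
        parseTweets(raw).createOrReplaceTempView("streamsData");

        Dataset<Row> countsByLocation = spark.sql(
                "SELECT location, COUNT(*) AS count FROM streamsData GROUP BY location");

        StreamingQuery query = countsByLocation.writeStream()
                .outputMode("complete")
                .format("console")
                .start();
        query.awaitTermination();
    }
}
```

If a typed `Dataset<Tweet>` is still wanted, `.as(Encoders.bean(Tweet.class))` can be applied to the output of the parsing step instead of the raw Kafka rows, because the column names then match the bean properties. Note also that the "kafka" format used by Structured Streaming comes from `spark-sql-kafka-0-10`; the `spark-streaming-kafka-0-8` artifact belongs to the older DStream API.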