Re: unable to stream kafka messages
The exception is telling you precisely what is wrong. The Kafka source has a fixed schema of (topic, partition, offset, key, value, timestamp, timestampType), and none of those columns makes sense as a tweet. You need to tell Spark how to get from the raw bytes in `value` to a Tweet; it doesn't know how you serialized the messages into Kafka.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537p29107.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
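A minimal sketch of that deserialization step, assuming the messages were written to Kafka as JSON and reusing the `tweetSchema` from the original post (class and variable names here are illustrative, not from the thread):

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

public class StreamProcessor {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("StreamProcessor")
                .config("spark.master", "local")
                .getOrCreate();

        StructType tweetSchema = new StructType()
                .add("tweetId", "string")
                .add("tweetText", "string")
                .add("location", "string")
                .add("timestamp", "string");

        // Read the raw Kafka records. The source's schema is fixed at
        // (topic, partition, offset, key, value, timestamp, timestampType);
        // it cannot be overridden with a user schema.
        Dataset<Row> raw = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "events")
                .load();

        // Cast the value bytes to a string, parse them as JSON against
        // tweetSchema, then flatten the struct into top-level columns so
        // that 'location' etc. become resolvable.
        Dataset<Row> tweets = raw
                .select(from_json(col("value").cast("string"), tweetSchema).alias("tweet"))
                .select("tweet.*");

        tweets.createOrReplaceTempView("streamsData");
        // ... the rest of the original query (GROUP BY location, console sink)
        // should then work unchanged.
    }
}
```

The key point is that `from_json` (available since Spark 2.1) supplies exactly the bytes-to-schema mapping the Kafka source cannot infer on its own.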
Re: unable to stream kafka messages
Here is some more info on working with JSON and other semi-structured formats:
https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html

On Fri, Mar 24, 2017 at 10:49 AM, kaniska wrote:
> [...]
unable to stream kafka messages
Hi,

Currently encountering the following exception while working with the below-mentioned code snippet:

> Please suggest the correct approach for reading the stream into a sql schema.
> If I add 'tweetSchema' while reading the stream, it errors out with the message:
> we can not change static schema for kafka.

---

*exception*

    Caused by: org.apache.spark.sql.AnalysisException: cannot resolve '`location`' given input columns: [topic, timestamp, key, offset, value, timestampType, partition];
            at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
            at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)

*structured streaming code snippet*

    String bootstrapServers = "localhost:9092";
    String subscribeType = "subscribe";
    String topics = "events";

    StructType tweetSchema = new StructType()
            .add("tweetId", "string")
            .add("tweetText", "string")
            .add("location", "string")
            .add("timestamp", "string");

    SparkSession spark = SparkSession
            .builder()
            .appName("StreamProcessor")
            .config("spark.master", "local")
            .getOrCreate();

    Dataset<Tweet> streams = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrapServers)
            .option(subscribeType, topics)
            .load()
            .as(Encoders.bean(Tweet.class));

    streams.createOrReplaceTempView("streamsData");

    String sql = "SELECT location, COUNT(*) as count FROM streamsData GROUP BY location";
    Dataset<Row> countsByLocation = spark.sql(sql);

    StreamingQuery query = countsByLocation.writeStream()
            .outputMode("complete")
            .format("console")
            .start();

    query.awaitTermination();

--

*Tweet*

Tweet.java - has a public constructor and getter / setter methods

    public class Tweet implements Serializable {

        private String tweetId;
        private String tweetText;
        private String location;
        private String timestamp;

        public Tweet() {
        }
        ...
    }
*pom.xml*

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
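One note on the dependency list: spark-streaming-kafka-0-8 is the old DStream connector and is not used by Structured Streaming at all; for a structured-streaming job against Kafka, spark-sql plus spark-sql-kafka-0-10 should be sufficient. A trimmed sketch, assuming the same Scala 2.10 builds of Spark 2.1.0 as the original pom:

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
```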