Hi Michael,

Can you please check whether I am using the correct version of the
spark-streaming library, as specified in my pom (included below in the email)?
col("value").cast("string") is throwing the error 'cannot find symbol method
col(java.lang.String)'. I tried $"value", which results in a similar
compilation error.

Thanks
Kaniska

On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust <mich...@databricks.com> wrote:

> Sorry, I don't think that I understand the question. Value is just a
> binary blob that we get from Kafka and pass to you. If it's stored in JSON,
> I think the code I provided is a good option, but if you are using a
> different encoding you may need to write a UDF.
>
> On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <kaniska.man...@gmail.com> wrote:
>
>> Hi Michael,
>>
>> Thanks much for the suggestion.
>>
>> I was wondering - what's the best way to deserialize the 'value' field?
>>
>> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>> Encoders can only map data into an object if those columns already
>>> exist. When we are reading from Kafka, we just get a binary blob and
>>> you'll need to help Spark parse that first. Assuming your data is stored
>>> in JSON, it should be pretty straightforward:
>>>
>>> streams = spark
>>>     .readStream()
>>>     .format("kafka")
>>>     .option("kafka.bootstrap.servers", bootstrapServers)
>>>     .option(subscribeType, topics)
>>>     .load()
>>>     // cast the binary value to a string and parse it as JSON
>>>     .withColumn("message", from_json(col("value").cast("string"), tweetSchema))
>>>     // unnest the JSON
>>>     .select("message.*")
>>>     // only required if you want to use lambda functions on the data using this class
>>>     .as(Encoders.bean(Tweet.class))
>>>
>>> Here is some more info on working with JSON and other semi-structured formats:
>>> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
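[Editorial note on the compilation error above: in Spark's Java API, `col` and `from_json` are static methods on `org.apache.spark.sql.functions` and need a static import to be resolvable, which would explain the 'cannot find symbol' message; `$"value"` is Scala-only string-interpolator syntax and will never compile in Java. A minimal sketch, assuming Spark 2.1.x is on the classpath; the class and method names here are hypothetical, only the imports and `functions` calls come from the Spark API:]

```java
// These static imports are what make col(...) and from_json(...) visible to javac;
// without them the compiler reports "cannot find symbol: method col(java.lang.String)".
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

public class ParseKafkaValue {

    // Hypothetical helper: parse the Kafka 'value' column (binary) with a given schema.
    static Dataset<Row> parseJsonValue(Dataset<Row> kafkaRows, StructType tweetSchema) {
        return kafkaRows
                // the Kafka source exposes 'value' as binary; cast to string before parsing
                .withColumn("message", from_json(col("value").cast("string"), tweetSchema))
                .select("message.*");
    }
}
```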
>>>
>>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Currently, I am encountering the following exception while working with
>>>> the below-mentioned code snippet.
>>>>
>>>> Please suggest the correct approach for reading the stream into a SQL
>>>> schema. If I add 'tweetSchema' while reading the stream, it errors out
>>>> with the message - we cannot change the static schema for kafka.
>>>>
>>>> -------------------------------------------------------------------------------
>>>>
>>>> *exception*
>>>>
>>>> Caused by: org.apache.spark.sql.AnalysisException: cannot resolve
>>>> '`location`' given input columns: [topic, timestamp, key, offset, value,
>>>> timestampType, partition];
>>>>     at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>>>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>>>> -------------------------------------------------------------------------------
>>>>
>>>> *structured streaming code snippet*
>>>>
>>>> String bootstrapServers = "localhost:9092";
>>>> String subscribeType = "subscribe";
>>>> String topics = "events";
>>>>
>>>> StructType tweetSchema = new StructType()
>>>>     .add("tweetId", "string")
>>>>     .add("tweetText", "string")
>>>>     .add("location", "string")
>>>>     .add("timestamp", "string");
>>>>
>>>> SparkSession spark = SparkSession
>>>>     .builder()
>>>>     .appName("StreamProcessor")
>>>>     .config("spark.master", "local")
>>>>     .getOrCreate();
>>>>
>>>> Dataset<Tweet> streams = spark
>>>>     .readStream()
>>>>     .format("kafka")
>>>>     .option("kafka.bootstrap.servers", bootstrapServers)
>>>>     .option(subscribeType, topics)
>>>>     .load()
>>>>     .as(Encoders.bean(Tweet.class));
>>>>
>>>> streams.createOrReplaceTempView("streamsData");
>>>>
>>>> String sql = "SELECT location, COUNT(*) as count FROM streamsData GROUP BY location";
>>>> Dataset<Row> countsByLocation = spark.sql(sql);
>>>>
>>>> StreamingQuery query = countsByLocation.writeStream()
>>>>     .outputMode("complete")
>>>>     .format("console")
>>>>     .start();
>>>>
>>>> query.awaitTermination();
>>>> -------------------------------------------------------------------------------
>>>>
>>>> *Tweet*
>>>>
>>>> Tweet.java has a public constructor and getter/setter methods:
>>>>
>>>> public class Tweet implements Serializable {
>>>>
>>>>     private String tweetId;
>>>>     private String tweetText;
>>>>     private String location;
>>>>     private String timestamp;
>>>>
>>>>     public Tweet() {
>>>>     }
>>>>     .............
>>>>
>>>> -------------------------------------------------------------------------------
>>>>
>>>> *pom.xml*
>>>>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-core_2.10</artifactId>
>>>>     <version>2.1.0</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-streaming_2.10</artifactId>
>>>>     <version>2.1.0</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
>>>>     <version>2.1.0</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-sql_2.10</artifactId>
>>>>     <version>2.1.0</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
>>>>     <version>2.1.0</version>
>>>> </dependency>
>>>> -------------------------------------------------------------------------------
>>>>
>>>> --
>>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
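[Editorial note on the pom question raised at the top of the thread: for Structured Streaming against Kafka, the `spark-sql-kafka-0-10` artifact provides the `format("kafka")` source used in the code above, while `spark-streaming-kafka-0-8` targets the older DStream API and is not used here. A minimal dependency sketch, assuming the Scala 2.10 / Spark 2.1.0 build from the original pom:]

```xml
<!-- Spark SQL / Structured Streaming core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- Kafka source for Structured Streaming (provides format("kafka")) -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- spark-streaming-kafka-0-8 is only needed for the legacy DStream API -->
```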