You need to import col from org.apache.spark.sql.functions.

On Mon, Mar 27, 2017 at 1:20 PM, kaniska Mandal <kaniska.man...@gmail.com> wrote:
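[Editor's sketch of the fix Michael describes; the `from_json` import is included as well since the snippet later in the thread uses it. This is a fragment, not a complete class:]

```java
// Without these static imports, the Java compiler cannot resolve
// col(...) or from_json(...), producing exactly the
// "cannot find symbol method col(java.lang.String)" error below.
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;
```

(The `$"value"` syntax is Scala-only string interpolation; it will never compile in Java, which is why switching to it produced a similar error.)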
> Hi Michael,
>
> Can you please check if I am using the correct version of the spark-streaming
> library as specified in my pom (specified in the email)?
>
> col("value").cast("string") - throwing an error 'cannot find symbol
> method col(java.lang.String)'
> I tried $"value", which results in a similar compilation error.
>
> Thanks
> Kaniska
>
> On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust <mich...@databricks.com> wrote:
>
>> Sorry, I don't think that I understand the question. Value is just a
>> binary blob that we get from kafka and pass to you. If it's stored in JSON,
>> I think the code I provided is a good option, but if you are using a
>> different encoding you may need to write a UDF.
>>
>> On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <kaniska.man...@gmail.com> wrote:
>>
>>> Hi Michael,
>>>
>>> Thanks much for the suggestion.
>>>
>>> I was wondering - what's the best way to deserialize the 'value' field?
>>>
>>> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>>
>>>> Encoders can only map data into an object if those columns already
>>>> exist. When we are reading from Kafka, we just get a binary blob and
>>>> you'll need to help Spark parse that first. Assuming your data is stored
>>>> in JSON, it should be pretty straightforward.
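[Editor's illustration of the UDF route Michael mentions for a non-JSON encoding. The encoding and all names here are hypothetical; the point is that the parsing logic is plain Java, which Spark would then wrap and register via `spark.udf().register(...)`:]

```java
import java.nio.charset.StandardCharsets;

public class ValueDecoder {
    // Hypothetical custom encoding: the Kafka 'value' bytes hold
    // "tweetId|tweetText|location|timestamp" as UTF-8 text.
    public static String[] decode(byte[] value) {
        String raw = new String(value, StandardCharsets.UTF_8);
        // Limit of -1 preserves trailing empty fields.
        return raw.split("\\|", -1);
    }

    public static void main(String[] args) {
        byte[] payload = "t1|hello world|NYC|2017-03-24"
                .getBytes(StandardCharsets.UTF_8);
        String[] fields = decode(payload);
        System.out.println(fields.length); // 4
        System.out.println(fields[2]);     // NYC
    }
}
```

A method like `decode` would be the body of the UDF; for JSON payloads the built-in `from_json` shown later in the thread is the simpler option.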
>>>>
>>>> streams = spark
>>>>     .readStream()
>>>>     .format("kafka")
>>>>     .option("kafka.bootstrap.servers", bootstrapServers)
>>>>     .option(subscribeType, topics)
>>>>     .load()
>>>>     // cast the binary value to a string and parse it as json
>>>>     .withColumn("message", from_json(col("value").cast("string"), tweetSchema))
>>>>     .select("message.*") // unnest the json
>>>>     // only required if you want to use lambda functions on the data using this class
>>>>     .as(Encoders.bean(Tweet.class))
>>>>
>>>> Here is some more info on working with JSON and other semi-structured formats
>>>> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>.
>>>>
>>>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Currently, encountering the following exception while working with the
>>>>> below-mentioned code snippet:
>>>>>
>>>>> Please suggest the correct approach for reading the stream into a sql
>>>>> schema.
>>>>> If I add 'tweetSchema' while reading the stream, it errors out with the
>>>>> message - we can not change static schema for kafka.
>>>>> -------------------------------------------------------------------------------------------
>>>>>
>>>>> *exception*
>>>>>
>>>>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
>>>>> '`location`' given input columns: [topic, timestamp, key, offset, value,
>>>>> timestampType, partition]*;
>>>>>         at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>>>>         at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>>>>> --------------------------------------------------------------------------------------------------------
>>>>>
>>>>> *structured streaming code snippet*
>>>>>
>>>>> String bootstrapServers = "localhost:9092";
>>>>> String subscribeType = "subscribe";
>>>>> String topics = "events";
>>>>>
>>>>> StructType tweetSchema = new StructType()
>>>>>     .add("tweetId", "string")
>>>>>     .add("tweetText", "string")
>>>>>     .add("location", "string")
>>>>>     .add("timestamp", "string");
>>>>>
>>>>> SparkSession spark = SparkSession
>>>>>     .builder()
>>>>>     .appName("StreamProcessor")
>>>>>     .config("spark.master", "local")
>>>>>     .getOrCreate();
>>>>>
>>>>> Dataset<Tweet> streams = spark
>>>>>     .readStream()
>>>>>     .format("kafka")
>>>>>     .option("kafka.bootstrap.servers", bootstrapServers)
>>>>>     .option(subscribeType, topics)
>>>>>     .load()
>>>>>     .as(Encoders.bean(Tweet.class));
>>>>>
>>>>> streams.createOrReplaceTempView("streamsData");
>>>>>
>>>>> String sql = "SELECT location, COUNT(*) as count FROM streamsData GROUP BY location";
>>>>> Dataset<Row> countsByLocation = spark.sql(sql);
>>>>>
>>>>> StreamingQuery query = countsByLocation.writeStream()
>>>>>     .outputMode("complete")
>>>>>     .format("console")
>>>>>     .start();
>>>>>
>>>>> query.awaitTermination();
>>>>> ------------------------------------------------------------------------
>>>>>
>>>>> *Tweet*
>>>>>
>>>>> Tweet.java - has a public constructor and getter / setter methods
>>>>>
>>>>> public class Tweet implements Serializable {
>>>>>
>>>>>     private String tweetId;
>>>>>     private String tweetText;
>>>>>     private String location;
>>>>>     private String timestamp;
>>>>>
>>>>>     public Tweet() {
>>>>>     }
>>>>>     .............
>>>>>
>>>>> ----------------------------------------------------------------------------------------
>>>>>
>>>>> *pom.xml*
>>>>>
>>>>> <dependency>
>>>>>     <groupId>org.apache.spark</groupId>
>>>>>     <artifactId>spark-core_2.10</artifactId>
>>>>>     <version>2.1.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.spark</groupId>
>>>>>     <artifactId>spark-streaming_2.10</artifactId>
>>>>>     <version>2.1.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.spark</groupId>
>>>>>     <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
>>>>>     <version>2.1.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.spark</groupId>
>>>>>     <artifactId>spark-sql_2.10</artifactId>
>>>>>     <version>2.1.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.spark</groupId>
>>>>>     <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
>>>>>     <version>2.1.0</version>
>>>>> </dependency>
>>>>> ------------------------------------------------------------------------
>>>>>
>>>>> --
>>>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html
>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
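
[Editor's note on the pom above: `spark-streaming_2.10` and `spark-streaming-kafka-0-8_2.10` target the legacy DStream API and are not what the structured streaming code in this thread compiles against. A minimal sketch of the dependencies the snippet actually exercises, assuming the thread's Spark 2.1.0 / Scala 2.10 versions:]

```xml
<!-- Dataset/DataFrame API, including org.apache.spark.sql.functions.col -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- Structured streaming Kafka source: format("kafka") -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
```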