You need to import col from org.apache.spark.sql.functions.
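
In a Java project that usually means a static import, since col and from_json are static methods on the functions class. The $"value" shorthand is Scala-only syntax, so it will not compile in Java. Below is a minimal sketch of the column expression from the earlier reply. The class name KafkaJsonColumns and the method names parsedMessage and tweetSchema are hypothetical, the schema fields are copied from the original post, and it assumes spark-sql 2.1.0 or later is on the classpath.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.types.StructType;

// Hypothetical helper class, named here only for illustration.
public class KafkaJsonColumns {

    // Schema fields taken from the original post in this thread.
    public static StructType tweetSchema() {
        return new StructType()
            .add("tweetId", "string")
            .add("tweetText", "string")
            .add("location", "string")
            .add("timestamp", "string");
    }

    // Casts the binary Kafka "value" column to a string and parses it as
    // JSON using the given schema. Both col and from_json resolve only
    // because of the static imports at the top of the file.
    public static Column parsedMessage(StructType schema) {
        return from_json(col("value").cast("string"), schema);
    }
}
```

The resulting column could then be passed to .withColumn("message", KafkaJsonColumns.parsedMessage(KafkaJsonColumns.tweetSchema())) on the DataFrame returned by load().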

On Mon, Mar 27, 2017 at 1:20 PM, kaniska Mandal <kaniska.man...@gmail.com>
wrote:

> Hi Michael,
>
> Can you please check whether I am using the correct version of the
> spark-streaming library, as specified in my pom (included in my earlier
> email)?
>
> col("value").cast("string") throws the error 'cannot find symbol
> method col(java.lang.String)'.
> I tried $"value", which results in a similar compilation error.
>
> Thanks
> Kaniska
>
>
>
> On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust <mich...@databricks.com> wrote:
>
>> Sorry, I don't think that I understand the question.  Value is just a
>> binary blob that we get from Kafka and pass to you.  If it's stored as JSON,
>> I think the code I provided is a good option, but if you are using a
>> different encoding you may need to write a UDF.
>>
>> On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <kaniska.man...@gmail.com> wrote:
>>
>>> Hi Michael,
>>>
>>> Thanks much for the suggestion.
>>>
>>> I was wondering: what's the best way to deserialize the 'value' field?
>>>
>>>
>>> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> Encoders can only map data into an object if those columns already
>>>> exist.  When we are reading from Kafka, we just get a binary blob and
>>>> you'll need to help Spark parse that first.  Assuming your data is stored
>>>> as JSON, it should be pretty straightforward.
>>>>
>>>> Dataset<Tweet> streams = spark
>>>>   .readStream()
>>>>   .format("kafka")
>>>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>>>   .option(subscribeType, topics)
>>>>   .load()
>>>>   // cast the binary value to a string and parse it as JSON
>>>>   .withColumn("message", from_json(col("value").cast("string"), tweetSchema))
>>>>   .select("message.*") // unnest the JSON
>>>>   // only required if you want to use lambda functions on the data using this class
>>>>   .as(Encoders.bean(Tweet.class));
>>>>
>>>> Here is some more info on working with JSON and other semi-structured
>>>> formats:
>>>> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
>>>>
>>>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am currently encountering the following exception while working with
>>>>> the code snippet below.
>>>>>
>>>>> Please suggest the correct approach for reading the stream into a SQL
>>>>> schema.
>>>>> If I add 'tweetSchema' while reading the stream, it errors out with the
>>>>> message: we cannot change the static schema for Kafka.
>>>>>
>>>>> ------------------------------------------------------------
>>>>>
>>>>> *exception*
>>>>>
>>>>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve '`location`' given input columns: [topic, timestamp, key, offset, value, timestampType, partition]*;
>>>>>         at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>>>>         at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>>>>> ------------------------------------------------------------
>>>>>
>>>>> *structured streaming code snippet*
>>>>>
>>>>> String bootstrapServers = "localhost:9092";
>>>>> String subscribeType = "subscribe";
>>>>> String topics = "events";
>>>>>
>>>>> StructType tweetSchema = new StructType()
>>>>>     .add("tweetId", "string")
>>>>>     .add("tweetText", "string")
>>>>>     .add("location", "string")
>>>>>     .add("timestamp", "string");
>>>>>
>>>>> SparkSession spark = SparkSession
>>>>>     .builder()
>>>>>     .appName("StreamProcessor")
>>>>>     .config("spark.master", "local")
>>>>>     .getOrCreate();
>>>>>
>>>>> Dataset<Tweet> streams = spark
>>>>>     .readStream()
>>>>>     .format("kafka")
>>>>>     .option("kafka.bootstrap.servers", bootstrapServers)
>>>>>     .option(subscribeType, topics)
>>>>>     .load()
>>>>>     .as(Encoders.bean(Tweet.class));
>>>>>
>>>>> streams.createOrReplaceTempView("streamsData");
>>>>>
>>>>> String sql = "SELECT location, COUNT(*) as count FROM streamsData GROUP BY location";
>>>>> Dataset<Row> countsByLocation = spark.sql(sql);
>>>>>
>>>>> StreamingQuery query = countsByLocation.writeStream()
>>>>>     .outputMode("complete")
>>>>>     .format("console")
>>>>>     .start();
>>>>>
>>>>> query.awaitTermination();
>>>>> ------------------------------------------------------------
>>>>>
>>>>> *Tweet *
>>>>>
>>>>> Tweet.java - has public constructor and getter / setter methods
>>>>>
>>>>> public class Tweet implements Serializable{
>>>>>
>>>>>         private String tweetId;
>>>>>         private String tweetText;
>>>>>         private String location;
>>>>>         private String timestamp;
>>>>>
>>>>>         public Tweet(){
>>>>>
>>>>>         }
>>>>> .............
>>>>>
>>>>> ------------------------------------------------------------
>>>>>
>>>>> *pom.xml *
>>>>>
>>>>>
>>>>>                 <dependency>
>>>>>                         <groupId>org.apache.spark</groupId>
>>>>>                         <artifactId>spark-core_2.10</artifactId>
>>>>>                         <version>2.1.0</version>
>>>>>                 </dependency>
>>>>>                 <dependency>
>>>>>                         <groupId>org.apache.spark</groupId>
>>>>>                         <artifactId>spark-streaming_2.10</artifactId>
>>>>>                         <version>2.1.0</version>
>>>>>                 </dependency>
>>>>>                 <dependency>
>>>>>                         <groupId>org.apache.spark</groupId>
>>>>>                         <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
>>>>>                         <version>2.1.0</version>
>>>>>                 </dependency>
>>>>>                 <dependency>
>>>>>                         <groupId>org.apache.spark</groupId>
>>>>>                         <artifactId>spark-sql_2.10</artifactId>
>>>>>                         <version>2.1.0</version>
>>>>>                 </dependency>
>>>>>                 <dependency>
>>>>>                         <groupId>org.apache.spark</groupId>
>>>>>                         <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
>>>>>                         <version>2.1.0</version>
>>>>>                 </dependency>
>>>>> ------------------------------------------------------------
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html
>>>>>
>>>>>
>>>>
>>>
>>
>
