Ah, I understand what you are asking now.  There is no API for specifying a
kafka specific "decoder", since Spark SQL already has a rich language for
expressing transformations.  The dataframe code I gave will parse the JSON
and materialize in a class, very similar to what objectMapper.readValue(bytes,
Tweet.class) would do.

However, there are other cases where you might need to do some domain
specific transformation that Spark SQL doesn't support natively.  In this
case you can write a UDF that does the translation. There are a couple of
different ways you can specify this, depending on whether you want to
map/flatMap or just apply the function as a UDF to a single column
<http://stackoverflow.com/questions/35348058/how-do-i-call-a-udf-on-a-spark-dataframe-using-java>
.


On Mon, Mar 27, 2017 at 1:59 PM, kaniska Mandal <kaniska.man...@gmail.com>
wrote:

> yup, that solves the compilation issue :-)
>
> one quick question regarding specifying Decoder in kafka stream:
>
> please note that I am encoding the message as follows while sending data
> to kafka -
>
> <TweetEncoder>
>
> *String msg = objectMapper.writeValueAsString(tweetEvent);*
>
> *return msg.getBytes();*
>
> I have a corresponding <TweetDecoder>
>
> *return objectMapper.readValue(bytes, Tweet.class)*
>
>
> *>> how do I specify the Decoder in the following stream-processing flow ?*
> streams = spark
>   .readStream()
>   .format("kafka")
>   .option("kafka.bootstrap.servers", bootstrapServers)
>   .option(subscribeType, topics)
>   .load()
>   .withColumn("message", from_json(col("value").cast("string"),
> tweetSchema)) // cast the binary value to a string and parse it as json
>   .select("message.*") // unnest the json
>   .as(Encoders.bean(Tweet.class))
>
> Thanks
> Kaniska
>
> ---------------------------------------------
>
> On Mon, Mar 27, 2017 at 1:25 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> You need to import col from org.apache.spark.sql.functions.
>>
>> On Mon, Mar 27, 2017 at 1:20 PM, kaniska Mandal <kaniska.man...@gmail.com
>> > wrote:
>>
>>> Hi Michael,
>>>
>>> Can you please check if I am using correct version of spark-streaming
>>> library as specified in my pom (specified in the email) ?
>>>
>>> col("value").cast("string") - throwing an error 'cannot find symbol
>>> method col(java.lang.String)'
>>> I tried $"value" which results into similar compilation error.
>>>
>>> Thanks
>>> Kaniska
>>>
>>>
>>>
>>> On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> Sorry, I don't think that I understand the question.  Value is just a
>>>> binary blob that we get from kafka and pass to you.  If its stored in JSON,
>>>> I think the code I provided is a good option, but if you are using a
>>>> different encoding you may need to write a UDF.
>>>>
>>>> On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <
>>>> kaniska.man...@gmail.com> wrote:
>>>>
>>>>> Hi Michael,
>>>>>
>>>>> Thanks much for the suggestion.
>>>>>
>>>>> I was wondering - whats the best way to deserialize the 'value' field
>>>>>
>>>>>
>>>>> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <
>>>>> mich...@databricks.com> wrote:
>>>>>
>>>>>> Encoders can only map data into an object if those columns already
>>>>>> exist.  When we are reading from Kafka, we just get a binary blob and
>>>>>> you'll need to help Spark parse that first.  Assuming your data is stored
>>>>>> in JSON it should be pretty straight forward.
>>>>>>
>>>>>> streams = spark
>>>>>>   .readStream()
>>>>>>   .format("kafka")
>>>>>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>>>>>   .option(subscribeType, topics)
>>>>>>   .load()
>>>>>>   .withColumn("message", from_json(col("value").cast("string"),
>>>>>> tweetSchema)) // cast the binary value to a string and parse it as json
>>>>>>   .select("message.*") // unnest the json
>>>>>>   .as(Encoders.bean(Tweet.class)) // only required if you want to
>>>>>> use lambda functions on the data using this class
>>>>>>
>>>>>> Here is some more info on working with JSON and other
>>>>>> semi-structured formats
>>>>>> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
>>>>>> .
>>>>>>
>>>>>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Currently , encountering the following exception while working with
>>>>>>> below-mentioned code snippet :
>>>>>>>
>>>>>>> > Please suggest the correct approach for reading the stream into a
>>>>>>> sql
>>>>>>> > schema.
>>>>>>> > If I add 'tweetSchema' while reading stream, it errors out with
>>>>>>> message -
>>>>>>> > we can not change static schema for kafka.
>>>>>>>
>>>>>>> ------------------------------------------------------------
>>>>>>> -------------------------------
>>>>>>>
>>>>>>> *exception*
>>>>>>>
>>>>>>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
>>>>>>> '`location`' given input columns: [topic, timestamp, key, offset,
>>>>>>> value,
>>>>>>> timestampType, partition]*;
>>>>>>>         at
>>>>>>> org.apache.spark.sql.catalyst.analysis.package$AnalysisError
>>>>>>> At.failAnalysis(package.scala:42)
>>>>>>>         at
>>>>>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfu
>>>>>>> n$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis
>>>>>>> .scala:77)
>>>>>>> ------------------------------------------------------------
>>>>>>> --------------------------------------------
>>>>>>>
>>>>>>> *structured streaming code snippet*
>>>>>>>
>>>>>>> String bootstrapServers = "localhost:9092";
>>>>>>>             String subscribeType = "subscribe";
>>>>>>>             String topics = "events";
>>>>>>>
>>>>>>>             StructType tweetSchema = new StructType()
>>>>>>>                 .add("tweetId", "string")
>>>>>>>                 .add("tweetText", "string")
>>>>>>>                 .add("location", "string")
>>>>>>>                 .add("timestamp", "string");
>>>>>>>
>>>>>>>            SparkSession spark = SparkSession
>>>>>>>                               .builder()
>>>>>>>                               .appName("StreamProcessor")
>>>>>>>                               .config("spark.master", "local")
>>>>>>>                               .getOrCreate();
>>>>>>>
>>>>>>>           Dataset<Tweet> streams = spark
>>>>>>>                                       .readStream()
>>>>>>>                                       .format("kafka")
>>>>>>>                                       .option("kafka.bootstrap.servers",
>>>>>>> bootstrapServers)
>>>>>>>                                       .option(subscribeType, topics)
>>>>>>>                                       .load()
>>>>>>>                                       .as(Encoders.bean(Tweet.class)
>>>>>>> );
>>>>>>>
>>>>>>>          streams.createOrReplaceTempView("streamsData");
>>>>>>>
>>>>>>>                    String sql = "SELECT location,  COUNT(*) as count
>>>>>>> FROM streamsData
>>>>>>> GROUP BY location";
>>>>>>>                    Dataset<Row> countsByLocation = spark.sql(sql);
>>>>>>>
>>>>>>>                     StreamingQuery query =
>>>>>>> countsByLocation.writeStream()
>>>>>>>                       .outputMode("complete")
>>>>>>>                       .format("console")
>>>>>>>                       .start();
>>>>>>>
>>>>>>>                     query.awaitTermination();
>>>>>>> ------------------------------------------------------------
>>>>>>> --------------------------------------
>>>>>>>
>>>>>>> *Tweet *
>>>>>>>
>>>>>>> Tweet.java - has public constructor and getter / setter methods
>>>>>>>
>>>>>>> public class Tweet implements Serializable{
>>>>>>>
>>>>>>>         private String tweetId;
>>>>>>>         private String tweetText;
>>>>>>>         private String location;
>>>>>>>         private String timestamp;
>>>>>>>
>>>>>>>         public Tweet(){
>>>>>>>
>>>>>>>         }
>>>>>>> .............
>>>>>>>
>>>>>>> ------------------------------------------------------------
>>>>>>> ----------------------------
>>>>>>>
>>>>>>> *pom.xml *
>>>>>>>
>>>>>>>
>>>>>>>                 <dependency>
>>>>>>>                         <groupId>org.apache.spark</groupId>
>>>>>>>                         <artifactId>spark-core_2.10</artifactId>
>>>>>>>                         <version>2.1.0</version>
>>>>>>>                 </dependency>
>>>>>>>                 <dependency>
>>>>>>>                         <groupId>org.apache.spark</groupId>
>>>>>>>                         <artifactId>spark-streaming_2.
>>>>>>> 10</artifactId>
>>>>>>>                         <version>2.1.0</version>
>>>>>>>                 </dependency>
>>>>>>>                 <dependency>
>>>>>>>                         <groupId>org.apache.spark</groupId>
>>>>>>>                         <artifactId>spark-streaming-ka
>>>>>>> fka-0-8_2.10</artifactId>
>>>>>>>                         <version>2.1.0</version>
>>>>>>>                 </dependency>
>>>>>>>                 <dependency>
>>>>>>>                         <groupId>org.apache.spark</groupId>
>>>>>>>                         <artifactId>spark-sql_2.10</artifactId>
>>>>>>>                         <version>2.1.0</version>
>>>>>>>                 </dependency>
>>>>>>>                 <dependency>
>>>>>>>                 <groupId>org.apache.spark</groupId>
>>>>>>>                 <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
>>>>>>>                 <version>2.1.0</version>
>>>>>>>                 </dependency>
>>>>>>> ------------------------------------------------------------
>>>>>>> ------------------------
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context: http://apache-spark-user-list.
>>>>>>> 1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
>>>>>>>
>>>>>>> ------------------------------------------------------------
>>>>>>> ---------
>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to