Re: unable to stream kafka messages

2017-08-24 Thread cbowden
The exception is telling you precisely what is wrong. The Kafka source has a schema of (topic, partition, offset, key, value, timestamp, timestampType). Nothing about those columns makes sense as a tweet. You need to tell Spark how to get from bytes to a tweet; it doesn't know how you serialized …
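To see why mapping Kafka rows straight onto a tweet class fails, here is a plain-Java sketch of by-name resolution: every field of the target class needs a column with the same name, and none of the Kafka source columns listed above line up with a tweet. This only illustrates the rule and is not Spark's actual analyzer code. The class ColumnCheck and the tweet fields id, text and user are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustration only: mimics by-name column resolution, not Spark's real analyzer.
class ColumnCheck {
    // Columns of the Kafka source, as listed in the reply.
    static final List<String> KAFKA_COLUMNS = Arrays.asList(
            "topic", "partition", "offset", "key", "value", "timestamp", "timestampType");

    // Returns the target fields that have no same-named source column, in field order.
    static Set<String> unresolvedFields(List<String> sourceColumns, List<String> targetFields) {
        Set<String> missing = new LinkedHashSet<>(targetFields);
        missing.removeAll(sourceColumns);
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical Tweet fields; substitute your own class's properties.
        List<String> tweetFields = Arrays.asList("id", "text", "user");
        System.out.println("Unresolved: " + unresolvedFields(KAFKA_COLUMNS, tweetFields));
    }
}
```

Every field of the hypothetical tweet comes back unresolved, which is the situation the exception describes: the tweet's fields only exist once the value bytes have been parsed into columns.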

2017-03-27 Thread Michael Armbrust
Ah, I understand what you are asking now. There is no API for specifying a Kafka-specific "decoder", since Spark SQL already has a rich language for expressing transformations. The DataFrame code I gave will parse the JSON and materialize it in a class, very similar to what …

2017-03-27 Thread kaniska Mandal
Yup, that solves the compilation issue :-) One quick question regarding specifying a Decoder in the Kafka stream: please note that I am encoding the message as follows while sending data to Kafka:

    String msg = objectMapper.writeValueAsString(tweetEvent);
    return msg.getBytes();

I have a …
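One detail in that producer code: msg.getBytes() with no argument uses the JVM's default charset, which before JDK 18 depends on the platform. A minimal sketch, assuming the consumer side turns the value bytes back into text as UTF-8 (which is what a binary-to-string cast is assumed to do here), pins the charset explicitly on both sides. Utf8Payload and the sample payload are hypothetical names for illustration.

```java
import java.nio.charset.StandardCharsets;

// Sketch: use one explicit charset for the JSON text on both ends of the topic.
class Utf8Payload {
    // Producer side: explicit UTF-8 instead of msg.getBytes(), which uses the JVM default charset.
    static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: the bytes-to-text step that casting "value" to a string is assumed to perform.
    static String decode(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical tweet payload with non-ASCII characters (written as escapes).
        String json = "{\"text\":\"caf\u00e9 \u2615\"}";
        byte[] bytes = encode(json);
        System.out.println(bytes.length + " bytes, round trip ok: " + json.equals(decode(bytes)));
    }
}
```

With an explicit charset the round trip does not depend on where the producer JVM happens to run.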

2017-03-27 Thread Michael Armbrust
You need to import col from org.apache.spark.sql.functions (in Java, a static import: import static org.apache.spark.sql.functions.col;).

2017-03-27 Thread kaniska Mandal
Hi Michael, Can you please check whether I am using the correct version of the spark-streaming library, as specified in my pom (included in the email)? col("value").cast("string") throws the error 'cannot find symbol method col(java.lang.String)'. I tried $"value", which results in a similar compilation …

2017-03-27 Thread Michael Armbrust
Sorry, I don't think I understand the question. The value is just a binary blob that we get from Kafka and pass to you. If it's stored in JSON, I think the code I provided is a good option, but if you are using a different encoding you may need to write a UDF.
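For a non-JSON encoding, the logic a UDF would wrap is just a byte[]-to-object function. The sketch below assumes a hypothetical binary layout (a long id written with DataOutputStream.writeLong, followed by the text written with writeUTF); BinaryTweetCodec and its Tweet class are illustrative names, not part of Spark or of the code in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical non-JSON wire format: writeLong(id) followed by writeUTF(text).
class BinaryTweetCodec {
    // Hypothetical target class; a real one would mirror your own tweet event.
    static final class Tweet {
        final long id;
        final String text;

        Tweet(long id, String text) {
            this.id = id;
            this.text = text;
        }
    }

    // Producer side: serialize a Tweet into the custom binary layout.
    static byte[] encode(Tweet t) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeLong(t.id);
            out.writeUTF(t.text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // Consumer side: the byte[] -> Tweet logic a UDF over the "value" column would wrap.
    static Tweet decode(byte[] value) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(value))) {
            return new Tweet(in.readLong(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Tweet back = decode(encode(new Tweet(42L, "hello kafka")));
        System.out.println(back.id + " " + back.text);
    }
}
```

In Spark, logic like decode would sit inside a UDF (for example one registered through spark.udf().register(...)) applied to the value column; which fields and types that UDF exposes back to Spark is a design choice left open here.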

2017-03-24 Thread kaniska Mandal
Hi Michael, Thanks much for the suggestion. I was wondering: what's the best way to deserialize the 'value' field?

2017-03-24 Thread Michael Armbrust
Encoders can only map data into an object if those columns already exist. When we are reading from Kafka, we just get a binary blob, and you'll need to help Spark parse that first. Assuming your data is stored in JSON, it should be pretty straightforward:

    streams = spark
      .readStream()
      …