The exception is telling you precisely what is wrong. The Kafka source has a
schema of (topic, partition, offset, key, value, timestamp, timestampType).
Nothing about those columns makes sense as a tweet. You need to tell Spark
how to get from bytes to a tweet; it doesn't know how you serialized it
Ah, I understand what you are asking now. There is no API for specifying a
Kafka-specific "decoder", since Spark SQL already has a rich language for
expressing transformations. The DataFrame code I gave will parse the JSON
and materialize it in a class, very similar to what
yup, that solves the compilation issue :-)
One quick question regarding specifying a Decoder in the Kafka stream:
Please note that I am encoding the message as follows while sending data to
Kafka:
*String msg = objectMapper.writeValueAsString(tweetEvent);*
*return msg.getBytes();*
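Since the producer side above is just a JSON string turned into bytes, a quick standalone check (plain Java, no Spark or Kafka needed; the tweet JSON here is a made-up placeholder, not the actual TweetEvent) shows the bytes decode back to the original text losslessly, which is exactly what `cast("string")` recovers on the consumer side:

```java
import java.nio.charset.StandardCharsets;

public class RoundTrip {
  public static void main(String[] args) {
    // Stand-in for objectMapper.writeValueAsString(tweetEvent)
    String msg = "{\"text\":\"hello\",\"user\":\"kaniska\"}";

    // What the producer hands to Kafka (msg.getBytes())
    byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);

    // What cast("string") effectively does to the binary value column
    String decoded = new String(bytes, StandardCharsets.UTF_8);

    System.out.println(msg.equals(decoded)); // prints "true"
  }
}
```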
I have a
You need to import col from org.apache.spark.sql.functions.
On Mon, Mar 27, 2017 at 1:20 PM, kaniska Mandal
wrote:
> Hi Michael,
>
> Can you please check if I am using correct version of spark-streaming
> library as specified in my pom (specified in the email) ?
>
>
Hi Michael,
Can you please check if I am using correct version of spark-streaming
library as specified in my pom (specified in the email) ?
col("value").cast("string") is throwing an error: 'cannot find symbol method
col(java.lang.String)'.
I tried $"value", which results in a similar compilation error.
Sorry, I don't think that I understand the question. Value is just a
binary blob that we get from Kafka and pass to you. If it's stored in JSON,
I think the code I provided is a good option, but if you are using a
different encoding you may need to write a UDF.
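For a non-JSON encoding, the decoding logic is just an ordinary function over the raw bytes, which you could then register as a UDF and apply to the value column. A toy sketch in plain Java (the pipe-delimited "user|text" format is invented for illustration, not anything Spark or Kafka defines):

```java
import java.nio.charset.StandardCharsets;

public class TweetDecoder {
  // Decode a hypothetical "user|text" byte encoding. This is the kind of
  // function you would wrap in a UDF if your messages are not JSON.
  public static String[] decode(byte[] value) {
    String raw = new String(value, StandardCharsets.UTF_8);
    return raw.split("\\|", 2); // split into at most two parts: user, text
  }

  public static void main(String[] args) {
    byte[] value = "kaniska|hello world".getBytes(StandardCharsets.UTF_8);
    String[] tweet = decode(value);
    System.out.println(tweet[0] + " -> " + tweet[1]); // prints "kaniska -> hello world"
  }
}
```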
On Fri, Mar 24, 2017 at 4:58 PM,
Hi Michael,
Thanks much for the suggestion.
I was wondering - what's the best way to deserialize the 'value' field
On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust
wrote:
> Encoders can only map data into an object if those columns already exist.
> When we are
Encoders can only map data into an object if those columns already exist.
When we are reading from Kafka, we just get a binary blob and you'll need
to help Spark parse that first. Assuming your data is stored in JSON, it
should be pretty straightforward.
streams = spark
.readStream()
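The snippet above is cut off; a fuller sketch of what it might look like in Java (assuming Spark 2.1+, a JSON tweet with `text` and `user` fields, and a broker at `localhost:9092` with topic `tweets` - all of those are placeholders to adapt to your TweetEvent and cluster; this needs a running Kafka broker, so it is not runnable standalone):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

public class TweetStream {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("tweets").getOrCreate();

    // Schema of the JSON produced by objectMapper.writeValueAsString(...);
    // the fields here are placeholders -- use TweetEvent's actual fields.
    StructType schema = new StructType()
        .add("text", DataTypes.StringType)
        .add("user", DataTypes.StringType);

    Dataset<Row> tweets = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "tweets")
        .load()
        // value is a binary blob: cast it to a string, then parse the JSON
        .select(from_json(col("value").cast("string"), schema).alias("tweet"))
        .select("tweet.*");
  }
}
```

Note the static import of `col` and `from_json` from org.apache.spark.sql.functions, which is what resolves the 'cannot find symbol' compilation error.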