Re: unable to stream kafka messages

2017-08-24 Thread cbowden
The exception is telling you precisely what is wrong. The Kafka source has a
fixed schema of (topic, partition, offset, key, value, timestamp, timestampType).
None of those columns maps onto a tweet. You need to tell Spark how to get
from bytes to a tweet; it doesn't know how you serialized the messages into
Kafka.
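To make the "bytes to tweet" step concrete, here is a minimal stdlib-only Java sketch. It assumes (the thread does not say this) that the producer wrote each tweet into Kafka as a flat UTF-8 JSON object with string values. `KafkaRecordDecoder` and its regex-based extractor are hypothetical illustrations, not Spark APIs, and would not cope with escaped quotes or nested objects. Inside a Spark 2.1 query the same step is normally done by casting `value` to a string and applying `org.apache.spark.sql.functions.from_json` with `tweetSchema`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: decodes the opaque Kafka `value` bytes into the
// columns of a declared tweet schema. Not a Spark API.
final class KafkaRecordDecoder {

    // Field names mirroring the tweetSchema StructType in the question.
    static final List<String> TWEET_FIELDS =
            Arrays.asList("tweetId", "tweetText", "location", "timestamp");

    // Matches "name":"value" pairs in a flat JSON object (no escapes, no nesting).
    private static final Pattern STRING_FIELD =
            Pattern.compile("\"(\\w+)\"\\s*:\\s*\"([^\"]*)\"");

    private KafkaRecordDecoder() {}

    // Assumes the producer serialized each tweet as UTF-8 JSON.
    static Map<String, String> decode(byte[] value) {
        String json = new String(value, StandardCharsets.UTF_8);
        Map<String, String> found = new LinkedHashMap<>();
        Matcher m = STRING_FIELD.matcher(json);
        while (m.find()) {
            found.put(m.group(1), m.group(2));
        }
        // Project onto the declared schema: unknown keys are dropped and
        // missing fields become null, similar to how from_json treats them.
        Map<String, String> row = new LinkedHashMap<>();
        for (String field : TWEET_FIELDS) {
            row.put(field, found.get(field));
        }
        return row;
    }

    public static void main(String[] args) {
        byte[] value = "{\"tweetId\":\"1\",\"tweetText\":\"hello\",\"location\":\"Austin\"}"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(value));
    }
}
```

Running `main` prints `{tweetId=1, tweetText=hello, location=Austin, timestamp=null}`: the Kafka record only becomes a tweet once something that knows the serialization format has decoded it.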



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537p29107.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: unable to stream kafka messages

2017-03-27 Thread Michael Armbrust
g this class
>>>>>>
>>>>>> Here is some more info on working with JSON and other
>>>>>> semi-structured formats:
>>>>>> https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
>>>>>>
>>>>>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com>
>>>>>> wrote:
>>>>>>


Re: unable to stream kafka messages

2017-03-27 Thread kaniska Mandal


Re: unable to stream kafka messages

2017-03-27 Thread Michael Armbrust


Re: unable to stream kafka messages

2017-03-27 Thread kaniska Mandal


Re: unable to stream kafka messages

2017-03-27 Thread Michael Armbrust


Re: unable to stream kafka messages

2017-03-24 Thread kaniska Mandal


Re: unable to stream kafka messages

2017-03-24 Thread Michael Armbrust


unable to stream kafka messages

2017-03-24 Thread kaniska
Hi,

I am currently encountering the following exception while running the code
snippet below:

> Please suggest the correct approach for reading the stream into a SQL
> schema.
> If I add 'tweetSchema' while reading the stream, it errors out with a message
> saying that the static schema for Kafka cannot be changed.

---

*exception*

Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
'`location`' given input columns: [topic, timestamp, key, offset, value,
timestampType, partition]*;
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)


*structured streaming code snippet*

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;

String bootstrapServers = "localhost:9092";
String subscribeType = "subscribe";
String topics = "events";

StructType tweetSchema = new StructType()
    .add("tweetId", "string")
    .add("tweetText", "string")
    .add("location", "string")
    .add("timestamp", "string");

SparkSession spark = SparkSession
    .builder()
    .appName("StreamProcessor")
    .config("spark.master", "local")
    .getOrCreate();

// The Kafka source only exposes its own fixed columns here.
Dataset<Tweet> streams = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option(subscribeType, topics)
    .load()
    .as(Encoders.bean(Tweet.class));

streams.createOrReplaceTempView("streamsData");

// Java string literals cannot span lines, so the query is concatenated.
String sql = "SELECT location, COUNT(*) AS count "
    + "FROM streamsData "
    + "GROUP BY location";
Dataset<Row> countsByLocation = spark.sql(sql);

StreamingQuery query = countsByLocation.writeStream()
    .outputMode("complete")
    .format("console")
    .start();

query.awaitTermination();
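The query above groups by `location` and runs in `complete` output mode, which writes the entire updated result table to the sink after every trigger rather than only the changed rows. A stdlib-only sketch of those semantics follows; it assumes tweets arrive in micro-batches already decoded to their (non-null) `location` values, and `CompleteModeCounts` is a hypothetical name, not Spark code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of `SELECT location, COUNT(*) ... GROUP BY location`
// running in "complete" output mode. Not Spark code.
final class CompleteModeCounts {

    // Running aggregation state kept across micro-batches.
    // TreeMap keeps output sorted; assumes locations are never null.
    private final Map<String, Long> countsByLocation = new TreeMap<>();

    // Folds one micro-batch into the state and returns the FULL result
    // table, which is what complete mode hands to the sink each trigger.
    Map<String, Long> onBatch(List<String> locations) {
        for (String location : locations) {
            countsByLocation.merge(location, 1L, Long::sum);
        }
        return new TreeMap<>(countsByLocation);
    }

    public static void main(String[] args) {
        CompleteModeCounts query = new CompleteModeCounts();
        System.out.println(query.onBatch(Arrays.asList("Austin", "Berlin", "Austin")));
        System.out.println(query.onBatch(Arrays.asList("Berlin")));
    }
}
```

The two printed tables are `{Austin=2, Berlin=1}` and then `{Austin=2, Berlin=2}`: the second trigger re-emits Austin even though no new Austin tweets arrived, which is what the console sink shows in complete mode.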
--

*Tweet*

Tweet.java has a public no-arg constructor and getter/setter methods:

import java.io.Serializable;

public class Tweet implements Serializable {

    private String tweetId;
    private String tweetText;
    private String location;
    private String timestamp;

    public Tweet() {
    }

    // ... getters and setters ...
}
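`Encoders.bean` binds each bean property to an input column with the same name, which is why `.as(Encoders.bean(Tweet.class))` fails while only the seven Kafka columns exist; in Spark that check happens during query analysis, hence the `AnalysisException`. The toy sketch below imitates the name-based binding with plain reflection. `BeanBinding` and `TweetBean` are hypothetical stand-ins (not Spark's encoder and not the poster's `Tweet` class), the property list is passed explicitly instead of being introspected, and the error text only paraphrases Spark's message.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Tweet bean: no-arg constructor plus setters.
class TweetBean {
    private String tweetId;
    private String location;

    public TweetBean() {}

    public String getTweetId() { return tweetId; }
    public void setTweetId(String tweetId) { this.tweetId = tweetId; }
    public String getLocation() { return location; }
    public void setLocation(String location) { this.location = location; }
}

// Toy, name-based column-to-property binding; not Spark's implementation.
final class BeanBinding {

    private BeanBinding() {}

    static <T> T bind(Class<T> type, List<String> properties, Map<String, String> row) {
        try {
            T bean = type.getDeclaredConstructor().newInstance();
            for (String property : properties) {
                if (!row.containsKey(property)) {
                    // Same shape as the AnalysisException in the question.
                    throw new IllegalArgumentException("cannot resolve '" + property
                            + "' given input columns: " + row.keySet());
                }
                String setter = "set" + Character.toUpperCase(property.charAt(0))
                        + property.substring(1);
                Method m = type.getMethod(setter, String.class);
                m.invoke(bean, row.get(property));
            }
            return bean;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("bean binding failed", e);
        }
    }

    public static void main(String[] args) {
        // Only Kafka source columns are present, so binding fails.
        Map<String, String> kafkaRow = new LinkedHashMap<>();
        kafkaRow.put("topic", "events");
        kafkaRow.put("value", "<bytes>");
        try {
            bind(TweetBean.class, Arrays.asList("tweetId", "location"), kafkaRow);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints `cannot resolve 'tweetId' given input columns: [topic, value]`. The usual way out is to decode `value` first so that columns named after the bean properties exist, and only then convert to the bean type.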



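*pom.xml*

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>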

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html