Re: Spark-shell doesn't see changes coming from Kafka topic

2016-12-05 Thread Otávio Carvalho
In the end, the mistake I made was that I had forgotten to export the proper
AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY on the machine where I was
running the spark-shell.
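For reference, the fix amounts to exporting the two standard AWS SDK
environment variables in the same shell session before launching spark-shell
(the credential values below are placeholders):

```shell
# Placeholder credentials -- substitute your own values. These are the
# standard environment variable names the AWS SDK looks up.
export AWS_ACCESS_KEY_ID="AKIAEXAMPLEACCESSKEY"
export AWS_SECRET_ACCESS_KEY="exampleSecretAccessKey"

# Then start the shell in the same session so it inherits the variables
# (path and package as in the original post):
# /root/spark/bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.10:2.0.2
```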

Nevertheless, thanks for answering, Tathagata Das.

Otávio.



Re: Spark-shell doesn't see changes coming from Kafka topic

2016-12-01 Thread Tathagata Das
Can you confirm the following?
1. Are you sending new data to the Kafka topic AFTER starting the streaming
query? Since you have specified `startingOffsets` as `latest`, data needs to
be sent to the topic after the query starts for the query to receive it.
2. Are you able to read Kafka data using Kafka's console consumer from the
same machine that is running the query? That would clear up any confusion
regarding connectivity.
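For point 2, the check might look like the following (the Kafka install path
is an assumption; the broker host and topic are the ones from the original
post):

```
# Run from the machine that runs the streaming query; if this prints
# messages, connectivity to the broker is fine.
~/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server ec2-54-208-12-171.compute-1.amazonaws.com:9092 \
  --topic clickstream \
  --from-beginning
```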

If both of the above check out, I would look at the INFO- and DEBUG-level
log4j logs to see what the query is doing: is it stuck at some point, or is
it running continuously but not finding the latest offsets?
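As a sketch, the relevant entries in conf/log4j.properties might look like
the following (the logger names are my assumption based on Spark's package
layout, not something stated in this thread):

```
# Raise verbosity for Structured Streaming internals and the Kafka source
log4j.logger.org.apache.spark.sql.execution.streaming=DEBUG
log4j.logger.org.apache.spark.sql.kafka010=DEBUG
```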




Spark-shell doesn't see changes coming from Kafka topic

2016-12-01 Thread Otávio Carvalho
Hello hivemind,

I am trying to connect my Spark 2.0.2 cluster to an Apache Kafka 0.10
cluster via spark-shell.

The connection works fine, but it is not able to receive the messages
published to the topic.

It doesn't throw any error, but it is not able to retrieve any message (I am
sure that messages are being published, because I am able to read from the
topic from the same machine).

Here follows the spark-shell code/output:

val ds1 = spark.readStream
  .format("kafka")
  .option("subscribe", "clickstream")
  .option("kafka.bootstrap.servers",
    "ec2-54-208-12-171.compute-1.amazonaws.com:9092")
  .option("startingOffsets", "latest")
  .load

// Exiting paste mode, now interpreting.

ds1: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

scala> val counter = ds1.groupBy("value").count
counter: org.apache.spark.sql.DataFrame = [value: binary, count: bigint]

scala> import org.apache.spark.sql.streaming.OutputMode.Complete
import org.apache.spark.sql.streaming.OutputMode.Complete

val query = counter.writeStream
  .outputMode(Complete)
  .format("console")
  .start

// Exiting paste mode, now interpreting.

query: org.apache.spark.sql.streaming.StreamingQuery = Streaming Query - query-1 [state = ACTIVE]

scala> query.status
res0: org.apache.spark.sql.streaming.StreamingQueryStatus =
Status of query 'query-1'
    Query id: 1
    Status timestamp: 1480602056895
    Input rate: 0.0 rows/sec
    Processing rate 0.0 rows/sec
    Latency: - ms
    Trigger details:
        isTriggerActive: true
        statusMessage: Finding new data from sources
        timestamp.triggerStart: 1480602056894
        triggerId: -1
    Source statuses [1 source]:
        Source 1 - KafkaSource[Subscribe[clickstream]]
            Available offset: -
            Input rate: 0.0 rows/sec
            Processing rate: 0.0 rows/sec
            Trigger details:
                triggerId: -1
    Sink status - org.apache.spark.sql.execution.streaming.ConsoleSink@54d5b6cb
        Committed offsets: [-]

I am starting the spark-shell as follows:
/root/spark/bin/spark-shell --packages
org.apache.spark:spark-sql-kafka-0-10_2.10:2.0.2
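For what it's worth, one way to separate "no new data since the query
started" from a connectivity problem (my suggestion, not something from this
thread) is to start from the earliest offsets, which replays whatever is
already in the topic:

```scala
// Identical to the query above, but replays existing messages instead of
// waiting for new ones; if this also shows zero rows, the problem is not
// the `latest` starting position.
val ds1 = spark.readStream
  .format("kafka")
  .option("subscribe", "clickstream")
  .option("kafka.bootstrap.servers",
    "ec2-54-208-12-171.compute-1.amazonaws.com:9092")
  .option("startingOffsets", "earliest")
  .load
```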

Thanks,
Otávio Carvalho.

-- 
Otávio Carvalho
Consultant Developer
Email ocarv...@thoughtworks.com
Telephone +55 53 91565742