Re: Kafka to spark streaming

2022-01-30 Thread Gourav Sengupta
Hi Amit,

before answering your question, I am just trying to understand it.

I am not exactly clear on how the Akka application, Kafka, and the Spark
Streaming application sit together. What exactly are you trying to achieve?

Can you please elaborate?

Regards,
Gourav


On Fri, Jan 28, 2022 at 10:14 PM Amit Sharma  wrote:

> Hello everyone, we have a Spark Streaming application. We send requests to
> the stream through an Akka actor using a Kafka topic and wait for the
> response, as it is real time. Just want a suggestion: is there any better
> option, like Livy, where we can send requests to and receive responses from
> Spark Streaming?
>
>
> Thanks
> Amit
>


Re: Kafka to spark streaming

2022-01-29 Thread Amit Sharma
Thanks Mich. The link you shared has only two options, Kafka and Socket.


Thanks
Amit

On Sat, Jan 29, 2022 at 3:49 AM Mich Talebzadeh 
wrote:

> So you have a classic architecture with Spark receiving events through a
> Kafka topic via the Kafka-Spark connector, doing something with them, and
> sending data out to the consumer. Are you using Spark Structured Streaming
> here with micro-batch streaming? Check
>
>
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#structured-streaming-programming-guide
>
> HTH
>
> On Fri, 28 Jan 2022 at 22:14, Amit Sharma  wrote:
>
>> Hello everyone, we have a Spark Streaming application. We send requests to
>> the stream through an Akka actor using a Kafka topic and wait for the
>> response, as it is real time. Just want a suggestion: is there any better
>> option, like Livy, where we can send requests to and receive responses from
>> Spark Streaming?
>>
>>
>> Thanks
>> Amit
>>
>


Re: Kafka to spark streaming

2022-01-29 Thread Mich Talebzadeh
So you have a classic architecture with Spark receiving events through a
Kafka topic via the Kafka-Spark connector, doing something with them, and
sending data out to the consumer. Are you using Spark Structured Streaming
here with micro-batch streaming? Check

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#structured-streaming-programming-guide
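
As an illustration only (the broker address, topic names, and transformation
below are hypothetical, not your actual setup), a minimal Structured Streaming
sketch that reads requests from one Kafka topic and writes responses back to
another, which is roughly the send/receive pattern you describe:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RequestResponseSketch").getOrCreate()

# Read "requests" from Kafka (hypothetical broker and topic names).
requests = (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")
            .option("subscribe", "requests")
            .load())

# Placeholder transformation standing in for the real-time processing.
responses = requests.selectExpr("CAST(key AS STRING) AS key",
                                "upper(CAST(value AS STRING)) AS value")

# Write the results back to a "responses" topic; the Kafka sink needs a
# checkpoint location.
query = (responses.writeStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "broker:9092")
         .option("topic", "responses")
         .option("checkpointLocation", "/tmp/checkpoints/request-response")
         .start())

query.awaitTermination()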

HTH


On Fri, 28 Jan 2022 at 22:14, Amit Sharma  wrote:

> Hello everyone, we have a Spark Streaming application. We send requests to
> the stream through an Akka actor using a Kafka topic and wait for the
> response, as it is real time. Just want a suggestion: is there any better
> option, like Livy, where we can send requests to and receive responses from
> Spark Streaming?
>
>
> Thanks
> Amit
>


Re: Kafka with Spark Streaming works locally but doesn't work in Standalone mode

2020-07-24 Thread Gabor Somogyi
Hi Davide,

Please see the doc:
*Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.*

Have you tried the same with Structured Streaming instead of DStreams?
If you insist on sticking with DStreams, you can use the
spark-streaming-kafka-0-10 connector instead.
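
For reference, a minimal Structured Streaming sketch of the same pipeline
(reusing the broker address and topic name from your script; it assumes the
spark-sql-kafka-0-10 package is on the classpath and is untested here):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("PythonStructuredStreamingKafka")
         .getOrCreate())

# Subscribe to the "test" topic on the same broker as the original script.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "172.31.71.104:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "earliest")
      .load())

# Kafka keys/values arrive as binary; cast to strings before printing.
query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .writeStream
         .format("console")
         .start())

query.awaitTermination()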

BR,
G


On Fri, Jul 24, 2020 at 12:08 PM Davide Curcio 
wrote:

> Hi,
>
> I'm trying to use Spark Streaming with a very simple script like this:
>
> from pyspark import SparkContext, SparkConf
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
>
>
> sc = SparkContext(appName="PythonSparkStreamingKafka")
>
>
> ssc = StreamingContext(sc, 1)
> kafkaParams = {"metadata.broker.list": "172.31.71.104:9092",
>"auto.offset.reset": "smallest"}
>
> training = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)
>
> training.pprint()
>
> ssc.start()
> ssc.awaitTermination()
>
> Although it works locally, it crashes on the cluster in standalone mode. I
> have a cluster with 4 machines:
>
> 1 machine with Kafka Producer, 1 Broker and 1 Zookeeper
> 1 machine is the driver
> 2 machines are the workers.
>
> The strange thing is that when I had the Kafka producer, broker and
> ZooKeeper on the same machine as the driver, it worked both locally and in
> the cluster. But obviously, for the sake of scalability and modularity, I'd
> like to use the current configuration.
>
> I'm using Spark 2.4.6, the Kafka streaming library is
> "spark-streaming-kafka-0-8-assembly_2.11-2.4.6", and the Kafka version that
> I'm currently using is kafka_2.11-2.4.1
>
> The result is the following:
>
> 2020-07-24 09:48:25,869 WARN scheduler.TaskSetManager: Lost task 0.0 in
> stage 0.0 (TID 0, 172.31.69.185, executor 0):
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
> at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
> at
> org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
>
> 2020-07-24 09:48:25,875 INFO scheduler.TaskSetManager: Starting task 0.1
> in stage 0.0 (TID 1, 172.31.69.185, executor 0, partition 0, ANY, 7785
> bytes)
> 2020-07-24 09:48:25,950 INFO scheduler.TaskSetManager: Lost task 0.1 in
> stage 0.0 (TID 1) on 172.31.69.185, executor 0:
> java.nio.channels.ClosedChannelException (null) [duplicate 1]
> 2020-07-24 09:48:25,952 INFO scheduler.TaskSetManager: Starting task 0.2
> in stage 0.0 (TID 2, 172.31.69.185, executor 0, partition 0, ANY, 7785
> bytes)
> 2020-07-24 09:48:25,984 INFO scheduler.TaskSetManager: Lost task 0.2 in
> stage 0.0 (TID 2) on 172.31.69.185, executor 0:
> java.nio.channels.ClosedChannelException (null) [duplicate 2]
> 2020-07-24 09:48:25,985 INFO scheduler.TaskSetManager: Starting task 0.3
> in stage 0.0 (TID 3, 172.31.79.221, executor 1, partition 0, ANY, 7785
> bytes)
> 2020-07-24 09:48:26,026 INFO scheduler.JobScheduler: Added jobs for time
> 1595584106000 ms
> 2020-07-24 09:48:26,375 INFO storage.BlockManagerInfo: Added
> broadcast_0_piece0 in memory on 172.31.79.221:44371 (size: 4.0 KB, free:
> 366.3 MB)
> 2020-07-24 09:48:27,022 INFO scheduler.JobScheduler: Added jobs for time
> 1595584107000 ms
> 2020-07-24 09:48:27,165 INFO scheduler.TaskSetManager: Lost task 0.3 in
> stage 0.0 (TID 3) on 172.31.79.221, executor 1:
> 

Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Jacek Laskowski
Hi,

What's the entire spark-submit + Spark properties you're using?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Fri, Dec 2, 2016 at 6:28 PM, Gabriel Perez <gabr...@adtheorent.com> wrote:
> I had it setup with three nodes, a master and 2 slaves. Is there anything
> that would tell me it was in local mode. I am also added the –deploy-mode
> cluster flag and saw the same results.
>
>
>
> Thanks,
>
> Gabe
>
>
>
> From: Mich Talebzadeh <mich.talebza...@gmail.com>
> Date: Friday, December 2, 2016 at 12:26 PM
> To: Gabriel Perez <gabr...@adtheorent.com>
> Cc: Jacek Laskowski <ja...@japila.pl>, user <user@spark.apache.org>
>
>
> Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2
>
>
>
> In this POC of yours, are you running this app with Spark in local mode by
> any chance?
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
>
>
>
>
> On 2 December 2016 at 16:54, Gabriel Perez <gabr...@adtheorent.com> wrote:
>
> Hi,
>
>
>
> The total number of partitions is 128, and I can tell it's one executor
> because in the Kafka consumer list I see only one thread pulling, and in the
> master Spark UI the executor id shown is 0 and that's it.
>
>
>
> Thanks,
>
> Gabe
>
>
>
>
>
> From: Jacek Laskowski <ja...@japila.pl>
> Date: Friday, December 2, 2016 at 11:47 AM
> To: Gabriel Perez <gabr...@adtheorent.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2
>
>
>
> Hi,
>
>
>
> How many partitions does the topic have? How do you check how many executors
> read from the topic?
>
>
>
> Jacek
>
>
>
>
>
> On 2 Dec 2016 2:44 p.m., "gabrielperez2484" <gabr...@adtheorent.com> wrote:
>
> Hello,
>
> I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
> am running into an issue where only one executor ("kafka consumer") is
> reading from the topic, which is causing performance to be really poor. I
> have tried adding "--num-executors 8" both in the script that executes the
> jar and in my Java code. Here is the code below. Please let me know if I am
> missing something, or whether there is a way to increase the number of
> consumers connecting to Kafka.
>
>
> Thanks,
> Gabe
>
> 
> Map<String, Object> kafkaParams = new HashMap<>();
> kafkaParams.put( "bootstrap.servers", "server:9092" );
> kafkaParams.put( "key.deserializer",
> StringDeserializer.class );
> kafkaParams.put( "value.deserializer",
> StringDeserializer.class );
> kafkaParams.put( "group.id", "spark-aggregation" );
> kafkaParams.put( "auto.offset.reset", "earliest" );
> kafkaParams.put( "request.timeout.ms", "305000" );
> kafkaParams.put( "heartbeat.interval.ms", "85000" );
> kafkaParams.put( "session.timeout.ms", "9" );
>
> Collection<String> topics = Arrays.asList( "Topic" );
>
> SparkConf sparkConf = new SparkConf().setMaster(
> "spark://server:7077" )
> .setAppName( "aggregation" ).set(
> "spark.submit.deployMode", "cluster" )
> .set( "spark.executor.instances", "16" );
>
> JavaStreamingContext javaStreamingContext = new
> JavaStreamingContext(
> sparkConf, new Duration( 5000 ) );
>
> //Creates connect to the Stream.
> final JavaInputDStream<ConsumerRecord<String, String>>
> stream =
> KafkaUtils.createDirectStream(
> javaStreamingContext,
> LocationStrategies.PreferConsistent(),
> ConsumerStrategies.

Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Gabriel Perez
I had it set up with three nodes, a master and two slaves. Is there anything
that would tell me it was in local mode? I also added the --deploy-mode
cluster flag and saw the same results.

Thanks,
Gabe

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Friday, December 2, 2016 at 12:26 PM
To: Gabriel Perez <gabr...@adtheorent.com>
Cc: Jacek Laskowski <ja...@japila.pl>, user <user@spark.apache.org>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

In this POC of yours, are you running this app with Spark in local mode by
any chance?


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com






On 2 December 2016 at 16:54, Gabriel Perez <gabr...@adtheorent.com> wrote:
Hi,

The total number of partitions is 128, and I can tell it's one executor
because in the Kafka consumer list I see only one thread pulling, and in the
master Spark UI the executor id shown is 0 and that's it.

Thanks,
Gabe


From: Jacek Laskowski <ja...@japila.pl>
Date: Friday, December 2, 2016 at 11:47 AM
To: Gabriel Perez <gabr...@adtheorent.com>
Cc: user <user@spark.apache.org>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

How many partitions does the topic have? How do you check how many executors 
read from the topic?

Jacek


On 2 Dec 2016 2:44 p.m., "gabrielperez2484" <gabr...@adtheorent.com> wrote:
Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
am running into an issue where only one executor ("kafka consumer") is
reading from the topic, which is causing performance to be really poor. I
have tried adding "--num-executors 8" both in the script that executes the
jar and in my Java code. Here is the code below. Please let me know if I am
missing something, or whether there is a way to increase the number of
consumers connecting to Kafka.


Thanks,
Gabe


Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers", "server:9092" );
kafkaParams.put( "key.deserializer", StringDeserializer.class );
kafkaParams.put( "value.deserializer", StringDeserializer.class 
);
kafkaParams.put( "group.id<http://group.id>", 
"spark-aggregation" );
kafkaParams.put( "auto.offset.reset", "earliest" );
kafkaParams.put( 
"request.timeout.ms<http://request.timeout.ms>", "305000" );
kafkaParams.put( 
"heartbeat.interval.ms<http://heartbeat.interval.ms>", "85000" );
kafkaParams.put( 
"session.timeout.ms<http://session.timeout.ms>", "9" );

Collection<String> topics = Arrays.asList( "Topic" );

SparkConf sparkConf = new SparkConf().setMaster( 
"spark://server:7077" )
.setAppName( "aggregation" ).set( 
"spark.submit.deployMode", "cluster" )
.set( "spark.executor.instances", "16" );

JavaStreamingContext javaStreamingContext = new 
JavaStreamingContext(
sparkConf, new Duration( 5000 ) );

//Creates connect to the Stream.
final JavaInputDStream<ConsumerRecord<String, String>>
stream =
KafkaUtils.createDirectStream(
javaStreamingContext, 
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe( 
topics, kafkaParams ) );

//JavaPairDStream<String, String> unifiedStream =
javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
kafkaStreams.size()));

JavaDStream<String> records = stream.map( new
Function<ConsumerRecord<String, String>, String>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Pulling key from the stream and creating the 
aggregation key.
 */
public String call( ConsumerRecord<String, String> 
record ) {


  

Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Mich Talebzadeh
In this POC of yours, are you running this app with Spark in local mode by
any chance?

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com





On 2 December 2016 at 16:54, Gabriel Perez <gabr...@adtheorent.com> wrote:

> Hi,
>
>
>
> The total number of partitions is 128, and I can tell it's one executor
> because in the Kafka consumer list I see only one thread pulling, and in the
> master Spark UI the executor id shown is 0 and that's it.
>
>
>
> Thanks,
>
> Gabe
>
>
>
>
>
> From: Jacek Laskowski <ja...@japila.pl>
> Date: Friday, December 2, 2016 at 11:47 AM
> To: Gabriel Perez <gabr...@adtheorent.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2
>
>
>
> Hi,
>
>
>
> How many partitions does the topic have? How do you check how many
> executors read from the topic?
>
>
>
> Jacek
>
>
>
>
>
> On 2 Dec 2016 2:44 p.m., "gabrielperez2484" <gabr...@adtheorent.com>
> wrote:
>
> Hello,
>
> I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
> am running into an issue where only one executor ("kafka consumer") is
> reading from the topic, which is causing performance to be really poor. I
> have tried adding "--num-executors 8" both in the script that executes the
> jar and in my Java code. Here is the code below. Please let me know if I am
> missing something, or whether there is a way to increase the number of
> consumers connecting to Kafka.
>
>
> Thanks,
> Gabe
>
> 
> Map<String, Object> kafkaParams = new HashMap<>();
> kafkaParams.put( "bootstrap.servers", "server:9092" );
> kafkaParams.put( "key.deserializer",
> StringDeserializer.class );
> kafkaParams.put( "value.deserializer",
> StringDeserializer.class );
> kafkaParams.put( "group.id", "spark-aggregation" );
> kafkaParams.put( "auto.offset.reset", "earliest" );
> kafkaParams.put( "request.timeout.ms", "305000" );
> kafkaParams.put( "heartbeat.interval.ms", "85000" );
> kafkaParams.put( "session.timeout.ms", "9" );
>
> Collection<String> topics = Arrays.asList( "Topic" );
>
> SparkConf sparkConf = new SparkConf().setMaster(
> "spark://server:7077" )
> .setAppName( "aggregation" ).set(
> "spark.submit.deployMode", "cluster" )
> .set( "spark.executor.instances", "16" );
>
> JavaStreamingContext javaStreamingContext = new
> JavaStreamingContext(
> sparkConf, new Duration( 5000 ) );
>
> //Creates connect to the Stream.
> final JavaInputDStream<ConsumerRecord<String, String>>
> stream =
> KafkaUtils.createDirectStream(
> javaStreamingContext, LocationStrategies.
> PreferConsistent(),
> ConsumerStrategies.<String, String>
> Subscribe( topics, kafkaParams ) );
>
> //JavaPairDStream<String, String> unifiedStream =
> javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
> kafkaStreams.size()));
>
> JavaDStream<String> records = stream.map( new
> Function<ConsumerRecord<String, String>, String>() {
>
> private static final long serialVersionUID = 1L;
>
> @Override
> /**
>  * Pulling key from the stream and creating the
> aggregation key.
>  */
> public String call( ConsumerRecord<String, String>
> record ) {
>
>
> return record.key();
>
> }
> } );
>
> JavaPairDStream<String, Integer> pairs =
> recor

Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Gabriel Perez
We actually ended up reverting back to 0.9.0 in my testing environment because
we found other products weren't ready for 0.10 either, so I am not able to
take those screenshots. Hopefully I won't see the same issue with 0.9.0.
Thank you for your help, though.

Thanks,
Gabe

From: Jacek Laskowski <ja...@japila.pl>
Date: Friday, December 2, 2016 at 12:21 PM
To: Gabriel Perez <gabr...@adtheorent.com>
Cc: user <user@spark.apache.org>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

Can you post the screenshot of the Executors and Streaming tabs?

Jacek

On 2 Dec 2016 5:54 p.m., "Gabriel Perez" <gabr...@adtheorent.com> wrote:
Hi,

The total number of partitions is 128, and I can tell it's one executor
because in the Kafka consumer list I see only one thread pulling, and in the
master Spark UI the executor id shown is 0 and that's it.

Thanks,
Gabe


From: Jacek Laskowski <ja...@japila.pl>
Date: Friday, December 2, 2016 at 11:47 AM
To: Gabriel Perez <gabr...@adtheorent.com>
Cc: user <user@spark.apache.org>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

How many partitions does the topic have? How do you check how many executors 
read from the topic?

Jacek


On 2 Dec 2016 2:44 p.m., "gabrielperez2484" <gabr...@adtheorent.com> wrote:
Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
am running into an issue where only one executor ("kafka consumer") is
reading from the topic, which is causing performance to be really poor. I
have tried adding "--num-executors 8" both in the script that executes the
jar and in my Java code. Here is the code below. Please let me know if I am
missing something, or whether there is a way to increase the number of
consumers connecting to Kafka.


Thanks,
Gabe


Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers", "server:9092" );
kafkaParams.put( "key.deserializer", StringDeserializer.class );
kafkaParams.put( "value.deserializer", StringDeserializer.class 
);
kafkaParams.put( "group.id<http://group.id>", 
"spark-aggregation" );
kafkaParams.put( "auto.offset.reset", "earliest" );
kafkaParams.put( 
"request.timeout.ms<http://request.timeout.ms>", "305000" );
kafkaParams.put( 
"heartbeat.interval.ms<http://heartbeat.interval.ms>", "85000" );
kafkaParams.put( 
"session.timeout.ms<http://session.timeout.ms>", "9" );

Collection<String> topics = Arrays.asList( "Topic" );

SparkConf sparkConf = new SparkConf().setMaster( 
"spark://server:7077" )
.setAppName( "aggregation" ).set( 
"spark.submit.deployMode", "cluster" )
.set( "spark.executor.instances", "16" );

JavaStreamingContext javaStreamingContext = new 
JavaStreamingContext(
sparkConf, new Duration( 5000 ) );

//Creates connect to the Stream.
final JavaInputDStream<ConsumerRecord<String, String>>
stream =
KafkaUtils.createDirectStream(
javaStreamingContext, 
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe( 
topics, kafkaParams ) );

//JavaPairDStream<String, String> unifiedStream =
javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
kafkaStreams.size()));

JavaDStream<String> records = stream.map( new
Function<ConsumerRecord<String, String>, String>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Pulling key from the stream and creating the 
aggregation key.
 */
public String call( ConsumerRecord<String, String> 
record ) {


return record.key();

}
} );

JavaPairDStream<String, Integer> pairs = records.mapToPair( new
PairFunction<String, String, Integer>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Creating new tuple to perform calculations on.
  

Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Jacek Laskowski
Hi,

Can you post the screenshot of the Executors and Streaming tabs?

Jacek

On 2 Dec 2016 5:54 p.m., "Gabriel Perez" <gabr...@adtheorent.com> wrote:

> Hi,
>
>
>
> The total number of partitions is 128, and I can tell it's one executor
> because in the Kafka consumer list I see only one thread pulling, and in the
> master Spark UI the executor id shown is 0 and that's it.
>
>
>
> Thanks,
>
> Gabe
>
>
>
>
>
> From: Jacek Laskowski <ja...@japila.pl>
> Date: Friday, December 2, 2016 at 11:47 AM
> To: Gabriel Perez <gabr...@adtheorent.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2
>
>
>
> Hi,
>
>
>
> How many partitions does the topic have? How do you check how many
> executors read from the topic?
>
>
>
> Jacek
>
>
>
>
>
> On 2 Dec 2016 2:44 p.m., "gabrielperez2484" <gabr...@adtheorent.com>
> wrote:
>
> Hello,
>
> I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
> am running into an issue where only one executor ("kafka consumer") is
> reading from the topic, which is causing performance to be really poor. I
> have tried adding "--num-executors 8" both in the script that executes the
> jar and in my Java code. Here is the code below. Please let me know if I am
> missing something, or whether there is a way to increase the number of
> consumers connecting to Kafka.
>
>
> Thanks,
> Gabe
>
> 
> Map<String, Object> kafkaParams = new HashMap<>();
> kafkaParams.put( "bootstrap.servers", "server:9092" );
> kafkaParams.put( "key.deserializer",
> StringDeserializer.class );
> kafkaParams.put( "value.deserializer",
> StringDeserializer.class );
> kafkaParams.put( "group.id", "spark-aggregation" );
> kafkaParams.put( "auto.offset.reset", "earliest" );
> kafkaParams.put( "request.timeout.ms", "305000" );
> kafkaParams.put( "heartbeat.interval.ms", "85000" );
> kafkaParams.put( "session.timeout.ms", "9" );
>
> Collection<String> topics = Arrays.asList( "Topic" );
>
> SparkConf sparkConf = new SparkConf().setMaster(
> "spark://server:7077" )
> .setAppName( "aggregation" ).set(
> "spark.submit.deployMode", "cluster" )
> .set( "spark.executor.instances", "16" );
>
> JavaStreamingContext javaStreamingContext = new
> JavaStreamingContext(
> sparkConf, new Duration( 5000 ) );
>
> //Creates connect to the Stream.
> final JavaInputDStream<ConsumerRecord<String, String>>
> stream =
> KafkaUtils.createDirectStream(
> javaStreamingContext, LocationStrategies.
> PreferConsistent(),
> ConsumerStrategies.<String, String>
> Subscribe( topics, kafkaParams ) );
>
> //JavaPairDStream<String, String> unifiedStream =
> javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
> kafkaStreams.size()));
>
> JavaDStream<String> records = stream.map( new
> Function<ConsumerRecord<String, String>, String>() {
>
> private static final long serialVersionUID = 1L;
>
> @Override
> /**
>  * Pulling key from the stream and creating the
> aggregation key.
>  */
> public String call( ConsumerRecord<String, String>
> record ) {
>
>
> return record.key();
>
> }
> } );
>
> JavaPairDStream<String, Integer> pairs =
> records.mapToPair( new
> PairFunction<String, String, Integer>() {
>
> private static final long serialVersionUID = 1L;
>
> @Override
> /**
>  * Creating new tuple to perform calculations on.
>  */
> public Tuple2<String, Integer> call( String s ) {
>
> return new Tuple2<>( s, 1 );
> }
> } );
>
>   

Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Gabriel Perez
Hi,

The total number of partitions is 128, and I can tell it's one executor
because in the Kafka consumer list I see only one thread pulling, and in the
master Spark UI the executor id shown is 0 and that's it.

Thanks,
Gabe


From: Jacek Laskowski <ja...@japila.pl>
Date: Friday, December 2, 2016 at 11:47 AM
To: Gabriel Perez <gabr...@adtheorent.com>
Cc: user <user@spark.apache.org>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

How many partitions does the topic have? How do you check how many executors 
read from the topic?

Jacek


On 2 Dec 2016 2:44 p.m., "gabrielperez2484" <gabr...@adtheorent.com> wrote:
Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
am running into an issue where only one executor ("kafka consumer") is
reading from the topic, which is causing performance to be really poor. I
have tried adding "--num-executors 8" both in the script that executes the
jar and in my Java code. Here is the code below. Please let me know if I am
missing something, or whether there is a way to increase the number of
consumers connecting to Kafka.


Thanks,
Gabe


Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers", "server:9092" );
kafkaParams.put( "key.deserializer", StringDeserializer.class );
kafkaParams.put( "value.deserializer", StringDeserializer.class 
);
kafkaParams.put( "group.id<http://group.id>", 
"spark-aggregation" );
kafkaParams.put( "auto.offset.reset", "earliest" );
kafkaParams.put( 
"request.timeout.ms<http://request.timeout.ms>", "305000" );
kafkaParams.put( 
"heartbeat.interval.ms<http://heartbeat.interval.ms>", "85000" );
kafkaParams.put( 
"session.timeout.ms<http://session.timeout.ms>", "9" );

Collection<String> topics = Arrays.asList( "Topic" );

SparkConf sparkConf = new SparkConf().setMaster( 
"spark://server:7077" )
.setAppName( "aggregation" ).set( 
"spark.submit.deployMode", "cluster" )
.set( "spark.executor.instances", "16" );

JavaStreamingContext javaStreamingContext = new 
JavaStreamingContext(
sparkConf, new Duration( 5000 ) );

//Creates connect to the Stream.
final JavaInputDStream<ConsumerRecord<String, String>>
stream =
KafkaUtils.createDirectStream(
javaStreamingContext, 
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe( 
topics, kafkaParams ) );

//JavaPairDStream<String, String> unifiedStream =
javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
kafkaStreams.size()));

JavaDStream<String> records = stream.map( new
Function<ConsumerRecord<String, String>, String>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Pulling key from the stream and creating the 
aggregation key.
 */
public String call( ConsumerRecord<String, String> 
record ) {


return record.key();

}
} );

JavaPairDStream<String, Integer> pairs = records.mapToPair( new
PairFunction<String, String, Integer>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Creating new tuple to perform calculations on.
 */
public Tuple2<String, Integer> call( String s ) {

return new Tuple2<>( s, 1 );
}
} );

JavaPairDStream<String, Integer> counts = pairs.reduceByKey( new
Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * perform counts...
 */
public Integer call( Integer i1, Integer i2 ) {

return i1 + i2;
}
} );

stream.foreachRDD( new VoidFunction<JavaRDD<ConsumerRecord<String,
String>>>() {

/**

Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Jacek Laskowski
Hi,

How many partitions does the topic have? How do you check how many
executors read from the topic?

Jacek


On 2 Dec 2016 2:44 p.m., "gabrielperez2484"  wrote:

Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
am running into an issue where only one executor ("kafka consumer") is
reading from the topic, which is causing performance to be really poor. I
have tried adding "--num-executors 8" both in the script that executes the
jar and in my Java code. Here is the code below. Please let me know if I am
missing something, or whether there is a way to increase the number of
consumers connecting to Kafka.


Thanks,
Gabe


Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers", "server:9092" );
kafkaParams.put( "key.deserializer",
StringDeserializer.class );
kafkaParams.put( "value.deserializer",
StringDeserializer.class );
kafkaParams.put( "group.id", "spark-aggregation" );
kafkaParams.put( "auto.offset.reset", "earliest" );
kafkaParams.put( "request.timeout.ms", "305000" );
kafkaParams.put( "heartbeat.interval.ms", "85000" );
kafkaParams.put( "session.timeout.ms", "9" );

Collection<String> topics = Arrays.asList( "Topic" );

SparkConf sparkConf = new SparkConf().setMaster(
"spark://server:7077" )
.setAppName( "aggregation" ).set(
"spark.submit.deployMode", "cluster" )
.set( "spark.executor.instances", "16" );

JavaStreamingContext javaStreamingContext = new
JavaStreamingContext(
sparkConf, new Duration( 5000 ) );

//Creates connect to the Stream.
final JavaInputDStream<ConsumerRecord<String, String>>
stream =
KafkaUtils.createDirectStream(
javaStreamingContext, LocationStrategies.
PreferConsistent(),
ConsumerStrategies.<String, String>
Subscribe( topics, kafkaParams ) );

//JavaPairDStream<String, String> unifiedStream =
javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
kafkaStreams.size()));

JavaDStream<String> records = stream.map( new
Function<ConsumerRecord<String, String>, String>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Pulling key from the stream and creating the
aggregation key.
 */
public String call( ConsumerRecord<String, String> record ) {


return record.key();

}
} );

JavaPairDStream<String, Integer> pairs = records.mapToPair( new
PairFunction<String, String, Integer>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Creating new tuple to perform calculations on.
 */
public Tuple2<String, Integer> call( String s ) {

return new Tuple2<>( s, 1 );
}
} );

JavaPairDStream<String, Integer> counts = pairs.reduceByKey( new
Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * perform counts...
 */
public Integer call( Integer i1, Integer i2 ) {

return i1 + i2;
}
} );

stream.foreachRDD( new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public void call( JavaRDD<ConsumerRecord<String, String>> rdd ) {

OffsetRange[] offsetRanges = (
(HasOffsetRanges) rdd.rdd()
).offsetRanges();

// some time later, after outputs have
completed
( (CanCommitOffsets) stream.inputDStream()
).commitAsync( offsetRanges
);
}
} );




--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-0-10-Spark-Streaming-2-0-2-tp28153.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org