I had it set up with three nodes, a master and two slaves. Is there anything 
that would tell me it was in local mode? I also added the --deploy-mode cluster 
flag and saw the same results.
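
If it helps, I can also dump what the driver thinks it is running against. A 
minimal check I could add (just a sketch, reading back from the 
javaStreamingContext built in the code further down the thread):

<code>
// Sketch: print the resolved master URL and deploy mode from the driver.
org.apache.spark.SparkContext sc = javaStreamingContext.sparkContext().sc();
System.out.println( "master      = " + sc.master() );
System.out.println( "deploy mode = " + sc.getConf().get( "spark.submit.deployMode", "client" ) );
</code>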

Thanks,
Gabe

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Friday, December 2, 2016 at 12:26 PM
To: Gabriel Perez <gabr...@adtheorent.com>
Cc: Jacek Laskowski <ja...@japila.pl>, user <user@spark.apache.org>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

In this POC of yours, are you running this app with Spark in local mode by any 
chance?


Dr Mich Talebzadeh

LinkedIn: 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 2 December 2016 at 16:54, Gabriel Perez 
<gabr...@adtheorent.com> wrote:
Hi,

The total number of partitions is 128, and I can tell it's one executor because 
in the Kafka consumer list I see only one thread pulling, and in the Spark 
master UI the only executor thread ID showing is 0.
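
One way I could also double-check from the driver itself (a rough sketch, not 
something I have in the job yet; getExecutorMemoryStatus comes from the Scala 
SparkContext, and the count includes the driver's own block manager):

<code>
// Sketch: N executors normally show up as N + 1 entries (driver included).
int registered = javaStreamingContext.sparkContext().sc()
        .getExecutorMemoryStatus().size();
System.out.println( "registered block managers (driver + executors): " + registered );
</code>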

Thanks,
Gabe


From: Jacek Laskowski <ja...@japila.pl>
Date: Friday, December 2, 2016 at 11:47 AM
To: Gabriel Perez <gabr...@adtheorent.com>
Cc: user <user@spark.apache.org>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

How many partitions does the topic have? How do you check how many executors 
read from the topic?

Jacek


On 2 Dec 2016 2:44 p.m., "gabrielperez2484" <gabr...@adtheorent.com> wrote:
Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
am running into an issue where only one executor ("Kafka consumer") is
reading from the topic, which is causing performance to be really poor. I
have tried adding "--num-executors 8" both in the script that executes the jar
and in my Java code, which is below. Please let me know if I am missing
something, or if there is a way to increase the number of consumers
connecting to Kafka.
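
For reference, the submit script is roughly the following (class name and jar 
path are placeholders):

<code>
spark-submit \
  --master spark://server:7077 \
  --deploy-mode cluster \
  --num-executors 8 \
  --class com.example.Aggregation \
  aggregation.jar
</code>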


Thanks,
Gabe

<code>
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

import scala.Tuple2;

// Kafka consumer configuration.
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers", "server:9092" );
kafkaParams.put( "key.deserializer", StringDeserializer.class );
kafkaParams.put( "value.deserializer", StringDeserializer.class );
kafkaParams.put( "group.id", "spark-aggregation" );
kafkaParams.put( "auto.offset.reset", "earliest" );
kafkaParams.put( "request.timeout.ms", "305000" );
kafkaParams.put( "heartbeat.interval.ms", "85000" );
kafkaParams.put( "session.timeout.ms", "90000" );

Collection<String> topics = Arrays.asList( "Topic" );

SparkConf sparkConf = new SparkConf().setMaster( "spark://server:7077" )
        .setAppName( "aggregation" )
        .set( "spark.submit.deployMode", "cluster" )
        .set( "spark.executor.instances", "16" );

JavaStreamingContext javaStreamingContext =
        new JavaStreamingContext( sparkConf, new Duration( 5000 ) );

// Creates the connection to the stream.
final JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
                javaStreamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String> Subscribe( topics, kafkaParams ) );

// JavaPairDStream<String, String> unifiedStream =
//         javaStreamingContext.union( kafkaStreams.get( 0 ),
//                 kafkaStreams.subList( 1, kafkaStreams.size() ) );

JavaDStream<String> records = stream.map(
        new Function<ConsumerRecord<String, String>, String>() {

            private static final long serialVersionUID = 1L;

            /**
             * Pulls the key from the stream and creates the aggregation key.
             */
            @Override
            public String call( ConsumerRecord<String, String> record ) {
                return record.key();
            }
        } );

JavaPairDStream<String, Integer> pairs = records.mapToPair(
        new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            /**
             * Creates a new tuple to perform calculations on.
             */
            @Override
            public Tuple2<String, Integer> call( String s ) {
                return new Tuple2<>( s, 1 );
            }
        } );

JavaPairDStream<String, Integer> counts = pairs.reduceByKey(
        new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            /**
             * Performs the counts.
             */
            @Override
            public Integer call( Integer i1, Integer i2 ) {
                return i1 + i2;
            }
        } );

stream.foreachRDD( new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {

    private static final long serialVersionUID = 1L;

    @Override
    public void call( JavaRDD<ConsumerRecord<String, String>> rdd ) {

        OffsetRange[] offsetRanges = ( (HasOffsetRanges) rdd.rdd() ).offsetRanges();

        // Some time later, after outputs have completed, commit the offsets.
        ( (CanCommitOffsets) stream.inputDStream() ).commitAsync( offsetRanges );
    }
} );
</code>
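
The snippet stops right after wiring up the offset commit; the end of the 
driver follows the usual pattern (sketch only, assuming nothing else runs 
between building the DStreams and starting the context):

<code>
// Sketch of the driver tail (not shown above): an output operation on `counts`
// so the aggregation actually executes, then start the context and block.
counts.print();
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
</code>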



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-0-10-Spark-Streaming-2-0-2-tp28153.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

