Hi,

The total partitions are 128 and I can tell its one executor because in the 
consumer list for kafka I see only one thread pulling and in the master spark 
UI I see the executor thread id is showing as 0 and that’s it.

Thanks,
Gabe


From: Jacek Laskowski <ja...@japila.pl>
Date: Friday, December 2, 2016 at 11:47 AM
To: Gabriel Perez <gabr...@adtheorent.com>
Cc: user <user@spark.apache.org>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

How many partitions does the topic have? How do you check how many executors 
read from the topic?

Jacek


On 2 Dec 2016 2:44 p.m., "gabrielperez2484" 
<gabr...@adtheorent.com<mailto:gabr...@adtheorent.com>> wrote:
Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
am running into an issue, where only one executor ("kafka consumer") is
reading from the topic. Which is causing performance to be really poor. I
have tried adding "--num-executors 8" both in the script to execute the jar
and in my java code. Here is the code below. Please let me know if I am
missing something or there is a way to increase the number of consumers to
connect to kafka.


Thanks,
Gabe

<code>
Map<String, Object> kafkaParams = new HashMap<>();
                kafkaParams.put( "bootstrap.servers", "server:9092" );
                kafkaParams.put( "key.deserializer", StringDeserializer.class );
                kafkaParams.put( "value.deserializer", StringDeserializer.class 
);
                kafkaParams.put( "group.id<http://group.id>", 
"spark-aggregation" );
                kafkaParams.put( "auto.offset.reset", "earliest" );
                kafkaParams.put( 
"request.timeout.ms<http://request.timeout.ms>", "305000" );
                kafkaParams.put( 
"heartbeat.interval.ms<http://heartbeat.interval.ms>", "85000" );
                kafkaParams.put( 
"session.timeout.ms<http://session.timeout.ms>", "90000" );

                Collection<String> topics = Arrays.asList( "Topic" );

                SparkConf sparkConf = new SparkConf().setMaster( 
"spark://server:7077" )
                                .setAppName( "aggregation" ).set( 
"spark.submit.deployMode", "cluster" )
                                .set( "spark.executor.instances", "16" );

                JavaStreamingContext javaStreamingContext = new 
JavaStreamingContext(
sparkConf, new Duration( 5000 ) );

                //Creates connect to the Stream.....
                final JavaInputDStream<ConsumerRecord&lt;String, String>> 
stream =
KafkaUtils.createDirectStream(
                                javaStreamingContext, 
LocationStrategies.PreferConsistent(),
                                ConsumerStrategies.<String, String> Subscribe( 
topics, kafkaParams ) );

                //JavaPairDStream<String, String> unifiedStream =
javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
kafkaStreams.size()));

                JavaDStream<String> records = stream.map( new
Function<ConsumerRecord&lt;String, String>, String>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        /**
                         * Pulling key from the stream and creating the 
aggregation key.
                         */
                        public String call( ConsumerRecord<String, String> 
record ) {


                                return record.key();

                        }
                } );

                JavaPairDStream<String, Integer> pairs = records.mapToPair( new
PairFunction<String, String, Integer>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        /**
                         * Creating new tuple to perform calculations on.
                         */
                        public Tuple2<String, Integer> call( String s ) {

                                return new Tuple2<>( s, 1 );
                        }
                } );

                JavaPairDStream<String, Integer> counts = pairs.reduceByKey( new
Function2<Integer, Integer, Integer>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        /**
                         * perform counts...
                         */
                        public Integer call( Integer i1, Integer i2 ) {

                                return i1 + i2;
                        }
                } );

                stream.foreachRDD( new 
VoidFunction<JavaRDD&lt;ConsumerRecord&lt;String,
String>>>() {

                        /**
                        *
                        */
                        private static final long serialVersionUID = 1L;

                        @Override
                        public void call( JavaRDD<ConsumerRecord&lt;String, 
String>> rdd ) {

                                OffsetRange[] offsetRanges = ( 
(HasOffsetRanges) rdd.rdd()
).offsetRanges();

                                // some time later, after outputs have completed
                                ( (CanCommitOffsets) stream.inputDStream() 
).commitAsync( offsetRanges
);
                        }
                } );
</code>



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-0-10-Spark-Streaming-2-0-2-tp28153.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>

Reply via email to