I had it set up with three nodes: a master and 2 slaves. Is there anything that would tell me it was in local mode? I also added the --deploy-mode cluster flag and saw the same results.
Thanks,
Gabe

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Friday, December 2, 2016 at 12:26 PM
To: Gabriel Perez <gabr...@adtheorent.com>
Cc: Jacek Laskowski <ja...@japila.pl>, user <user@spark.apache.org>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

In this POC of yours, are you running this app with Spark in local mode by any chance?

Dr Mich Talebzadeh
LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On 2 December 2016 at 16:54, Gabriel Perez <gabr...@adtheorent.com> wrote:

Hi,

The topic has 128 partitions in total. I can tell it's one executor because in the consumer list for Kafka I see only one thread pulling, and in the Spark master UI the executor thread id shows as 0 and that’s it.

Thanks,
Gabe

From: Jacek Laskowski <ja...@japila.pl>
Date: Friday, December 2, 2016 at 11:47 AM
To: Gabriel Perez <gabr...@adtheorent.com>
Cc: user <user@spark.apache.org>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

How many partitions does the topic have? How do you check how many executors read from the topic?

Jacek

On 2 Dec 2016 2:44 p.m., "gabrielperez2484" <gabr...@adtheorent.com> wrote:

Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I am running into an issue where only one executor ("kafka consumer") is reading from the topic, which is causing performance to be really poor. I have tried adding "--num-executors 8" both in the script to execute the jar and in my Java code.
Here is the code below. Please let me know if I am missing something or if there is a way to increase the number of consumers that connect to Kafka.

Thanks,
Gabe

<code>
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers", "server:9092" );
kafkaParams.put( "key.deserializer", StringDeserializer.class );
kafkaParams.put( "value.deserializer", StringDeserializer.class );
kafkaParams.put( "group.id", "spark-aggregation" );
kafkaParams.put( "auto.offset.reset", "earliest" );
kafkaParams.put( "request.timeout.ms", "305000" );
kafkaParams.put( "heartbeat.interval.ms", "85000" );
kafkaParams.put( "session.timeout.ms", "90000" );

Collection<String> topics = Arrays.asList( "Topic" );

SparkConf sparkConf = new SparkConf().setMaster( "spark://server:7077" )
        .setAppName( "aggregation" )
        .set( "spark.submit.deployMode", "cluster" )
        .set( "spark.executor.instances", "16" );

JavaStreamingContext javaStreamingContext = new JavaStreamingContext( sparkConf, new Duration( 5000 ) );

// Creates the connection to the stream.
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
        javaStreamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String> Subscribe( topics, kafkaParams ) );

//JavaPairDStream<String, String> unifiedStream = javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));

JavaDStream<String> records = stream.map( new Function<ConsumerRecord<String, String>, String>() {
    private static final long serialVersionUID = 1L;

    /**
     * Pulling key from the stream and creating the aggregation key.
     */
    @Override
    public String call( ConsumerRecord<String, String> record ) {
        return record.key();
    }
} );

JavaPairDStream<String, Integer> pairs = records.mapToPair( new PairFunction<String, String, Integer>() {
    private static final long serialVersionUID = 1L;

    /**
     * Creating new tuple to perform calculations on.
     */
    @Override
    public Tuple2<String, Integer> call( String s ) {
        return new Tuple2<>( s, 1 );
    }
} );

JavaPairDStream<String, Integer> counts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() {
    private static final long serialVersionUID = 1L;

    /**
     * perform counts...
     */
    @Override
    public Integer call( Integer i1, Integer i2 ) {
        return i1 + i2;
    }
} );

stream.foreachRDD( new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void call( JavaRDD<ConsumerRecord<String, String>> rdd ) {
        OffsetRange[] offsetRanges = ( (HasOffsetRanges) rdd.rdd() ).offsetRanges();

        // some time later, after outputs have completed
        ( (CanCommitOffsets) stream.inputDStream() ).commitAsync( offsetRanges );
    }
} );
</code>

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-0-10-Spark-Streaming-2-0-2-tp28153.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
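
On the local-mode question raised in the thread: one rough way to check is to look at the master URL the application actually resolved, for example via sparkConf.get( "spark.master" ) or the Environment tab of the application UI. Spark's local mode uses master URLs of the form local, local[N], local[*] or local[N,F], while a standalone cluster uses spark://host:port. The sketch below is a plain-Java illustration of that classification only; the class MasterUrlCheck and its methods are hypothetical names, not part of Spark. Since the posted code calls setMaster( "spark://server:7077" ) directly, and values set on SparkConf normally take precedence over spark-submit flags, local mode looks unlikely here, though that is an inference from the snippet rather than something confirmed in the thread.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spark: classifies a master URL string.
final class MasterUrlCheck {

    // Matches "local", "local[4]", "local[*]", "local[4,2]" and "local[*,2]".
    private static final Pattern LOCAL_MASTER =
            Pattern.compile("local(\\[(\\*|\\d+)(\\s*,\\s*\\d+)?\\])?");

    private MasterUrlCheck() { }

    /** Returns true when the master URL selects Spark's in-process local mode. */
    static boolean isLocalMaster(String master) {
        return master != null && LOCAL_MASTER.matcher(master.trim()).matches();
    }

    /** Returns true when the master URL points at a standalone cluster master. */
    static boolean isStandaloneMaster(String master) {
        return master != null && master.trim().startsWith("spark://");
    }

    public static void main(String[] args) {
        // In the application the value would come from the resolved configuration,
        // e.g. sparkConf.get("spark.master"); these literals are illustrative only.
        String[] samples = { "local", "local[*]", "local[4,2]", "spark://server:7077" };
        for (String master : samples) {
            System.out.println(master
                    + " -> local: " + isLocalMaster(master)
                    + ", standalone: " + isStandaloneMaster(master));
        }
    }
}
```

The check is deliberately strict: anything that is not one of the local forms, such as yarn or local-cluster[...], is reported as not local rather than guessed at.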