We actually ended up reverting to 0.9.0 in my testing environment, because we found other products weren't ready for 0.10 either, so I am not able to create those screenshots. Hopefully I won't see the same issue with 0.9.0. Thank you for your help, though.
Thanks,
Gabe

From: Jacek Laskowski <ja...@japila.pl>
Date: Friday, December 2, 2016 at 12:21 PM
To: Gabriel Perez <gabr...@adtheorent.com>
Cc: user <user@spark.apache.org>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

Can you post the screenshot of the Executors and Streaming tabs?

Jacek

On 2 Dec 2016 5:54 p.m., "Gabriel Perez" <gabr...@adtheorent.com> wrote:

Hi,

The topic has 128 partitions in total, and I can tell it's one executor because in the Kafka consumer list I see only one thread pulling, and in the master Spark UI the only executor thread ID showing is 0.

Thanks,
Gabe

From: Jacek Laskowski <ja...@japila.pl>
Date: Friday, December 2, 2016 at 11:47 AM
To: Gabriel Perez <gabr...@adtheorent.com>
Cc: user <user@spark.apache.org>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

How many partitions does the topic have? How do you check how many executors read from the topic?

Jacek

On 2 Dec 2016 2:44 p.m., "gabrielperez2484" <gabr...@adtheorent.com> wrote:

Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I am running into an issue where only one executor ("Kafka consumer") is reading from the topic, which is causing performance to be really poor. I have tried adding "--num-executors 8" both in the script that submits the jar and in my Java code. The code is below. Please let me know if I am missing something, or if there is a way to increase the number of consumers connecting to Kafka.

Thanks,
Gabe

<code>
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers", "server:9092" );
kafkaParams.put( "key.deserializer", StringDeserializer.class );
kafkaParams.put( "value.deserializer", StringDeserializer.class );
kafkaParams.put( "group.id", "spark-aggregation" );
kafkaParams.put( "auto.offset.reset", "earliest" );
kafkaParams.put( "request.timeout.ms", "305000" );
// Note: Kafka requires heartbeat.interval.ms to be lower than session.timeout.ms.
kafkaParams.put( "heartbeat.interval.ms", "85000" );
kafkaParams.put( "session.timeout.ms", "90000" );

Collection<String> topics = Arrays.asList( "Topic" );

SparkConf sparkConf = new SparkConf().setMaster( "spark://server:7077" )
        .setAppName( "aggregation" )
        .set( "spark.submit.deployMode", "cluster" )
        .set( "spark.executor.instances", "16" );

JavaStreamingContext javaStreamingContext =
        new JavaStreamingContext( sparkConf, new Duration( 5000 ) );

// Creates the connection to the stream.
final JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream( javaStreamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe( topics, kafkaParams ) );

// JavaPairDStream<String, String> unifiedStream = javaStreamingContext.union( kafkaStreams.get( 0 ), kafkaStreams.subList( 1, kafkaStreams.size() ) );

JavaDStream<String> records = stream.map( new Function<ConsumerRecord<String, String>, String>() {

    private static final long serialVersionUID = 1L;

    /**
     * Pulls the key from the stream to create the aggregation key.
     */
    @Override
    public String call( ConsumerRecord<String, String> record ) {
        return record.key();
    }
} );

JavaPairDStream<String, Integer> pairs = records.mapToPair( new PairFunction<String, String, Integer>() {

    private static final long serialVersionUID = 1L;

    /**
     * Creates a new tuple to perform calculations on.
     */
    @Override
    public Tuple2<String, Integer> call( String s ) {
        return new Tuple2<>( s, 1 );
    }
} );

JavaPairDStream<String, Integer> counts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() {

    private static final long serialVersionUID = 1L;

    /**
     * Performs the counts.
     */
    @Override
    public Integer call( Integer i1, Integer i2 ) {
        return i1 + i2;
    }
} );

stream.foreachRDD( new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {

    private static final long serialVersionUID = 1L;

    @Override
    public void call( JavaRDD<ConsumerRecord<String, String>> rdd ) {
        OffsetRange[] offsetRanges = ( (HasOffsetRanges) rdd.rdd() ).offsetRanges();
        // Some time later, after outputs have completed, commit the offsets back to Kafka.
        ( (CanCommitOffsets) stream.inputDStream() ).commitAsync( offsetRanges );
    }
} );
</code>
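As a starting point for Jacek's question about checking how many executors read from the topic: a minimal sketch along the following lines (illustrative only, reusing the stream and the spark-streaming-kafka-0-10 API from the code above; the println logging is a stand-in for whatever reporting you prefer) prints how many Kafka partitions each micro-batch covers and which offsets each one reads.

<code>
// The direct stream creates one RDD partition per Kafka topic-partition,
// so a 128-partition topic should report 128 partitions per batch.
stream.foreachRDD( new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {

    private static final long serialVersionUID = 1L;

    @Override
    public void call( JavaRDD<ConsumerRecord<String, String>> rdd ) {
        System.out.println( "RDD partitions in batch: " + rdd.getNumPartitions() );

        OffsetRange[] ranges = ( (HasOffsetRanges) rdd.rdd() ).offsetRanges();
        for ( OffsetRange range : ranges ) {
            System.out.println( range.topic() + " partition " + range.partition()
                    + ": offsets " + range.fromOffset() + " to " + range.untilOffset() );
        }
    }
} );
</code>

If every batch covers all 128 partitions but the Executors tab still shows a single active executor, that would suggest the bottleneck is in executor allocation rather than in the Kafka consumer configuration.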