Hi Cody,

Below is how I am consuming data for a 60-second batch. Do you see anything wrong with the way the data is being consumed that could cause slow performance?
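For context, the streaming context itself is created with a 60-second batch interval, roughly like this (a minimal sketch; the app name is a placeholder and sparkConf stands in for my actual configuration):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("KafkaDirectStream") // placeholder name
// 60-second batches, matching the consumption code below
val ssc = new StreamingContext(sparkConf, Seconds(60))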
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> kafkaBrokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "latest",
  "heartbeat.interval.ms" -> Integer.valueOf(20000),
  "session.timeout.ms" -> Integer.valueOf(60000),
  "request.timeout.ms" -> Integer.valueOf(90000),
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "spark.streaming.kafka.consumer.cache.enabled" -> "false",
  "group.id" -> "test1"
)

val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
)

val kafkaStreamRdd = kafkaStream.transform { rdd =>
  rdd.map(consumerRecord => (consumerRecord.key(), consumerRecord.value()))
}
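One thing I was not sure about: as far as I can tell, the spark.streaming.kafka.consumer.* entries are Spark properties read from the SparkConf rather than Kafka consumer configs, so passing them inside kafkaParams may have no effect. A minimal sketch of setting them on the SparkConf instead (values taken from this thread; the app name is a placeholder):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("KafkaDirectStream") // placeholder
  // disable Spark's cached Kafka consumers, as attempted in kafkaParams above
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")
// if the cache stays enabled, the related knobs are set the same way:
// conf.set("spark.streaming.kafka.consumer.cache.maxCapacity", "96")
// conf.set("spark.streaming.kafka.consumer.poll.ms", "1024")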
On Mon, Aug 28, 2017 at 11:56 AM, swetha kasireddy <swethakasire...@gmail.com> wrote:

> There is no difference in performance even with the cache being enabled.
>
> On Mon, Aug 28, 2017 at 11:27 AM, swetha kasireddy <swethakasire...@gmail.com> wrote:
>
>> There is no difference in performance even with the cache being disabled.
>>
>> On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> So if you can run with the cache enabled for some time, does that
>>> significantly affect the performance issue you were seeing?
>>>
>>> Those settings seem reasonable enough. If preferred locations is
>>> behaving correctly you shouldn't need cached consumers for all 96
>>> partitions on any one executor, so that maxCapacity setting is
>>> probably unnecessary.
>>>
>>> On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy <swethakasire...@gmail.com> wrote:
>>> > Because I saw some posts saying that enabling the consumer cache can
>>> > cause a ConcurrentModificationException with reduceByKeyAndWindow. I
>>> > see those errors as well after running for some time with the cache
>>> > enabled, so I had to disable it. Please see the tickets below. We have
>>> > 96 partitions. So if I enable the cache, would the following settings
>>> > help to improve performance?
>>> >
>>> > "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
>>> > "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>>> >
>>> > http://markmail.org/message/n4cdxwurlhf44q5x
>>> > https://issues.apache.org/jira/browse/SPARK-19185
>>> >
>>> > On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>> >>
>>> >> Why are you setting consumer.cache.enabled to false?
>>> >>
>>> >> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com> wrote:
>>> >> > Hi,
>>> >> >
>>> >> > What would be the appropriate settings to run Spark with Kafka 0.10?
>>> >> > My job works fine with the Spark Kafka 0.8 integration and a Kafka
>>> >> > 0.8 cluster, but it is very slow with a Kafka 0.10 cluster using the
>>> >> > experimental Kafka direct APIs for Kafka 0.10. I sometimes see the
>>> >> > following error. Please see the Kafka parameters and the consumer
>>> >> > strategy for creating the stream below. Any suggestions on how to
>>> >> > run this with better performance would be of great help.
>>> >> >
>>> >> > java.lang.AssertionError: assertion failed: Failed to get records for
>>> >> > test stream1 72 324027964 after polling for 120000
>>> >> >
>>> >> > val kafkaParams = Map[String, Object](
>>> >> >   "bootstrap.servers" -> kafkaBrokers,
>>> >> >   "key.deserializer" -> classOf[StringDeserializer],
>>> >> >   "value.deserializer" -> classOf[StringDeserializer],
>>> >> >   "auto.offset.reset" -> "latest",
>>> >> >   "heartbeat.interval.ms" -> Integer.valueOf(20000),
>>> >> >   "session.timeout.ms" -> Integer.valueOf(60000),
>>> >> >   "request.timeout.ms" -> Integer.valueOf(90000),
>>> >> >   "enable.auto.commit" -> (false: java.lang.Boolean),
>>> >> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>>> >> >   "group.id" -> "test1"
>>> >> > )
>>> >> >
>>> >> > val hubbleStream = KafkaUtils.createDirectStream[String, String](
>>> >> >   ssc,
>>> >> >   LocationStrategies.PreferConsistent,
>>> >> >   ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
>>> >> > )
>>> >> >
>>> >> > --
>>> >> > View this message in context:
>>> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
>>> >> > Sent from the Apache Spark User List mailing list archive at Nabble.com.