I saw some posts reporting that reduceByKeyAndWindow throws a ConcurrentModificationException when the consumer cache is enabled, and I see those errors as well after running for some time with the cache enabled, so I had to disable it. Please see the tickets below. We have 96 partitions. If I re-enable the cache, would the following settings help improve performance?
"spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*(96), "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*(96), "spark.streaming.kafka.consumer.poll.ms" -> Integer.*valueOf*(1024), http://markmail.org/message/n4cdxwurlhf44q5x https://issues.apache.org/jira/browse/SPARK-19185 On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org> wrote: > Why are you setting consumer.cache.enabled to false? > > On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com> wrote: > > Hi, > > > > What would be the appropriate settings to run Spark with Kafka 10? My job > > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its very > > slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka 10 > . I > > see the following error sometimes . Please see the kafka parameters and > the > > consumer strategy for creating the stream below. Any suggestions on how > to > > run this with better performance would be of great help. > > > > java.lang.AssertionError: assertion failed: Failed to get records for > test > > stream1 72 324027964 after polling for 120000 > > > > val kafkaParams = Map[String, Object]( > > "bootstrap.servers" -> kafkaBrokers, > > "key.deserializer" -> classOf[StringDeserializer], > > "value.deserializer" -> classOf[StringDeserializer], > > "auto.offset.reset" -> "latest", > > "heartbeat.interval.ms" -> Integer.valueOf(20000), > > "session.timeout.ms" -> Integer.valueOf(60000), > > "request.timeout.ms" -> Integer.valueOf(90000), > > "enable.auto.commit" -> (false: java.lang.Boolean), > > "spark.streaming.kafka.consumer.cache.enabled" -> "false", > > "group.id" -> "test1" > > ) > > > > val hubbleStream = KafkaUtils.createDirectStream[String, String]( > > ssc, > > LocationStrategies.PreferConsistent, > > ConsumerStrategies.Subscribe[String, String](topicsSet, > kafkaParams) > > ) > > > > > > > > > > > > -- > > View this message in context: http://apache-spark-user-list. 
> 1001560.n3.nabble.com/Slower-performance-while-running- > Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html > > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > > > --------------------------------------------------------------------- > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > >
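For reference, here is a minimal sketch of how the cache-related settings above could be merged into the existing kafkaParams map, with the cache left enabled. This is only a sketch under the assumptions from this thread (96 partitions, hence a cache capacity of 96, and a 1024 ms poll timeout); the right values depend on how many partitions each executor actually hosts, and whether the ConcurrentModificationException from SPARK-19185 reappears.

```scala
// Sketch only: cache tuning assumed from the 96-partition setup in this thread.
// These keys are Spark-level settings that the Kafka 0.10 direct stream reads
// out of the Kafka parameter map, alongside the regular consumer configs.
val cacheParams: Map[String, Object] = Map(
  // Keep the executor-side consumer cache on (it is on by default).
  "spark.streaming.kafka.consumer.cache.enabled" -> "true",
  // Upper bound on cached KafkaConsumer instances per executor. Sized here
  // to the total partition count (96) so even a single executor hosting all
  // partitions would not evict and recreate consumers.
  "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
  // How long an executor-side poll() waits for records; raising this can
  // help with "Failed to get records ... after polling" assertion errors.
  "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024)
)

// Merged with the kafkaParams from the original post (right-hand side wins),
// this overrides the earlier cache.enabled = "false" entry.
val kafkaParams: Map[String, Object] = Map[String, Object](
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "spark.streaming.kafka.consumer.cache.enabled" -> "false",
  "group.id" -> "test1"
) ++ cacheParams

assert(kafkaParams("spark.streaming.kafka.consumer.cache.enabled") == "true")
assert(kafkaParams("spark.streaming.kafka.consumer.cache.maxCapacity") == Integer.valueOf(96))
```

The merged map would then be passed to ConsumerStrategies.Subscribe exactly as in the original createDirectStream call.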