Because I saw some posts that say that consumer cache  enabled will have
concurrentModification exception with reduceByKeyAndWIndow. I see those
errors as well after running for sometime with cache being enabled. So, I
had to disable it. Please see the tickets below.  We have 96 partitions. So
if I enable cache, would teh following settings help to improve
performance?

"spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*(96),
"spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*(96),

"spark.streaming.kafka.consumer.poll.ms" -> Integer.*valueOf*(1024),

http://markmail.org/message/n4cdxwurlhf44q5x

https://issues.apache.org/jira/browse/SPARK-19185

On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Why are you setting consumer.cache.enabled to false?
>
> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com> wrote:
> > Hi,
> >
> > What would be the appropriate settings to run Spark with Kafka 10? My job
> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its very
> > slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka 10
> . I
> > see the following error sometimes . Please see the kafka parameters and
> the
> > consumer strategy for creating the stream below. Any suggestions on how
> to
> > run this with better performance would be of great help.
> >
> > java.lang.AssertionError: assertion failed: Failed to get records for
> test
> > stream1 72 324027964 after polling for 120000
> >
> > val kafkaParams = Map[String, Object](
> >       "bootstrap.servers" -> kafkaBrokers,
> >       "key.deserializer" -> classOf[StringDeserializer],
> >       "value.deserializer" -> classOf[StringDeserializer],
> >       "auto.offset.reset" -> "latest",
> >       "heartbeat.interval.ms" -> Integer.valueOf(20000),
> >       "session.timeout.ms" -> Integer.valueOf(60000),
> >       "request.timeout.ms" -> Integer.valueOf(90000),
> >       "enable.auto.commit" -> (false: java.lang.Boolean),
> >       "spark.streaming.kafka.consumer.cache.enabled" -> "false",
> >       "group.id" -> "test1"
> >     )
> >
> >       val hubbleStream = KafkaUtils.createDirectStream[String, String](
> >         ssc,
> >         LocationStrategies.PreferConsistent,
> >         ConsumerStrategies.Subscribe[String, String](topicsSet,
> kafkaParams)
> >       )
> >
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Slower-performance-while-running-
> Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > ---------------------------------------------------------------------
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>

Reply via email to