So if you can run with cache enabled for some time, does that
significantly affect the performance issue you were seeing?

Those settings seem reasonable enough. If preferred locations is
behaving correctly, you shouldn't need cached consumers for all 96
partitions on any one executor, so that maxCapacity setting is
probably unnecessary.
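
For anyone following along, here is a minimal sketch of where these
settings would go, assuming the spark-streaming-kafka-0-10 integration.
The spark.streaming.kafka.consumer.* keys are Spark properties read from
the SparkConf, so the sketch sets them there rather than in the
kafkaParams map. The app name is hypothetical, and 1024 is just the value
proposed in the quoted message, not a recommendation.

```scala
import org.apache.spark.SparkConf

// Sketch only: the app name is a placeholder; values come from the thread.
val conf = new SparkConf()
  .setAppName("kafka-direct-example")
  // Consumer caching is on by default; set explicitly here for clarity.
  .set("spark.streaming.kafka.consumer.cache.enabled", "true")
  // Executor-side poll timeout in milliseconds (value proposed in the thread).
  .set("spark.streaming.kafka.consumer.poll.ms", "1024")
// cache.maxCapacity is left at its default, since each executor should only
// need cached consumers for the partitions scheduled on it.
```

Pass conf to the StreamingContext as usual. If the poll timeout is left
unset, I believe it falls back to spark.network.timeout, which would be
consistent with the 120000 in the assertion message below.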

On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
<swethakasire...@gmail.com> wrote:
> Because I saw some posts saying that enabling the consumer cache causes a
> ConcurrentModificationException with reduceByKeyAndWindow. I see those
> errors as well after running for some time with the cache enabled, so I
> had to disable it. Please see the tickets below. We have 96 partitions. So
> if I enable the cache, would the following settings help to improve performance?
>
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
>
> "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>
>
> http://markmail.org/message/n4cdxwurlhf44q5x
>
> https://issues.apache.org/jira/browse/SPARK-19185
>
> On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Why are you setting consumer.cache.enabled to false?
>>
>> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com> wrote:
>> > Hi,
>> >
>> > What would be the appropriate settings to run Spark with Kafka 10? My job
>> > works fine with Spark with Kafka 8 and a Kafka 8 cluster, but it's very
>> > slow with Kafka 10 when using the experimental Kafka Direct APIs for
>> > Kafka 10. I sometimes see the following error. Please see the Kafka
>> > parameters and the consumer strategy for creating the stream below. Any
>> > suggestions on how to run this with better performance would be of great
>> > help.
>> >
>> > java.lang.AssertionError: assertion failed: Failed to get records for test stream1 72 324027964 after polling for 120000
>> >
>> > val kafkaParams = Map[String, Object](
>> >       "bootstrap.servers" -> kafkaBrokers,
>> >       "key.deserializer" -> classOf[StringDeserializer],
>> >       "value.deserializer" -> classOf[StringDeserializer],
>> >       "auto.offset.reset" -> "latest",
>> >       "heartbeat.interval.ms" -> Integer.valueOf(20000),
>> >       "session.timeout.ms" -> Integer.valueOf(60000),
>> >       "request.timeout.ms" -> Integer.valueOf(90000),
>> >       "enable.auto.commit" -> (false: java.lang.Boolean),
>> >       "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>> >       "group.id" -> "test1"
>> >     )
>> >
>> >       val hubbleStream = KafkaUtils.createDirectStream[String, String](
>> >         ssc,
>> >         LocationStrategies.PreferConsistent,
>> >         ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
>> >       )
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>
>
