Hi Cody,

Below is how I am consuming data with a 60 second batch interval. Do you
see anything wrong with the way the data is being consumed that could be
causing the slow performance?


val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> kafkaBrokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "heartbeat.interval.ms" -> Integer.valueOf(20000),
      "session.timeout.ms" -> Integer.valueOf(60000),
      "request.timeout.ms" -> Integer.valueOf(90000),
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "spark.streaming.kafka.consumer.cache.enabled" -> "false",
      "group.id" -> "test1"
    )

      val hubbleStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
      )

val kafkaStreamRdd = hubbleStream.transform { rdd =>
  rdd.map(consumerRecord => (consumerRecord.key(), consumerRecord.value()))
}
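
For completeness, here is a sketch of how the rest of the job is wired up
around the stream above. It is not the exact production code (the
application name and the processing/commit steps are placeholders), but it
shows the 60 second batch interval and manual offset commits, since
enable.auto.commit is false:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// 60 second batch interval; the application name is a placeholder
val sparkConf = new SparkConf().setAppName("kafka-direct-stream")
val ssc = new StreamingContext(sparkConf, Seconds(60))

// kafkaParams, topicsSet and hubbleStream are created as shown above

hubbleStream.foreachRDD { rdd =>
  // capture the offset ranges before any transformation changes the partitioning
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // placeholder processing step: extract (key, value) pairs and act on the batch
  rdd.map(record => (record.key(), record.value())).count()

  // with enable.auto.commit=false, commit the offsets back to Kafka
  // once the batch has been processed
  hubbleStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ssc.start()
ssc.awaitTermination()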

On Mon, Aug 28, 2017 at 11:56 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> There is no difference in performance even with the cache enabled.
>
> On Mon, Aug 28, 2017 at 11:27 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> There is no difference in performance even with the cache disabled.
>>
>> On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> So if you can run with cache enabled for some time, does that
>>> significantly affect the performance issue you were seeing?
>>>
>>> Those settings seem reasonable enough. If preferred locations are
>>> behaving correctly you shouldn't need cached consumers for all 96
>>> partitions on any one executor, so that maxCapacity setting is
>>> probably unnecessary.
>>>
>>> On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
>>> <swethakasire...@gmail.com> wrote:
>>> > Because I saw some posts saying that with the consumer cache enabled
>>> > you can get a ConcurrentModificationException with
>>> > reduceByKeyAndWindow. I see those errors as well after running for
>>> > some time with the cache enabled, so I had to disable it. Please see
>>> > the tickets below. We have 96 partitions. So if I enable the cache,
>>> > would the following settings help to improve performance?
>>> >
>>> > "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
>>> >
>>> > "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>>> >
>>> >
>>> > http://markmail.org/message/n4cdxwurlhf44q5x
>>> >
>>> > https://issues.apache.org/jira/browse/SPARK-19185
>>> >
>>> > On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>> >>
>>> >> Why are you setting consumer.cache.enabled to false?
>>> >>
>>> >> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com>
>>> wrote:
>>> >> > Hi,
>>> >> >
>>> >> > What would be the appropriate settings to run Spark with Kafka 10?
>>> >> > My job works fine with Spark using the Kafka 8 integration against
>>> >> > a Kafka 8 cluster, but it is very slow with Kafka 10 using the
>>> >> > experimental Kafka Direct API for Kafka 10. I see the following
>>> >> > error sometimes. Please see the Kafka parameters and the consumer
>>> >> > strategy for creating the stream below. Any suggestions on how to
>>> >> > run this with better performance would be of great help.
>>> >> >
>>> >> > java.lang.AssertionError: assertion failed: Failed to get records
>>> >> > for test stream1 72 324027964 after polling for 120000
>>> >> >
>>> >> > val kafkaParams = Map[String, Object](
>>> >> >       "bootstrap.servers" -> kafkaBrokers,
>>> >> >       "key.deserializer" -> classOf[StringDeserializer],
>>> >> >       "value.deserializer" -> classOf[StringDeserializer],
>>> >> >       "auto.offset.reset" -> "latest",
>>> >> >       "heartbeat.interval.ms" -> Integer.valueOf(20000),
>>> >> >       "session.timeout.ms" -> Integer.valueOf(60000),
>>> >> >       "request.timeout.ms" -> Integer.valueOf(90000),
>>> >> >       "enable.auto.commit" -> (false: java.lang.Boolean),
>>> >> >       "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>>> >> >       "group.id" -> "test1"
>>> >> >     )
>>> >> >
>>> >> >       val hubbleStream = KafkaUtils.createDirectStream[String, String](
>>> >> >         ssc,
>>> >> >         LocationStrategies.PreferConsistent,
>>> >> >         ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
>>> >> >       )
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >
>>> >
>>>
>>
>>
>
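
One more note on the cache settings discussed above: as far as I can tell,
the spark.streaming.kafka.* properties (consumer.cache.enabled,
consumer.cache.maxCapacity, consumer.poll.ms) are read from the Spark
configuration by the 0-10 integration rather than from the Kafka consumer
properties. A sketch of how they could be set on the SparkConf instead of
in kafkaParams (the values are just the ones mentioned in the thread):

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("kafka-direct-stream")  // placeholder application name
  // disable the cached Kafka consumers (the setting currently placed in kafkaParams)
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")
  // or, if the cache is left enabled, the capacity and poll timeout from the thread:
  // .set("spark.streaming.kafka.consumer.cache.maxCapacity", "96")
  // .set("spark.streaming.kafka.consumer.poll.ms", "1024")

The same settings can equivalently be passed to spark-submit with --conf.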
