I don't see anything obvious. If the slowness is correlated with the
errors you're seeing, I'd start looking at what's going on with kafka or
your network.
On Mon, Aug 28, 2017 at 7:06 PM, swetha kasireddy wrote:
> Hi Cody,
>
> Following is the way that I am consuming data for a 60 second batch
Hi Cody,
Following is the way that I am consuming data for a 60 second batch. Do you
see anything that is wrong with the way the data is getting consumed that
can cause slowness in performance?
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> kafkaBrokers,
"key.deseriali
There is no difference in performance even with Cache being enabled.
On Mon, Aug 28, 2017 at 11:27 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:
> There is no difference in performance even with Cache being disabled.
>
> On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger
> wrote:
>
>> So
auto.offset.reset" -> "latest",
> >> > "heartbeat.interval.ms" -> Integer.valueOf(2),
> >> > "session.timeout.ms" -> Integer.valueOf(6),
> >> > "request.timeout.ms" -> Integer.valueOf(9),
> >&
teger.valueOf(2),
>> > "session.timeout.ms" -> Integer.valueOf(6),
>> > "request.timeout.ms" -> Integer.valueOf(9),
>> > "enable.auto.commit" -> (false: java.lang.Boolean),
>> > "spark.streaming.kafka.consumer.cache.enabled&quo
gt; "request.timeout.ms" -> Integer.valueOf(9),
>> > "enable.auto.commit" -> (false: java.lang.Boolean),
>> > "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>> > "group.id" -> "test1
roup.id" -> "test1"
> > )
> >
> > val hubbleStream = KafkaUtils.createDirectStream[String, String](
> > ssc,
> > LocationStrategies.PreferConsistent,
> > ConsumerStrategies.Subscribe[String, String](topicsSet,
>
est1"
> )
>
> val hubbleStream = KafkaUtils.createDirectStream[String, String](
> ssc,
> LocationStrategies.PreferConsistent,
> ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
> )
>
>
>
&g
nabled" -> "false",
"group.id" -> "test1"
)
val hubbleStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)