Hello Guozhang,

Thanks for the analysis. I figured out the reason for the OOM: it was
actually caused by an in-memory queue in the app itself. I have fixed it,
and right now it all looks good. Sorry for the inconvenience, and thanks for
helping out.

Thanks,
Tony

On Wed, Nov 8, 2017 at 1:20 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Tony,
>
> You mentioned that in 0.11.0.0 you set
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1), while in 0.11.0.1
> you set props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2). But from
> your logs it seems you set this config to 2 in both versions. Right?
>
> Anyway, I took a look at your logs and I think you are hitting a known
> issue (https://issues.apache.org/jira/browse/KAFKA-5152) that has been
> fixed in 0.11.0.1; that is why you only see the WARN log entry in 0.11.0.1
> and the app is not dying. The out-of-memory issue does not seem to be
> related to the CommitFailed error. Do you have any stateful operations in
> your app that use an iterator? Did you close the iterator after you
> finished using it?
>
>
> Guozhang
>
>
> On Tue, Nov 7, 2017 at 12:42 AM, Tony John <tonyjohnant...@gmail.com>
> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for looking into this. I was using version 0.11.0.0 of the library
> > earlier, when I was getting the CommitFailed exception and the tasks were
> > terminating. The application config then was Replication Factor = 2, Num
> > Stream Threads = 1, Consumer Max Poll Records = 1000 & Consumer Max Poll
> > Interval = 2147483647. The streams config code (*Streams Config While
> > Using 0.11.0.0*) is given below, and the logs of the application while
> > using 0.11.0.0 can be downloaded from
> > https://www.dropbox.com/s/hx1e5mknf9gx5z0/commit_failed_error.log?dl=0
> >
> > I have upgraded the libraries to 0.11.0.1 and run into some other issues.
> > Though the CommitFailed error logs are still showing up with 0.11.0.1, the
> > tasks are not getting terminated, but the app quickly runs out of memory
> > (GC overhead limit exceeded) and the CPU is choked, which was not the case
> > earlier. The logs are available at
> > https://www.dropbox.com/s/x6oehtuoqrwjj0i/oom_gc_overhead.log?dl=0 and the
> > application config is also given below (*Streams Config While Using
> > 0.11.0.1*). Since I am not sure which configuration helps reduce the
> > CommitFailed error, which I think could be one of the reasons for the CPU
> > choke and the eventual OOM, I have gone ahead and used all possible
> > configuration parameters, but still no luck.
> >
> > It would be great if you could shed some light on what could be causing
> > this problem.
> >
> > *Streams Config While Using 0.11.0.0*
> >
> > val props = Properties()
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
> > props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
> > props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)
> >
> > streams = KafkaStreams(builder, StreamsConfig(props))
> > streams.start()
> >
> >
> > *Streams Config While Using 0.11.0.1*
> >
> > val props = Properties()
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
> > props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2)
> >
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 10000)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), 10000)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), true)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10000)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000)
> >
> > streams = KafkaStreams(builder, StreamsConfig(props))
> > streams.start()
> >
> >
> > Thanks,
> > Tony
> >
> > On Thu, Nov 2, 2017 at 4:39 PM, Tony John <tonyjohnant...@gmail.com>
> > wrote:
> >
> > > Hi All,
> > >
> > > I am facing CommitFailedException in my streams application. As per the
> > > log, I tried changing max.poll.interval.ms and max.poll.records, but
> > > neither helped. PFA the full stack trace of the exception; below is
> > > the streams configuration used. What else could be wrong?
> > >
> > > val props = Properties()
> > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
> > > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
> > > props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> > > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> > > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
> > > props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
> > > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000)
> > > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)
> > >
> > > streams = KafkaStreams(builder, StreamsConfig(props))
> > > streams.start()
> > >
> > >
> > > Thanks,
> > > Tony
> > >
> >
>
>
>
> --
> -- Guozhang
>
