Re: Kafka Streams : CommitFailedException

2017-11-08 Thread Tony John
Hello Guozhang,

Thanks for the analysis. I figured out the reason for the OOM: it was
actually caused by an in-memory queue in the app itself. I have fixed it,
and right now it all looks good. Sorry for the inconvenience, and thanks for
helping out.

Thanks,
Tony
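
For readers who hit the same OOM: the thread does not show the app's queue, so what follows is only a minimal sketch of one common remedy, a bounded queue that applies back-pressure instead of growing without limit. The class name `BoundedBuffer`, its capacity, and its timeouts are hypothetical and are not taken from Tony's application.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the app's in-memory queue (not from the thread).
// A fixed capacity means a slow consumer makes producers wait or give up,
// instead of letting the heap fill with unprocessed items.
class BoundedBuffer<T : Any>(capacity: Int) {
    private val queue = ArrayBlockingQueue<T>(capacity)

    // Waits up to timeoutMs for free space; returns false if the queue stayed full.
    fun offer(item: T, timeoutMs: Long): Boolean =
        queue.offer(item, timeoutMs, TimeUnit.MILLISECONDS)

    // Returns the next item, or null if nothing arrived within timeoutMs.
    fun poll(timeoutMs: Long): T? =
        queue.poll(timeoutMs, TimeUnit.MILLISECONDS)

    // Number of items currently buffered.
    val size: Int get() = queue.size
}
```

A producer that gets `false` from `offer` can slow down, drop the item, or report an error; which of these is right depends on the application.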


Re: Kafka Streams : CommitFailedException

2017-11-07 Thread Guozhang Wang
Hello Tony,

You mentioned that in 0.11.0.0 you used
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1), while in 0.11.0.1 you
used props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2). But from your
logs it seems you set this config to 2 in both versions. Right?

Anyway, I took a look at your logs and I think you are hitting a known
issue (https://issues.apache.org/jira/browse/KAFKA-5152) that has been
fixed in 0.11.0.1; that is why you only see the WARN log entry in 0.11.0.1
and the app no longer dies. The out-of-memory issue does not seem to be
related to the CommitFailed error. Do you have any stateful operations in
your app that use an iterator? Did you close the iterator after you
finished using it?


Guozhang
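
The question above refers to state store iterators in Kafka Streams (for example the `KeyValueIterator` returned by `ReadOnlyKeyValueStore.all()`), which are `Closeable` and should be closed once iteration is done. The sketch below uses a hypothetical stand-in class, `TrackedIterator`, so that it runs without Kafka on the classpath; the point is Kotlin's `use { }` pattern, which closes the iterator even if the loop body throws.

```kotlin
import java.io.Closeable

// Hypothetical stand-in for a store iterator; this is not a Kafka class.
// It records whether close() was called so the effect is observable.
class TrackedIterator<T>(items: List<T>) : Iterator<T>, Closeable {
    private val delegate = items.iterator()
    var closed = false
        private set

    override fun hasNext(): Boolean = !closed && delegate.hasNext()
    override fun next(): T = delegate.next()
    override fun close() { closed = true }
}

// Sums all values; `use` calls close() on normal exit and on exceptions.
fun sumAll(iterator: TrackedIterator<Int>): Int =
    iterator.use { it.asSequence().sum() }
```

With a real store the same shape would apply, for instance `store.all().use { iter -> ... }`, assuming `store` is a key-value store obtained from the application.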



Re: Kafka Streams : CommitFailedException

2017-11-07 Thread Tony John
Hi Guozhang,

Thanks for looking into this. I was using version 0.11.0.0 of the library
earlier, when I was getting the CommitFailed exception and the tasks were
terminating. The application config then was Replication Factor = 2, Num
Stream Threads = 1, Consumer Max Poll Records = 1000, and Consumer Max Poll
Interval = 2147483647. The streams config code (*Streams Config While Using
0.11.0.0*) is given below, and the application logs from 0.11.0.0 can be
downloaded from
https://www.dropbox.com/s/hx1e5mknf9gx5z0/commit_failed_error.log?dl=0

I have since upgraded the libraries to 0.11.0.1 and ran into some other
issues. The CommitFailed error logs still show up with 0.11.0.1, but the
tasks no longer get terminated. Instead, the app quickly runs out of memory
(GC overhead limit exceeded) and the CPU is choked, which was not the case
earlier. The logs are available at
https://www.dropbox.com/s/x6oehtuoqrwjj0i/oom_gc_overhead.log?dl=0 and the
application config is also given below (*Streams Config While Using
0.11.0.1*). Since I am not sure which configuration helps reduce the
CommitFailed error, which I think could be one of the reasons for the CPU
choke and the eventual OOM, I have gone ahead and tried all the
configuration parameters that seemed relevant, but still no luck.

It would be great if you could shed some light on what could be causing
this problem.

*Streams Config While Using 0.11.0.0*

val props = Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)

streams = KafkaStreams(builder, StreamsConfig(props))
streams.start()


*Streams Config While Using 0.11.0.1*

val props = Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2)

props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 1)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), 1)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), true)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 3)

streams = KafkaStreams(builder, StreamsConfig(props))
streams.start()


Thanks,
Tony

Re: Kafka Streams : CommitFailedException

2017-11-03 Thread Guozhang Wang
Hello Tony,

Which version of Kafka did you use for the Streams library? And could you
share your settings (total number of threads, number of tasks, etc.) and your
exception stack trace, so that we can better help you identify the root
cause of the exception?


Guozhang

Kafka Streams : CommitFailedException

2017-11-02 Thread Tony John
Hi All,

I am facing a CommitFailedException in my streams application. As suggested
in the log, I tried changing max.poll.interval.ms and max.poll.records, but
neither helped. Please find attached the full stack trace of the exception;
the streams configuration used is below. What else could be wrong?

val props = Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)

streams = KafkaStreams(builder, StreamsConfig(props))
streams.start()


Thanks,
Tony
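
For context on the two settings mentioned in this post: a consumer is removed from its group when the time between two `poll()` calls exceeds `max.poll.interval.ms`, and a later commit can then fail with `CommitFailedException`. A rough way to reason about this is to compare the worst-case time needed to process one batch with that interval. The function below is only a back-of-the-envelope sketch, not a Kafka API; `perRecordMs` is a hypothetical measured processing time and does not come from the thread.

```kotlin
// Back-of-the-envelope check, not a Kafka API.
// A batch of up to maxPollRecords records has to be processed before
// max.poll.interval.ms elapses, otherwise the consumer leaves the group.
fun fitsPollInterval(maxPollRecords: Int, perRecordMs: Long, maxPollIntervalMs: Long): Boolean {
    // Worst case: a full batch where every record takes perRecordMs.
    val worstCaseBatchMs = maxPollRecords.toLong() * perRecordMs
    return worstCaseBatchMs < maxPollIntervalMs
}
```

With the values from this post (1000 records and an interval of Int.MAX_VALUE ms, roughly 24.8 days), `fitsPollInterval(1000, 1000L, Int.MAX_VALUE.toLong())` holds even at an assumed one second per record, so these two settings alone are unlikely to be the limiting factor here.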