Hello Guozhang,

Thanks for the analysis. I figured out the reason for the OOM: it was actually caused by an in-memory queue in the app itself. I have fixed it, and everything looks good now. Sorry for the inconvenience, and thanks for helping out.
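The thread does not say how the queue was fixed. One common remedy for this kind of leak is to bound the buffer. A slow consumer then applies backpressure instead of letting the heap grow until the JVM reports "GC overhead limit exceeded". The sketch below assumes the app hands records to a background worker through an in-memory queue. BoundedHandOff, submit and drain are hypothetical names, not part of Kafka Streams.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
import java.util.concurrent.TimeUnit

// HYPOTHETICAL hand-off buffer between stream processing and a background worker.
// An unbounded queue keeps growing whenever the worker falls behind; a fixed
// capacity turns that growth into a visible, bounded wait instead.
class BoundedHandOff<T>(capacity: Int) {
    private val queue: BlockingQueue<T> = ArrayBlockingQueue(capacity)

    // Waits up to timeoutMs for free space; returns false rather than growing the heap.
    fun submit(item: T, timeoutMs: Long): Boolean =
        queue.offer(item, timeoutMs, TimeUnit.MILLISECONDS)

    // Removes up to maxItems queued elements (oldest first) for batch processing.
    fun drain(maxItems: Int): List<T> {
        val batch = ArrayList<T>(maxItems)
        queue.drainTo(batch, maxItems)
        return batch
    }

    fun size(): Int = queue.size
}
```

If submit is called from a stream thread, any time spent waiting there delays the next poll. Keep the timeout well below max.poll.interval.ms.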
Thanks,
Tony

On Wed, Nov 8, 2017 at 1:20 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hello Tony,
>
> You mentioned that in 0.11.0.0 you set
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1), while in 0.11.0.1
> you set props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2).
> But from your logs it seems you set this config to 2 in both versions.
> Right?
>
> Anyway, I took a look at your logs and I think you are hitting a known
> issue (https://issues.apache.org/jira/browse/KAFKA-5152) that has been
> fixed in 0.11.0.1; that is why you only see the WARN log entry in 0.11.0.1
> and the app is no longer dying. The out-of-memory issue does not seem to be
> related to the CommitFailed error. Do you have any stateful operations in
> your app that use an iterator? Did you close the iterator after you
> finished using it?
>
>
> Guozhang
>
>
> On Tue, Nov 7, 2017 at 12:42 AM, Tony John <tonyjohnant...@gmail.com>
> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for looking into this. I was using version 0.11.0.0 of the library
> > earlier, when I was getting the CommitFailed exception and the tasks were
> > terminating. The application config then was Replication Factor = 2, Num
> > Stream Threads = 1, Consumer Max Poll Records = 1000 and Consumer Max Poll
> > Interval = 2147483647. The streams config code (*Streams Config While
> > Using 0.11.0.0*) is given below, and the logs of the application while
> > using 0.11.0.0 can be downloaded from
> > https://www.dropbox.com/s/hx1e5mknf9gx5z0/commit_failed_error.log?dl=0
> >
> > I have upgraded the libraries to 0.11.0.1 and ran into some other issues.
> > Although the CommitFailed error logs still show up with 0.11.0.1, the
> > tasks are no longer getting terminated. However, the app quickly runs out
> > of memory (GC overhead limit exceeded) and the CPU is choked, which was
> > not the case earlier.
> > The logs are available at
> > https://www.dropbox.com/s/x6oehtuoqrwjj0i/oom_gc_overhead.log?dl=0 and the
> > application config is also given below (*Streams Config While Using
> > 0.11.0.1*). I am not sure which configuration helps reduce the
> > CommitFailed error, which I think could be one of the reasons the CPU is
> > choked and could eventually cause the OOM. So I have gone ahead and set
> > every configuration parameter that seemed relevant, but still no luck.
> >
> > It would be great if you could shed some light on what could be
> > causing this problem.
> >
> > *Streams Config While Using 0.11.0.0*
> >
> > val props = Properties()
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
> > props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
> > props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)
> >
> > streams = KafkaStreams(builder, StreamsConfig(props))
> > streams.start()
> >
> >
> > *Streams Config While Using 0.11.0.1*
> >
> > val props = Properties()
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
> > props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 10000)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), 10000)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), true)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10000)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000)
> >
> > streams = KafkaStreams(builder, StreamsConfig(props))
> > streams.start()
> >
> >
> > Thanks,
> > Tony
> >
> > On Thu, Nov 2, 2017 at 4:39 PM, Tony John <tonyjohnant...@gmail.com>
> > wrote:
> >
> > > Hi All,
> > >
> > > I am facing a CommitFailedException in my streams application. As the
> > > log suggested, I tried changing max.poll.interval.ms and
> > > max.poll.records, but neither helped. Please find attached the full
> > > stack trace of the exception. Below is the streams configuration used.
> > > What else could be wrong?
> > >
> > > val props = Properties()
> > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
> > > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
> > > props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> > > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> > > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
> > > props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
> > > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000)
> > > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)
> > >
> > > streams = KafkaStreams(builder, StreamsConfig(props))
> > > streams.start()
> > >
> > >
> > > Thanks,
> > > Tony
>
>
> --
> -- Guozhang
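Guozhang's question about iterators concerns the iterators returned by Kafka Streams state stores. Examples are the KeyValueIterator returned by ReadOnlyKeyValueStore.all() or range(). These are Closeable and hold store resources until they are closed. In Kotlin, wrapping such an iterator in use { } closes it even if processing throws. The sketch stays runnable without Kafka on the classpath by using FakeStoreIterator, a hypothetical stand-in with the same Iterator-plus-Closeable shape. With a real store, the same pattern would read store.all().use { iter -> ... }.

```kotlin
import java.io.Closeable

// HYPOTHETICAL stand-in for Kafka Streams' KeyValueIterator<K, V>, which is
// likewise an Iterator that is also Closeable. Real store iterators keep
// store resources open until close() is called.
class FakeStoreIterator(private val entries: List<Pair<String, Long>>) :
    Iterator<Pair<String, Long>>, Closeable {

    private var index = 0

    // Exposed only so callers can observe that close() ran.
    var closed = false
        private set

    override fun hasNext(): Boolean = !closed && index < entries.size

    override fun next(): Pair<String, Long> {
        if (!hasNext()) throw NoSuchElementException("iterator exhausted or closed")
        return entries[index++]
    }

    override fun close() {
        closed = true
    }
}

// Sums all values and releases the iterator. `use` calls close() in a
// finally block, so the iterator is closed even if the loop throws.
fun sumValues(iterator: FakeStoreIterator): Long =
    iterator.use { iter ->
        var total = 0L
        while (iter.hasNext()) total += iter.next().second
        total
    }
```

The alternative, a manual close() at the end of the loop, is skipped whenever an exception escapes. That is the leak use { } guards against.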