Re: Kafka Streams : CommitFailedException
Hello Guozhang,

Thanks for the analysis. I figured out the reason for the OOM: it was actually caused by an in-memory queue in the app itself. I have fixed it, and right now everything looks good. Sorry for the inconvenience, and thanks for helping out.

Thanks,
Tony

On Wed, Nov 8, 2017 at 1:20 AM, Guozhang Wang wrote:
> [...]
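Tony's reply does not say how the queue was fixed. As a hedged illustration only, one common way to keep an in-app buffer from exhausting the heap is to give it a fixed capacity. When the buffer is full, producers then wait or back off instead of growing it without limit. The sketch below uses `java.util.concurrent.ArrayBlockingQueue`. The class name `BoundedBuffer`, the capacity and the timeouts are hypothetical choices and do not come from the thread.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.TimeUnit

// Hypothetical wrapper around a fixed-capacity queue (not from the original app).
// Memory use is capped at `capacity` elements instead of growing without limit.
class BoundedBuffer<T : Any>(capacity: Int) {
    private val queue = ArrayBlockingQueue<T>(capacity)

    // Waits up to `timeoutMs` for free space. Returns false if the buffer stayed full,
    // so the caller can apply back-pressure (slow down, drop, or retry).
    fun offer(item: T, timeoutMs: Long): Boolean =
        queue.offer(item, timeoutMs, TimeUnit.MILLISECONDS)

    // Removes and returns the oldest element, or null if none arrives within `timeoutMs`.
    fun poll(timeoutMs: Long): T? = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)

    val size: Int get() = queue.size
}
```

The design choice is to make the cost of a slow consumer visible as a failed `offer` at the producer, rather than as a growing heap that only shows up later as "GC overhead limit exceeded".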
Re: Kafka Streams : CommitFailedException
Hello Tony,

You mentioned that in 0.11.0.0 you used props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1), while in 0.11.0.1 you used props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2). But from your logs it seems you set this config to 2 in both versions. Is that right?

Anyway, I took a look at your logs and I think you are hitting a known issue (https://issues.apache.org/jira/browse/KAFKA-5152) that has been fixed in 0.11.0.1; that is why you only see the WARN log entry in 0.11.0.1 and the app no longer dies. The out-of-memory issue does not seem to be related to the CommitFailed error. Do you have any stateful operations in your app that use an iterator? Did you close the iterator after you finished using it?

Guozhang

On Tue, Nov 7, 2017 at 12:42 AM, Tony John wrote:
> [...]
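Guozhang's question is about state-store iterators. In Kafka Streams, `KeyValueIterator` (returned, for example, by `ReadOnlyKeyValueStore.all()` and `range()`) extends `java.io.Closeable`, and its Javadoc asks callers to close it explicitly to release resources. The sketch below is self-contained so it runs without Kafka. `FakeStoreIterator` and `sumValues` are hypothetical stand-ins, not Kafka classes. Kotlin's `use {}` guarantees that `close()` runs even if the loop body throws. With a real store, the same pattern would read `store.all().use { iter -> ... }`.

```kotlin
import java.io.Closeable

// Hypothetical stand-in for a state-store iterator such as Kafka Streams'
// KeyValueIterator, which is both an Iterator and Closeable.
class FakeStoreIterator(private val entries: List<Pair<String, Long>>) :
    Iterator<Pair<String, Long>>, Closeable {
    private var index = 0
    var closed = false
        private set

    override fun hasNext(): Boolean {
        check(!closed) { "iterator already closed" }
        return index < entries.size
    }

    override fun next(): Pair<String, Long> {
        if (!hasNext()) throw NoSuchElementException()
        return entries[index++]
    }

    // In a real store, this is where native/off-heap resources would be released.
    override fun close() {
        closed = true
    }
}

// Sums all values; `use {}` closes the iterator whether the loop completes or throws.
fun sumValues(iter: FakeStoreIterator): Long = iter.use {
    var total = 0L
    while (it.hasNext()) total += it.next().second
    total
}
```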
Re: Kafka Streams : CommitFailedException
Hi Guozhang,

Thanks for looking into this. I was using version 0.11.0.0 of the library earlier, when I was getting the CommitFailed exception and the tasks were terminating. The application config then was Replication Factor = 2, Num Stream Threads = 1, Consumer Max Poll Records = 1000 and Consumer Max Poll Interval = 2147483647. The streams config code (*Streams Config While Using 0.11.0.0*) is given below, and the logs of the application while using 0.11.0.0 can be downloaded from https://www.dropbox.com/s/hx1e5mknf9gx5z0/commit_failed_error.log?dl=0

I have since upgraded the libraries to 0.11.0.1 and ran into some other issues. The CommitFailed error logs still show up with 0.11.0.1, and although the tasks are no longer terminated, the app quickly runs out of memory (GC overhead limit exceeded) and the CPU is choked, which was not the case earlier. The logs are available at https://www.dropbox.com/s/x6oehtuoqrwjj0i/oom_gc_overhead.log?dl=0 and the application config is also given below (*Streams Config While Using 0.11.0.1*). I am not sure which configuration helps reduce the CommitFailed error, which I think could be one of the reasons for the CPU choke and the eventual OOM, so I have gone ahead and tried every configuration parameter I could think of, but still no luck.

It would be great if you could shed some light on what could be causing this problem.
*Streams Config While Using 0.11.0.0*

    val props = Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
    props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
    props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000)
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)

    streams = KafkaStreams(builder, StreamsConfig(props))
    streams.start()

*Streams Config While Using 0.11.0.1*

    val props = Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
    props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2)
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 1)
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), 1)
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), true)
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1)
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 3)

    streams = KafkaStreams(builder, StreamsConfig(props))
    streams.start()

Thanks,
Tony

On Thu, Nov 2, 2017 at 4:39 PM, Tony John wrote:
> [...]
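Several of the 0.11.0.1 timing values above are far below the ranges the consumer and broker normally work with. According to the Kafka consumer documentation, `heartbeat.interval.ms` must be lower than `session.timeout.ms` and is typically set no higher than one third of it. The broker also rejects a group member whose `session.timeout.ms` is below `group.min.session.timeout.ms`, which defaults to 6000 ms. Kafka Streams also manages offset commits itself, so the consumer auto-commit settings are not expected to take effect. The sketch below is a hypothetical pre-flight helper: `checkConsumerTimeouts` is not part of any Kafka API. The 6000 ms floor is an assumption that only holds if the broker default was not changed.

```kotlin
// Hypothetical pre-flight check for consumer timing settings; not a Kafka API.
// Returns human-readable problems (an empty list if none were found).
fun checkConsumerTimeouts(
    sessionTimeoutMs: Int,
    heartbeatIntervalMs: Int,
    brokerMinSessionTimeoutMs: Int = 6000 // assumed broker default for group.min.session.timeout.ms
): List<String> {
    val problems = mutableListOf<String>()
    if (sessionTimeoutMs < brokerMinSessionTimeoutMs) {
        problems += "session.timeout.ms=$sessionTimeoutMs is below " +
            "group.min.session.timeout.ms=$brokerMinSessionTimeoutMs"
    }
    if (heartbeatIntervalMs >= sessionTimeoutMs) {
        problems += "heartbeat.interval.ms=$heartbeatIntervalMs must be lower than " +
            "session.timeout.ms=$sessionTimeoutMs"
    } else if (heartbeatIntervalMs.toLong() * 3 > sessionTimeoutMs) {
        // Documented guideline: heartbeat interval no higher than 1/3 of the session timeout.
        problems += "heartbeat.interval.ms=$heartbeatIntervalMs is more than 1/3 of " +
            "session.timeout.ms=$sessionTimeoutMs"
    }
    return problems
}
```

For the 0.11.0.1 config above, `checkConsumerTimeouts(3, 1)` reports that a 3 ms session timeout is below the assumed 6000 ms floor.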
Re: Kafka Streams : CommitFailedException
Hello Tony,

Which version of Kafka did you use for the Streams library? And could you share your settings (total number of threads, number of tasks, etc.) and your exception stack trace, so that we can better help you identify the root cause of the exception?

Guozhang

On Thu, Nov 2, 2017 at 4:09 AM, Tony John wrote:
> [...]
Kafka Streams : CommitFailedException
Hi All,

I am facing a CommitFailedException in my streams application. As suggested in the log, I tried changing max.poll.interval.ms and max.poll.records, but neither helped. PFA the full stack trace of the exception; below is the streams configuration used. What else could be wrong?

    val props = Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
    props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
    props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000)
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)

    streams = KafkaStreams(builder, StreamsConfig(props))
    streams.start()

Thanks,
Tony
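The exception's log message points at these two settings for a specific reason. If the gap between two `poll()` calls exceeds `max.poll.interval.ms`, the consumer is considered failed and its partitions are reassigned, which makes the next commit fail. A rough budget check helps here. If each record takes up to `perRecordMs` to process, a full batch of `max.poll.records` needs about `maxPollRecords * perRecordMs` milliseconds. The sketch below is hypothetical. The function names and the 0.5 safety margin are illustrative choices, and `perRecordMs` is an assumption the reader would have to measure.

```kotlin
// Hypothetical helper: does one full poll batch fit within max.poll.interval.ms?
// perRecordMs is an assumed worst-case processing time per record (measure it).
fun batchFitsPollInterval(
    maxPollRecords: Int,
    perRecordMs: Long,
    maxPollIntervalMs: Long,
    safetyFactor: Double = 0.5 // use only half the interval, an arbitrary margin
): Boolean {
    val worstCaseBatchMs = maxPollRecords.toLong() * perRecordMs
    return worstCaseBatchMs.toDouble() <= maxPollIntervalMs * safetyFactor
}

// Largest max.poll.records that keeps a full batch within the same margin (at least 1).
fun maxRecordsFor(perRecordMs: Long, maxPollIntervalMs: Long, safetyFactor: Double = 0.5): Int {
    require(perRecordMs > 0) { "perRecordMs must be positive" }
    val budgetMs = (maxPollIntervalMs * safetyFactor).toLong()
    return (budgetMs / perRecordMs).coerceIn(1L, Int.MAX_VALUE.toLong()).toInt()
}
```

With max.poll.interval.ms set to Int.MAX_VALUE (about 24.8 days), as in the config above, this check passes for any realistic batch. Tuning these two settings alone would therefore not be expected to remove the exception.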