KAFKA-5167 and KAFKA-5152
Would you mind trying out the release and seeing whether the problem
persists?

> 1) Reduced MAX_POLL_RECORDS_CONFIG to 5000 (previously 50000)

Was there a typo? (The two parameters in your email had the same name.)

Is it possible that you were hitting KAFKA-5397?

I have repeated my tests with a slight configuration change. The logs I
captured this time for the "StreamThread" keyword are more relevant than
the logs I shared previously. I started application instances 100, 101
and 102 simultaneously with the configuration below:

1) Reduced MAX_POLL_RECORDS_CONFIG to 5000 (previously 50000)
2) Reduced MAX_POLL_RECORDS_CONFIG to 60000 (previously Integer.MAX_VALUE)
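If item 2 really reused the key from item 1, only one of the two values would take effect: java.util.Properties is a hash map, so a second put() with the same key replaces the first. A minimal sketch of that effect, using the literal key "max.poll.records" (the string behind ConsumerConfig.MAX_POLL_RECORDS_CONFIG) so it runs without the Kafka jar; the class name is hypothetical:

```java
import java.util.Properties;

public class DuplicateKeyDemo {
    // Literal value of ConsumerConfig.MAX_POLL_RECORDS_CONFIG, used so no Kafka jar is needed.
    static final String MAX_POLL_RECORDS = "max.poll.records";

    static Properties buildSettings() {
        Properties settings = new Properties();
        settings.put(MAX_POLL_RECORDS, "5000");  // item 1 as written
        settings.put(MAX_POLL_RECORDS, "60000"); // item 2 as written: same key, so it overwrites item 1
        return settings;
    }

    public static void main(String[] args) {
        Properties settings = buildSettings();
        System.out.println(settings.getProperty(MAX_POLL_RECORDS)); // prints 60000
        System.out.println(settings.size());                        // prints 1
    }
}
```

If the second line was meant to be a different key (for example max.poll.interval.ms), the two settings are independent and both take effect.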
When the application started, all three instances began processing, and
for the first few minutes everything went well. After that, the consumer
in "StreamThread100" started failing: for a while it kept closing and
re-creating consumers, producing exactly the logs I mentioned in my
previous email. After some time, "StreamThread100" stopped processing
messages with the exception below, while the other two instances
continued processing messages without any issues.

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
be completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms,
which typically implies that the poll loop is spending too much time
message processing. You can address this either by increasing the session
timeout or by reducing the maximum size of batches returned in poll()
with max.poll.records.
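The condition the exception describes can be estimated with simple arithmetic: a full batch of max.poll.records records has to be processed before the next poll() call, within max.poll.interval.ms. A rough sketch, assuming a hypothetical cost of 15 ms per record and assuming the 60000 in item 2 above was meant as max.poll.interval.ms. It ignores other work done between polls, such as punctuate() and commits, so the real headroom is smaller:

```java
public class PollBudget {
    /** Estimated time to process one full batch returned by poll(), in milliseconds. */
    static long batchTimeMs(int maxPollRecords, double msPerRecord) {
        return (long) Math.ceil(maxPollRecords * msPerRecord);
    }

    /** True if a full batch is expected to finish before max.poll.interval.ms expires. */
    static boolean fitsInterval(int maxPollRecords, double msPerRecord, long maxPollIntervalMs) {
        return batchTimeMs(maxPollRecords, msPerRecord) < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        double msPerRecord = 15.0;        // hypothetical per-record cost; measure your own
        long maxPollIntervalMs = 60_000L; // assumes item 2 meant max.poll.interval.ms = 60000
        for (int records : new int[] {500, 5000, 10000}) {
            System.out.printf("max.poll.records=%d -> ~%d ms per batch, fits=%b%n",
                    records, batchTimeMs(records, msPerRecord),
                    fitsInterval(records, msPerRecord, maxPollIntervalMs));
        }
    }
}
```

With these assumed numbers, a 500-record batch fits comfortably while a 5000-record batch would not; plugging in a measured per-record time gives a first idea of which batch size the poll loop can sustain.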
I think that, since the consumers kept starting and stopping, there was
no [...] made from the system. Since I reduced MAX_POLL_RECORDS_CONFIG to
60000, the processors were being closed and restarted, which might have
resulted in the CommitFailedException because no processors were
available to do the processing.

After some time the issue propagated to the other servers. I have
attached the relevant logs to this mail. Kindly go through them and let
me know how I can solve this issue.

What should I be looking for in the broker logs? I haven't looked at the
broker side, since my Spark application consuming from the same topic
with a different group id is able to process messages without problems.

I am fairly new to Kafka Streams. I have deployed two Kafka 0.11 brokers
on AWS m3.xlarge instances. I have created a topic with 36 partitions,
and a separate application writes to this topic, producing records at a
rate of 10000 messages per second. I have three AWS m4.xlarge instances
where my Kafka Streams application is running, consuming the messages
produced by the other application. The application starts up fine and
processes messages on the first instance, but when I start the same
application on the other instances it does not start: even though the
process is alive, it is not processing messages. I could also see that
the other instances take a long time to start.
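For a sense of the load each instance carries: 36 partitions over three instances gives 12 partitions each, so roughly a third of the 10000 messages per second, assuming keys are spread evenly across partitions and the group balances partitions evenly. A small sketch of that arithmetic; the class and method names are hypothetical:

```java
public class LoadSplit {
    /** Partitions owned by instance i when partitions are spread as evenly as possible. */
    static int partitionsFor(int instance, int partitions, int instances) {
        int base = partitions / instances;  // every instance gets at least this many
        int extra = partitions % instances; // the first 'extra' instances get one more
        return base + (instance < extra ? 1 : 0);
    }

    public static void main(String[] args) {
        int partitions = 36;
        int instances = 3;
        double inputRate = 10_000.0; // messages per second, from the description above
        for (int i = 0; i < instances; i++) {
            int owned = partitionsFor(i, partitions, instances);
            // Assumes records are distributed evenly across partitions.
            double rate = inputRate * owned / partitions;
            System.out.printf("instance %d: %d partitions, ~%.0f msg/s%n", i, owned, rate);
        }
    }
}
```

While only the first instance is running, it owns all 36 partitions and the full input rate (partitionsFor(0, 36, 1) returns 36), so its per-poll workload drops only once the other instances successfully join the group.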
Apart from the first instance, on the other instances I could see the
consumer getting added and removed repeatedly, and I couldn't see any
message processing at all. I have attached the detailed logs where this
behavior is observed.

On these instances the consumer is started with the log below and then
stopped with the log that follows it (*detailed logs attached*):
INFO  | 21:59:30 | consumer.ConsumerConfig (AbstractConfig.java:223) - ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [l-mykafkainstancekafka5101:9092, l-mykafkainstancekafka5102:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = myKafka-kafkareplica101Sept08
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class mx.july.jmx.proximity.kafka.KafkaKryoCodec
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class my.dev.MessageUpdateCodec
DEBUG | 21:59:30 | consumer.KafkaConsumer (KafkaConsumer.java:1617) - The Kafka consumer has closed.

And then the whole process repeats.
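The values in this dump can be compared mechanically with the settings in the startup code further down. For example, the dump shows max.poll.records = 500, max.poll.interval.ms = 300000 and session.timeout.ms = 10000, while the startup code sets 10000, Integer.MAX_VALUE and 30000; the dump's group.id also differs from the application.id, which Kafka Streams normally uses as the consumer group id. A rough sketch that parses "key = value" lines and lists mismatches. The parsing rule (split on " = ") is an assumption based on the format shown above, only three of the dumped lines are reproduced, and the class name is hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConfigDiff {
    /** Parses "    key = value" lines from a ConsumerConfig log dump; lines without " = " are skipped. */
    static Map<String, String> parse(String dump) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String line : dump.split("\n")) {
            int eq = line.indexOf(" = ");
            if (eq > 0) {
                values.put(line.substring(0, eq).trim(), line.substring(eq + 3).trim());
            }
        }
        return values;
    }

    /** Returns each intended key whose logged value differs (or is missing), with both values. */
    static Map<String, String> mismatches(Map<String, String> logged, Map<String, String> intended) {
        Map<String, String> diff = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : intended.entrySet()) {
            String actual = logged.get(e.getKey());
            if (!e.getValue().equals(actual)) {
                diff.put(e.getKey(), "intended " + e.getValue() + ", logged " + actual);
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        String dump = String.join("\n",
                "    max.poll.interval.ms = 300000",
                "    max.poll.records = 500",
                "    session.timeout.ms = 10000");
        Map<String, String> intended = new LinkedHashMap<>();
        intended.put("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));
        intended.put("max.poll.records", "10000");
        intended.put("session.timeout.ms", "30000");
        mismatches(parse(dump), intended).forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```

Differences like these may simply mean the dump belongs to a different client or configuration than the code shown, so it is worth confirming which consumer logged it before tuning further.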
Below you can find my startup code for Kafka Streams and the parameters
I have configured for starting the Kafka Streams application:

    private static Properties settings = new Properties();
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "mykafkastreamsapplication");
    settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "...");
    settings.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "...000");
    settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    settings.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
    settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
    settings.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "60000");
    KStreamBuilder builder = new KStreamBuilder();
    KafkaStreams streams = new KafkaStreams(builder, settings);
    builder.addSource(.....
           .addProcessor  .............
           .addProcessor  ........
           .addStateStore(............... processor")
           .addSink ..............
           .addSink ..............
    streams.start();
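One quick sanity check on settings like these: the Kafka consumer documentation says heartbeat.interval.ms must be lower than session.timeout.ms and typically no higher than one third of it. The heartbeat value above is truncated, so the sketch below uses a hypothetical 20000 ms. It works on plain java.util.Properties with the literal key strings behind the ConsumerConfig constants, and the class name is made up:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class SettingsCheck {
    /** Returns warnings about the heartbeat/session relationship; both keys must be present. */
    static List<String> check(Properties settings) {
        List<String> warnings = new ArrayList<>();
        long session = Long.parseLong(settings.getProperty("session.timeout.ms").trim());
        long heartbeat = Long.parseLong(settings.getProperty("heartbeat.interval.ms").trim());
        if (heartbeat >= session) {
            warnings.add("heartbeat.interval.ms must be lower than session.timeout.ms");
        } else if (heartbeat * 3 > session) {
            warnings.add("heartbeat.interval.ms is above 1/3 of session.timeout.ms");
        }
        return warnings;
    }

    public static void main(String[] args) {
        Properties settings = new Properties();
        settings.setProperty("session.timeout.ms", "30000");
        settings.setProperty("heartbeat.interval.ms", "20000"); // hypothetical value
        check(settings).forEach(System.out::println);
    }
}
```

With session.timeout.ms = 30000, any heartbeat.interval.ms up to 10000 passes both checks.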
and I am using a simple processor to implement my logic:

    public class InfoProcessor extends AbstractProcessor<Key, Update> {
        private static Logger logger = Logger.getLogger(InfoProcessor.class);
        private ProcessorContext context;
        private KeyValueStore<Key, Info> infoStore;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.context.schedule(Constants.BATCH_DURATION_SECONDS * ...);
            infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
        }

        @Override
        public void process(Key key, Update update) {
            try {
                if (key != null && update != null) {
                    Info info = infoStore.get(key);
                    // merge logic
                    infoStore.put(key, info);
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
            }
            context.commit();
        }

        @Override
        public void punctuate(long timestamp) {
            try {
                KeyValueIterator<Key, Info> iter = this.infoStore.all();
                while (iter.hasNext()) {
                    // processing logic
                }
                iter.close();
                context.commit();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
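In punctuate() above, iter.close() is skipped whenever the processing logic throws, because control jumps straight to the catch block. Kafka Streams' KeyValueIterator extends Closeable, so try-with-resources can close it on every path. The sketch below shows the idiom with a hypothetical CloseableIterator standing in for KeyValueIterator, so it runs without Kafka; note that the loop only advances when next() is called:

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class IteratorCloseDemo {
    /** Hypothetical stand-in for KeyValueIterator: an Iterator that must be closed. */
    interface CloseableIterator<T> extends Iterator<T>, Closeable {
        @Override
        void close(); // narrowed: no checked IOException, so callers need no extra catch
    }

    /** Wraps a list and records whether close() was called. */
    static final class TrackingIterator<T> implements CloseableIterator<T> {
        private final Iterator<T> delegate;
        boolean closed = false;

        TrackingIterator(List<T> items) { this.delegate = items.iterator(); }
        @Override public boolean hasNext() { return delegate.hasNext(); }
        @Override public T next() { return delegate.next(); }
        @Override public void close() { closed = true; }
    }

    /** Punctuate-style scan: the iterator is closed even if the per-entry logic throws. */
    static <T> void scanAll(CloseableIterator<T> iter, Consumer<? super T> perEntry) {
        try (CloseableIterator<T> it = iter) {
            while (it.hasNext()) {
                perEntry.accept(it.next()); // next() is what moves the loop forward
            }
        }
    }

    public static void main(String[] args) {
        TrackingIterator<String> iter = new TrackingIterator<>(Arrays.asList("a", "b"));
        try {
            scanAll(iter, entry -> {
                if (entry.equals("b")) {
                    throw new IllegalStateException("processing failed on " + entry);
                }
            });
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
        System.out.println("closed: " + iter.closed); // prints closed: true
    }
}
```

In the real processor the same shape would be a try (KeyValueIterator<Key, Info> iter = infoStore.all()) { ... } block inside punctuate(), with the existing catch kept around it for logging.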
