Hi all,

@Bill,

I will reduce the MAX_POLL_RECORDS to 500/1000 and I will share the results
shortly.
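
For reference, a minimal sketch of what that change could look like, using plain java.util.Properties with the string keys that appear in the ConsumerConfig dump further down this thread (max.poll.records, max.poll.interval.ms). The class and method names are hypothetical, and the 60000 ms interval is simply the value mentioned later in this mail. In the real application the resulting entries would be merged into the settings passed to KafkaStreams.

```java
import java.util.Properties;

// Hypothetical helper, not part of the application discussed in this thread.
final class PollSettings {

    // Builds consumer overrides with a bounded batch size and poll interval.
    static Properties withMaxPollRecords(int maxPollRecords, int maxPollIntervalMs) {
        if (maxPollRecords <= 0 || maxPollIntervalMs <= 0) {
            throw new IllegalArgumentException("values must be positive");
        }
        Properties props = new Properties();
        // String keys as printed in the ConsumerConfig log output.
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        props.setProperty("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // 500 is the lower of the two values Bill suggested.
        Properties props = withMaxPollRecords(500, 60_000);
        props.list(System.out);
    }
}
```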

@Ted,

Yes, I reduced MAX_POLL_RECORDS_CONFIG from 50000 to 5000. It was not a
typo. Do you think 50000 is way too high for a Kafka Streams application?
The Spark application which I am trying to replace with Kafka Streams
was processing 250000 messages per 5 second batch. That was the reason I
set MAX_POLL_RECORDS_CONFIG to 50000.
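
To put these numbers side by side, here is a small back-of-the-envelope sketch. The Spark figure (250000 messages per 5 second batch) is from above and the producer rate (10000 messages per second) is from my first mail; the class and method names are made up for illustration. It only shows how many poll() calls per second a given max.poll.records implies at a given message rate, assuming every poll returns a full batch. It says nothing about how fast the processor actually is.

```java
// Hypothetical helper for rough sizing, not part of the application.
final class PollRateEstimate {

    // Messages per second implied by a batch of `messages` every `batchSeconds`.
    static long messagesPerSecond(long messages, long batchSeconds) {
        return messages / batchSeconds;
    }

    // poll() calls per second needed to keep up (rounded up),
    // assuming every poll returns a full batch.
    static long pollsPerSecond(long messagesPerSecond, long maxPollRecords) {
        return (messagesPerSecond + maxPollRecords - 1) / maxPollRecords;
    }

    public static void main(String[] args) {
        long sparkRate = messagesPerSecond(250_000, 5); // Spark batch figure from this mail
        long producerRate = 10_000;                     // producer rate from the first mail
        for (long maxPollRecords : new long[] {500, 1_000, 5_000, 50_000}) {
            System.out.printf(
                "max.poll.records=%d -> %d polls/s at %d msg/s, %d polls/s at %d msg/s%n",
                maxPollRecords,
                pollsPerSecond(producerRate, maxPollRecords), producerRate,
                pollsPerSecond(sparkRate, maxPollRecords), sparkRate);
        }
    }
}
```

At 10000 messages per second, max.poll.records=500 works out to about 20 polls per second for the whole group, so a smaller batch size by itself should not necessarily cap throughput.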

I don't think I have hit KAFKA-5397, since I couldn't find any instance of
"org.apache.kafka.streams.errors.LockException"
in my logs. I could see the exception below, but these exceptions are
triggered long after the application stopped consuming any messages.

StreamThread100.log:org.apache.kafka.streams.errors.StreamsException:
stream-thread failed to suspend stream tasks
User provided listener
org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
for group myKafka-kafkareplica101Sept08 failed on partition revocation
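
A check like this can also be scripted. The following is only a minimal sketch: the class name is hypothetical and the sample lines stand in for the real StreamThread log files. It counts the lines that mention a given exception class name.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: counts log lines that mention a given exception name.
final class LogScan {

    static long countMentions(List<String> lines, String exceptionName) {
        return lines.stream().filter(line -> line.contains(exceptionName)).count();
    }

    public static void main(String[] args) {
        // Sample lines only; the first one is taken from the excerpt above.
        List<String> lines = Arrays.asList(
            "org.apache.kafka.streams.errors.StreamsException: stream-thread failed to suspend stream tasks",
            "org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed");
        System.out.println("LockException lines: "
            + countMentions(lines, "org.apache.kafka.streams.errors.LockException"));
        System.out.println("StreamsException lines: "
            + countMentions(lines, "org.apache.kafka.streams.errors.StreamsException"));
    }
}
```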

@Damian, I figured out the pattern below; I don't know whether it helps.
Did the StreamThread logs which I shared help?

I couldn't figure out the reason why the consumers are getting closed while
they are being assigned and then suddenly stop without any reason. If you
look at the pattern:

 1) The consumer is created with the config values supplied by the
    application.
 2) Adding sensors
 3) Fetching API versions and initiating connections to the Kafka brokers
    (NetworkClient.java)
 4) Sending metadata request and recording API versions from the broker
    (NetworkClient.java)
 5) Updated cluster metadata version (some incremental version) to Cluster
 6) Discovered coordinator (one of the Kafka brokers)
 7) Initiating connection to coordinator (one of the Kafka brokers)
 8) Fetching committed offsets for partitions (ConsumerCoordinator.java:826)
 9) Recorded API versions for node
10) Resetting offset for partition (for all the partitions)
11) Handling ListOffsetResponse (for all the partitions)
12) Removing sensors (I couldn't see any exceptions though)
13) consumer.KafkaConsumer (KafkaConsumer.java:1617) - The Kafka consumer
    has closed. (I couldn't see any exceptions, and the logs give no reason
    why the KafkaConsumer closed.)
14) The Kafka Streams application goes back to step 1 and creates the
    consumer again. (The whole process is repeated, but 90% of the time the
    application never recovers.)
15) Since I have introduced the limit MAX_POLL_INTERVAL_MS_CONFIG = 60000
    (previously Integer.MAX_VALUE), I could see the exception below after 60
    seconds of consumer retries:
    org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
    completed since the group has already rebalanced
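
One way to read step 15: with a 60000 ms limit between calls to poll(), every batch returned by poll() has to be fully processed within that time, otherwise the group rebalances and the next commit fails. A rough sketch of that time budget follows. The class name and the per-record processing times are assumptions for illustration, not measurements from my application.

```java
// Hypothetical helper for estimating the time budget per poll() batch.
final class PollBudget {

    // Average time available per record if a full batch must finish
    // within the poll interval.
    static double msPerRecordBudget(long maxPollIntervalMs, long maxPollRecords) {
        return (double) maxPollIntervalMs / maxPollRecords;
    }

    // True if a full batch at the assumed per-record cost fits in the interval.
    static boolean fitsInInterval(long maxPollIntervalMs, long maxPollRecords,
                                  double assumedMsPerRecord) {
        return maxPollRecords * assumedMsPerRecord <= maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long intervalMs = 60_000; // the 60000 ms limit from step 15
        for (long records : new long[] {500, 5_000, 50_000}) {
            System.out.printf("max.poll.records=%d -> %.1f ms per record%n",
                    records, msPerRecordBudget(intervalMs, records));
        }
    }
}
```

With the 60000 ms limit and max.poll.records=5000 this works out to an average of 12 ms per record. As far as I understand, the time spent in punctuate() counts against the same interval, because it runs on the same stream thread.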

I am not sure what could be wrong on my side and how I can resolve this
issue so that my application starts processing messages consistently.

Thanks
Dev

On Sat, Sep 16, 2017 at 2:02 AM, Bill Bejeck <b...@confluent.io> wrote:

> Hi,
>
> Could you set MAX_POLL_RECORDS to something lower like 500 or 1000 and try
> again?
>
> Thanks,
> Bill
>
> On Fri, Sep 15, 2017 at 3:40 PM, dev loper <spark...@gmail.com> wrote:
>
> > Hi Damian,
> >
> > I have repeated my tests with a slight configuration change. The current
> > logs captured for the "StreamThread" keyword have more relevant entries
> > than the logs which I shared previously. I started the application on
> > instances 100, 101 and 102 simultaneously with the configuration below:
> >
> > 1) Reduced MAX_POLL_RECORDS_CONFIG to 5000 (previously 50000)
> > 2) Reduced MAX_POLL_INTERVAL_MS_CONFIG to 60000 (previously Integer.MAX_VALUE)
> >
> > When the application started, all three instances started processing, and
> > for the first few minutes everything went well. After that I could see that
> > the "StreamThread100" consumer was going for a toss: it started closing
> > and creating consumers for a while, exactly with the pattern of logs I
> > mentioned in my previous email. After some time I could see that
> > "StreamThread100" stopped processing messages with the exception below,
> > while the other two continued processing messages without any issues.
> >
> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> > completed since the group has already rebalanced and assigned the
> > partitions to another member. This means that the time between subsequent
> > calls to poll() was longer than the configured max.poll.interval.ms,
> > which typically implies that the poll loop is spending too much time
> > message processing. You can address this either by increasing the session
> > timeout or by reducing the maximum size of batches returned in poll() with
> > max.poll.records.
> >
> > I think that since the consumers were starting and stopping, no poll was
> > made from the system. Since I reduced MAX_POLL_INTERVAL_MS_CONFIG to 60000
> > and the processors were getting closed and restarted, this might have
> > resulted in the CommitFailedException due to non-availability of
> > processing processors.
> >
> > After some time the issue propagated to the other servers. I have attached
> > the relevant logs with this mail. Kindly go through them and let me know
> > how I can solve this issue.
> >
> >
> >
> >
> >
> > On Fri, Sep 15, 2017 at 10:33 PM, dev loper <spark...@gmail.com> wrote:
> >
> >> Hi Ted,
> >>
> >> What should I be looking for in the broker logs? I haven't looked at the
> >> broker side, since my Spark application, which processes the same topic
> >> with a different group id, is able to process well.
> >>
> >> On Fri, Sep 15, 2017 at 3:30 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >>
> >>> Is there some clue in broker logs ?
> >>>
> >>> Thanks
> >>>
> >>> On Thu, Sep 14, 2017 at 11:19 PM, dev loper <spark...@gmail.com> wrote:
> >>>
> >>>> Dear Kafka Users,
> >>>>
> >>>> I am fairly new to Kafka Streams. I have deployed two instances of
> >>>> Kafka 0.11 brokers on AWS M3.xlarge instances. I have created a topic
> >>>> with 36 partitions, and a separate application writes to this topic,
> >>>> producing records at the rate of 10000 messages per second. I have
> >>>> three AWS M4.xlarge instances where my Kafka Streams application is
> >>>> running, which consumes the messages produced by the other application.
> >>>> The application starts up fine and processes messages on the first
> >>>> instance, but when I start the same application on the other instances
> >>>> it does not start properly: even though the process is alive, it is not
> >>>> processing messages. Also, I could see that the other instances take a
> >>>> long time to start.
> >>>>
> >>>> Apart from the first instance, on the other instances I could see the
> >>>> consumer getting added and removed repeatedly, and I couldn't see any
> >>>> message processing at all. I have attached the detailed logs where this
> >>>> behavior is observed.
> >>>>
> >>>> On these instances the consumer starts with the log below and then
> >>>> stops with the log that follows it (* detailed logs attached *)
> >>>>
> >>>> INFO  | 21:59:30 | consumer.ConsumerConfig (AbstractConfig.java:223) -
> >>>> ConsumerConfig values:
> >>>>     auto.commit.interval.ms = 5000
> >>>>     auto.offset.reset = latest
> >>>>     bootstrap.servers = [l-mykafkainstancekafka5101:9092,
> >>>> l-mykafkainstancekafka5102:9092]
> >>>>     check.crcs = true
> >>>>     client.id =
> >>>>     connections.max.idle.ms = 540000
> >>>>     enable.auto.commit = false
> >>>>     exclude.internal.topics = true
> >>>>     fetch.max.bytes = 52428800
> >>>>     fetch.max.wait.ms = 500
> >>>>     fetch.min.bytes = 1
> >>>>     group.id = myKafka-kafkareplica101Sept08
> >>>>     heartbeat.interval.ms = 3000
> >>>>     interceptor.classes = null
> >>>>     internal.leave.group.on.close = true
> >>>>     isolation.level = read_uncommitted
> >>>>     key.deserializer = class mx.july.jmx.proximity.kafka.KafkaKryoCodec
> >>>>     max.partition.fetch.bytes = 1048576
> >>>>     max.poll.interval.ms = 300000
> >>>>     max.poll.records = 500
> >>>>     metadata.max.age.ms = 300000
> >>>>     metric.reporters = []
> >>>>     metrics.num.samples = 2
> >>>>     metrics.recording.level = INFO
> >>>>     metrics.sample.window.ms = 30000
> >>>>     partition.assignment.strategy = [class
> >>>> org.apache.kafka.clients.consumer.RangeAssignor]
> >>>>     receive.buffer.bytes = 65536
> >>>>     reconnect.backoff.max.ms = 1000
> >>>>     reconnect.backoff.ms = 50
> >>>>     request.timeout.ms = 305000
> >>>>     retry.backoff.ms = 100
> >>>>     sasl.jaas.config = null
> >>>>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >>>>     sasl.kerberos.min.time.before.relogin = 60000
> >>>>     sasl.kerberos.service.name = null
> >>>>     sasl.kerberos.ticket.renew.jitter = 0.05
> >>>>     sasl.kerberos.ticket.renew.window.factor = 0.8
> >>>>     sasl.mechanism = GSSAPI
> >>>>     security.protocol = PLAINTEXT
> >>>>     send.buffer.bytes = 131072
> >>>>     session.timeout.ms = 10000
> >>>>     ssl.cipher.suites = null
> >>>>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >>>>     ssl.endpoint.identification.algorithm = null
> >>>>     ssl.key.password = null
> >>>>     ssl.keymanager.algorithm = SunX509
> >>>>     ssl.keystore.location = null
> >>>>     ssl.keystore.password = null
> >>>>     ssl.keystore.type = JKS
> >>>>     ssl.protocol = TLS
> >>>>     ssl.provider = null
> >>>>     ssl.secure.random.implementation = null
> >>>>     ssl.trustmanager.algorithm = PKIX
> >>>>     ssl.truststore.location = null
> >>>>     ssl.truststore.password = null
> >>>>     ssl.truststore.type = JKS
> >>>>     value.deserializer = class my.dev.MessageUpdateCodec
> >>>>
> >>>>
> >>>> DEBUG | 21:59:30 | consumer.KafkaConsumer (KafkaConsumer.java:1617) -
> >>>> The Kafka consumer has closed.
> >>>>
> >>>> After this, the whole process repeats.
> >>>>
> >>>>
> >>>>
> >>>> Below you can find my startup code for Kafka Streams and the parameters
> >>>> which I have configured for starting the Kafka Streams application.
> >>>>
> >>>>         private static Properties settings = new Properties();
> >>>>         settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "mykafkastreamsapplication");
> >>>>         settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
> >>>>         settings.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
> >>>>         settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
> >>>>         settings.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
> >>>>         settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
> >>>>         settings.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "60000");
> >>>>
> >>>>         KStreamBuilder builder = new KStreamBuilder();
> >>>>         KafkaStreams streams = new KafkaStreams(builder, settings);
> >>>>         builder.addSource(.....
> >>>>          .addProcessor  .............
> >>>>          .addProcessor  ........
> >>>>
> >>>>          .addStateStore(...................).persistent().build(),"my
> >>>> processor")
> >>>>          .addSink ..............
> >>>>          . addSink ..............
> >>>>           streams.start();
> >>>>
> >>>> and I am using a Simple  processor to process my logic ..
> >>>>
> >>>> public class InfoProcessor extends AbstractProcessor<Key, Update> {
> >>>>     private static final Logger logger = Logger.getLogger(InfoProcessor.class);
> >>>>     private ProcessorContext context;
> >>>>     private KeyValueStore<Key, Info> infoStore;
> >>>>
> >>>>     @Override
> >>>>     @SuppressWarnings("unchecked")
> >>>>     public void init(ProcessorContext context) {
> >>>>         this.context = context;
> >>>>         // Request punctuate() every BATCH_DURATION_SECONDS.
> >>>>         this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
> >>>>         infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
> >>>>     }
> >>>>
> >>>>     @Override
> >>>>     public void process(Key key, Update update) {
> >>>>         try {
> >>>>             if (key != null && update != null) {
> >>>>                 Info info = infoStore.get(key);
> >>>>                 // merge logic
> >>>>                 infoStore.put(key, info);
> >>>>             }
> >>>>         } catch (Exception e) {
> >>>>             logger.error(e.getMessage(), e);
> >>>>         }
> >>>>         // Requests a commit after every record.
> >>>>         context.commit();
> >>>>     }
> >>>>
> >>>>     @Override
> >>>>     public void punctuate(long timestamp) {
> >>>>         // try-with-resources closes the iterator even if processing throws.
> >>>>         try (KeyValueIterator<Key, Info> iter = this.infoStore.all()) {
> >>>>             while (iter.hasNext()) {
> >>>>                 KeyValue<Key, Info> entry = iter.next();
> >>>>                 // processing logic
> >>>>             }
> >>>>             context.commit();
> >>>>         } catch (Exception e) {
> >>>>             logger.error(e.getMessage(), e);
> >>>>         }
> >>>>     }
> >>>> }
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>
> >>
> >
>
