Though there was no LockException in your log, it is still similar enough to
the one posted by johnchou:

https://issues.apache.org/jira/browse/KAFKA-5397?focusedCommentId=16162689&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16162689

Even the line numbers match:
sendOffsetCommitRequest(ConsumerCoordinator.java:725)

Reducing MAX_POLL_RECORDS_CONFIG may lessen the symptom, but that alone would
not meet your throughput goal.

On Fri, Sep 15, 2017 at 8:09 PM, dev loper <spark...@gmail.com> wrote:

> Hi All ,
>
> @Bill,
>
> I will reduce MAX_POLL_RECORDS to 500/1000 and share the results shortly.
>
> @Ted,
>
> Yes, I reduced MAX_POLL_RECORDS_CONFIG from 50000 to 5000; it was not a
> typo. Do you think 50000 is way too high for a Kafka Streams application?
> The Spark application I am trying to replace with Kafka Streams was
> processing 250000 messages per 5-second batch, which is why I set
> MAX_POLL_RECORDS_CONFIG to 50000.
>
> I don't think I have hit KAFKA-5397, since I couldn't find any instance of
> "org.apache.kafka.streams.errors.LockException" in my logs. I could see the
> exception below, but these exceptions are triggered long after the
> application stopped consuming any messages.
>
> StreamThread100.log:org.apache.kafka.streams.errors.StreamsException:
> stream-thread failed to suspend stream tasks
> User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
> for group myKafka-kafkareplica101Sept08 failed on partition revocation
>
> @Damian, I figured out the pattern below; I don't know whether it helps.
> Did the StreamThread logs which I shared help?
>
>
> I couldn't figure out why the consumers are getting closed right after
> they are allocated, or why processing suddenly stops without any reason.
> If you look at the pattern:
>
>  1) Consumer is created with the config values supplied by the
>     application.
>  2) Sensors are added.
>  3) API versions are fetched and connections to the Kafka brokers are
>     initiated (NetworkClient.java).
>  4) A metadata request is sent and the API version reported by the broker
>     is recorded (NetworkClient.java).
>  5) Cluster metadata version is updated (some incremental version).
>  6) The coordinator is discovered (one of the Kafka brokers).
>  7) A connection to the coordinator is initiated (one of the Kafka
>     brokers).
>  8) Committed offsets are fetched for the partitions
>     (ConsumerCoordinator.java:826).
>  9) API versions are recorded for the node.
> 10) Offsets are reset (for all the partitions).
> 11) ListOffsetResponse is handled (for all the partitions).
> 12) Sensors are removed (I couldn't see any exceptions, though).
> 13) consumer.KafkaConsumer (KafkaConsumer.java:1617) - The Kafka consumer
>     has closed. (Again I couldn't see any exceptions, and I couldn't
>     figure out why the KafkaConsumer closed; no reason is given in the
>     logs.)
> 14) The Kafka Streams application goes back to step 1 and creates the
>     consumer again. (The whole process is repeated, but 90% of the time
>     the application never recovers.)
> 15) Since I introduced the limit MAX_POLL_INTERVAL_MS_CONFIG = 60000
>     (previously Integer.MAX_VALUE), I could see the exception below after
>     60 seconds of consumer retries:
>     org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
>     be completed since the group has already rebalanced
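>
> To surface why the consumer closes, I am thinking of adding the hooks
> below. This is only a sketch against the 0.11 API; "logger" is assumed,
> and "builder"/"settings" are the ones from my startup code further down.
>
>     KafkaStreams streams = new KafkaStreams(builder, settings);
>     // Log any throwable that kills a stream thread instead of losing it.
>     streams.setUncaughtExceptionHandler((thread, throwable) ->
>             logger.error("Stream thread " + thread.getName() + " died",
>                     throwable));
>     // Log every state transition, e.g. RUNNING -> REBALANCING.
>     streams.setStateListener((newState, oldState) ->
>             logger.info("Streams state change: " + oldState + " -> "
>                     + newState));
>     streams.start();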
>
> I am not sure what could be wrong on my side, or how I can resolve this
> issue so that my application starts processing messages consistently.
>
> Thanks
> Dev
>
> On Sat, Sep 16, 2017 at 2:02 AM, Bill Bejeck <b...@confluent.io> wrote:
>
>> Hi,
>>
>> Could you set MAX_POLL_RECORDS to something lower like 500 or 1000 and try
>> again?
>>
>> Thanks,
>> Bill
>>
>> On Fri, Sep 15, 2017 at 3:40 PM, dev loper <spark...@gmail.com> wrote:
>>
>> > Hi Damian,
>> >
>> > I have repeated my tests with a slight configuration change. The current
>> > logs captured for the "StreamThread" keyword are more relevant than the
>> > logs I shared previously. I started the application on instances 100, 101
>> > and 102 simultaneously with the configuration below:
>> >
>> > 1) Reduced MAX_POLL_RECORDS_CONFIG to 5000 (previously 50000)
>> > 2) Reduced MAX_POLL_INTERVAL_MS_CONFIG to 60000 (previously
>> >    Integer.MAX_VALUE)
>> >
>> > When the application started, all three instances began processing, and
>> > for the first few minutes everything went well. After that I could see
>> > that the "StreamThread100" consumer was going for a toss: it kept closing
>> > and recreating consumers for a while, exactly with the pattern of logs I
>> > mentioned in my previous email. After some time "StreamThread100" stopped
>> > processing messages with the exception below, while the other two
>> > instances continued processing without any issues.
>> >
>> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
>> > completed since the group has already rebalanced and assigned the
>> > partitions to another member. This means that the time between subsequent
>> > calls to poll() was longer than the configured max.poll.interval.ms,
>> > which typically implies that the poll loop is spending too much time
>> > message processing. You can address this either by increasing the session
>> > timeout or by reducing the maximum size of batches returned in poll()
>> > with max.poll.records.
>> >
>> > I think that since the consumers were repeatedly starting and stopping,
>> > no poll was made from the system. Since I reduced
>> > MAX_POLL_INTERVAL_MS_CONFIG to 60000, and the processors were being
>> > closed and restarted, that might have resulted in the
>> > CommitFailedException due to the unavailability of processing
>> > processors.
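>> >
>> > As a back-of-envelope check (the per-record cost below is my assumption,
>> > not a measurement): each poll() batch must finish before
>> > max.poll.interval.ms, or the group rebalances and the commit fails.
>> >
>> >     long maxPollRecords  = 5000; // records returned per poll()
>> >     long perRecordMicros = 200;  // assumed average cost per record
>> >     long batchMillis = maxPollRecords * perRecordMicros / 1000; // 1000 ms
>> >     // 1 s per batch is well below max.poll.interval.ms = 60000 ms, so a
>> >     // rebalance here points at the poll loop stalling entirely, not at
>> >     // per-record processing cost.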
>> >
>> > After some time the issue propagated to the other servers. I have
>> > attached the relevant logs to this mail; kindly go through them and let
>> > me know how I can solve this issue.
>> >
>> >
>> > On Fri, Sep 15, 2017 at 10:33 PM, dev loper <spark...@gmail.com> wrote:
>> >
>> >> Hi Ted,
>> >>
>> >> What should I be looking for in the broker logs? I haven't looked at
>> >> the broker side, since my Spark application, which processes the same
>> >> topic with a different group id, is able to process well.
>> >>
>> >> On Fri, Sep 15, 2017 at 3:30 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>> >>
>> >>> Is there some clue in broker logs ?
>> >>>
>> >>> Thanks
>> >>>
>> >>> On Thu, Sep 14, 2017 at 11:19 PM, dev loper <spark...@gmail.com>
>> wrote:
>> >>>
>> >>>> Dear Kafka Users,
>> >>>>
>> >>>> I am fairly new to Kafka Streams. I have deployed two Kafka 0.11
>> >>>> brokers on AWS M3.xlarge instances. I have created a topic with 36
>> >>>> partitions, and a separate application writes to this topic, producing
>> >>>> records at a rate of 10000 messages per second. I have three AWS
>> >>>> M4.xlarge instances where my Kafka Streams application runs, consuming
>> >>>> the messages produced by the other application. The application starts
>> >>>> up fine and processes messages on the first instance, but when I start
>> >>>> the same application on the other instances it does not come up
>> >>>> properly: the process is alive, but it is not processing messages. I
>> >>>> could also see that the other instances take a long time to start.
>> >>>>
>> >>>> Apart from the first instance, on the other instances I could see the
>> >>>> consumer getting added and removed repeatedly, and I couldn't see any
>> >>>> message processing at all. I have attached the detailed logs where this
>> >>>> behavior is observed.
>> >>>>
>> >>>> The consumer starts on these instances with the log below and is then
>> >>>> stopped (*detailed logs attached*):
>> >>>>
>> >>>> INFO  | 21:59:30 | consumer.ConsumerConfig (AbstractConfig.java:223) -
>> >>>> ConsumerConfig values:
>> >>>>     auto.commit.interval.ms = 5000
>> >>>>     auto.offset.reset = latest
>> >>>>     bootstrap.servers = [l-mykafkainstancekafka5101:9092,
>> >>>> l-mykafkainstancekafka5102:9092]
>> >>>>     check.crcs = true
>> >>>>     client.id =
>> >>>>     connections.max.idle.ms = 540000
>> >>>>     enable.auto.commit = false
>> >>>>     exclude.internal.topics = true
>> >>>>     fetch.max.bytes = 52428800
>> >>>>     fetch.max.wait.ms = 500
>> >>>>     fetch.min.bytes = 1
>> >>>>     group.id = myKafka-kafkareplica101Sept08
>> >>>>     heartbeat.interval.ms = 3000
>> >>>>     interceptor.classes = null
>> >>>>     internal.leave.group.on.close = true
>> >>>>     isolation.level = read_uncommitted
>> >>>>     key.deserializer = class mx.july.jmx.proximity.kafka.KafkaKryoCodec
>> >>>>     max.partition.fetch.bytes = 1048576
>> >>>>     max.poll.interval.ms = 300000
>> >>>>     max.poll.records = 500
>> >>>>     metadata.max.age.ms = 300000
>> >>>>     metric.reporters = []
>> >>>>     metrics.num.samples = 2
>> >>>>     metrics.recording.level = INFO
>> >>>>     metrics.sample.window.ms = 30000
>> >>>>     partition.assignment.strategy = [class
>> >>>> org.apache.kafka.clients.consumer.RangeAssignor]
>> >>>>     receive.buffer.bytes = 65536
>> >>>>     reconnect.backoff.max.ms = 1000
>> >>>>     reconnect.backoff.ms = 50
>> >>>>     request.timeout.ms = 305000
>> >>>>     retry.backoff.ms = 100
>> >>>>     sasl.jaas.config = null
>> >>>>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> >>>>     sasl.kerberos.min.time.before.relogin = 60000
>> >>>>     sasl.kerberos.service.name = null
>> >>>>     sasl.kerberos.ticket.renew.jitter = 0.05
>> >>>>     sasl.kerberos.ticket.renew.window.factor = 0.8
>> >>>>     sasl.mechanism = GSSAPI
>> >>>>     security.protocol = PLAINTEXT
>> >>>>     send.buffer.bytes = 131072
>> >>>>     session.timeout.ms = 10000
>> >>>>     ssl.cipher.suites = null
>> >>>>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> >>>>     ssl.endpoint.identification.algorithm = null
>> >>>>     ssl.key.password = null
>> >>>>     ssl.keymanager.algorithm = SunX509
>> >>>>     ssl.keystore.location = null
>> >>>>     ssl.keystore.password = null
>> >>>>     ssl.keystore.type = JKS
>> >>>>     ssl.protocol = TLS
>> >>>>     ssl.provider = null
>> >>>>     ssl.secure.random.implementation = null
>> >>>>     ssl.trustmanager.algorithm = PKIX
>> >>>>     ssl.truststore.location = null
>> >>>>     ssl.truststore.password = null
>> >>>>     ssl.truststore.type = JKS
>> >>>>     value.deserializer = class my.dev.MessageUpdateCodec
>> >>>>
>> >>>>
>> >>>> DEBUG | 21:59:30 | consumer.KafkaConsumer (KafkaConsumer.java:1617) -
>> >>>> The Kafka consumer has closed. And then the whole process repeats.
>> >>>>
>> >>>>
>> >>>>
>> >>>> Below you can find my startup code for Kafka Streams and the
>> >>>> parameters I have configured for the application:
>> >>>>
>> >>>>         private static Properties settings = new Properties();
>> >>>>         settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> >>>>                 "mykafkastreamsapplication");
>> >>>>         settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
>> >>>>         settings.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
>> >>>>         settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
>> >>>>         settings.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>> >>>>                 Integer.MAX_VALUE);
>> >>>>         settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
>> >>>>         settings.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "60000");
>> >>>>
>> >>>>         // Build the topology first, then create and start the
>> >>>>         // streams instance.
>> >>>>         KStreamBuilder builder = new KStreamBuilder();
>> >>>>         builder.addSource(.....
>> >>>>          .addProcessor  .............
>> >>>>          .addProcessor  ........
>> >>>>          .addStateStore(...................).persistent().build(),
>> >>>>                 "my processor")
>> >>>>          .addSink ..............
>> >>>>          . addSink ..............
>> >>>>
>> >>>>         KafkaStreams streams = new KafkaStreams(builder, settings);
>> >>>>         streams.start();
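>> >>>>
>> >>>> (A small sketch of something I could add here: a shutdown hook so the
>> >>>> application closes cleanly and state-store locks are released before
>> >>>> a restart.)
>> >>>>
>> >>>>         Runtime.getRuntime().addShutdownHook(
>> >>>>                 new Thread(() -> streams.close()));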
>> >>>>
>> >>>> and I am using a simple processor to implement my logic:
>> >>>>
>> >>>> public class InfoProcessor extends AbstractProcessor<Key, Update> {
>> >>>>     private static Logger logger = Logger.getLogger(InfoProcessor.class);
>> >>>>     private ProcessorContext context;
>> >>>>     private KeyValueStore<Key, Info> infoStore;
>> >>>>
>> >>>>     @Override
>> >>>>     @SuppressWarnings("unchecked")
>> >>>>     public void init(ProcessorContext context) {
>> >>>>         this.context = context;
>> >>>>         this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
>> >>>>         infoStore = (KeyValueStore<Key, Info>)
>> >>>>                 context.getStateStore("InfoStore");
>> >>>>     }
>> >>>>
>> >>>>     @Override
>> >>>>     public void process(Key key, Update update) {
>> >>>>         try {
>> >>>>             if (key != null && update != null) {
>> >>>>                 Info info = infoStore.get(key);
>> >>>>                 // merge logic
>> >>>>                 infoStore.put(key, info);
>> >>>>             }
>> >>>>         } catch (Exception e) {
>> >>>>             logger.error(e.getMessage(), e);
>> >>>>         }
>> >>>>         context.commit();
>> >>>>     }
>> >>>>
>> >>>>     @Override
>> >>>>     public void punctuate(long timestamp) {
>> >>>>         // try-with-resources closes the store iterator even if the
>> >>>>         // processing logic throws
>> >>>>         try (KeyValueIterator<Key, Info> iter = this.infoStore.all()) {
>> >>>>             while (iter.hasNext()) {
>> >>>>                 // processing logic
>> >>>>             }
>> >>>>             context.commit();
>> >>>>         } catch (Exception e) {
>> >>>>             logger.error(e.getMessage(), e);
>> >>>>         }
>> >>>>     }
>> >>>> }
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>
>
