Is there anything abnormal in the network performance of your Kafka broker
EC2 instances or your Kafka Streams processor EC2 instances? Is the network
link getting saturated? If so, you might want to move the Kafka brokers to an
instance type with more network bandwidth.

I would also check memory utilization and see whether the OOM killer or
something similar is interfering.

I assume you have already validated with max.poll.records set to 500.
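
As a minimal sketch of that check (not your actual configuration): the
settings below reuse the values from your startup code but lower
max.poll.records to 500. Plain string keys are used so the snippet runs
without the Kafka client jars; the class name PollRecordsSketch and the
method buildSettings are made up for illustration.

```java
import java.util.Properties;

public class PollRecordsSketch {

    // Hypothetical helper: builds the Streams settings with
    // max.poll.records lowered to 500.
    static Properties buildSettings() {
        Properties settings = new Properties();
        settings.put("application.id", "mykafkastreamsapplication");
        settings.put("auto.offset.reset", "latest");
        settings.put("heartbeat.interval.ms", "10000");
        settings.put("session.timeout.ms", "30000");
        // The value under test: 500 instead of the 10000 in the original snippet.
        settings.put("max.poll.records", "500");
        return settings;
    }

    public static void main(String[] args) {
        Properties settings = buildSettings();
        System.out.println("max.poll.records = " + settings.getProperty("max.poll.records"));
    }
}
```

A real deployment would also need bootstrap.servers and the remaining
settings from your application; they are left out here to keep the sketch
short.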

Regards
Sab

On 15 Sep 2017 11:49 am, "dev loper" <spark...@gmail.com> wrote:

> Dear Kafka Users,
>
> I am fairly new to Kafka Streams. I have deployed two Kafka 0.11 brokers
> on AWS m3.xlarge instances. I have created a topic with 36 partitions, and
> a separate application writes to this topic at a rate of 10000 messages per
> second. My Kafka Streams application runs on three AWS m4.xlarge instances
> and consumes the messages produced by the other application. The
> application starts up and processes messages fine on the first instance,
> but when I start the same application on the other instances, it does not
> start processing: the process is alive, but no messages are processed.
> The other instances also take a long time to start.
>
> On every instance apart from the first, I could see the consumer being
> added and removed repeatedly, and no messages were processed at all. I
> have attached the detailed logs where this behavior is observed.
>
> On these instances the consumer starts with the first log entry below and
> stops with the second (* detailed logs attached *)
>
> INFO  | 21:59:30 | consumer.ConsumerConfig (AbstractConfig.java:223) -
> ConsumerConfig values:
>     auto.commit.interval.ms = 5000
>     auto.offset.reset = latest
>     bootstrap.servers = [l-mykafkainstancekafka5101:9092, l-mykafkainstancekafka5102:9092]
>     check.crcs = true
>     client.id =
>     connections.max.idle.ms = 540000
>     enable.auto.commit = false
>     exclude.internal.topics = true
>     fetch.max.bytes = 52428800
>     fetch.max.wait.ms = 500
>     fetch.min.bytes = 1
>     group.id = myKafka-kafkareplica101Sept08
>     heartbeat.interval.ms = 3000
>     interceptor.classes = null
>     internal.leave.group.on.close = true
>     isolation.level = read_uncommitted
>     key.deserializer = class mx.july.jmx.proximity.kafka.KafkaKryoCodec
>     max.partition.fetch.bytes = 1048576
>     max.poll.interval.ms = 300000
>     max.poll.records = 500
>     metadata.max.age.ms = 300000
>     metric.reporters = []
>     metrics.num.samples = 2
>     metrics.recording.level = INFO
>     metrics.sample.window.ms = 30000
>     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>     receive.buffer.bytes = 65536
>     reconnect.backoff.max.ms = 1000
>     reconnect.backoff.ms = 50
>     request.timeout.ms = 305000
>     retry.backoff.ms = 100
>     sasl.jaas.config = null
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.min.time.before.relogin = 60000
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     sasl.mechanism = GSSAPI
>     security.protocol = PLAINTEXT
>     send.buffer.bytes = 131072
>     session.timeout.ms = 10000
>     ssl.cipher.suites = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     ssl.endpoint.identification.algorithm = null
>     ssl.key.password = null
>     ssl.keymanager.algorithm = SunX509
>     ssl.keystore.location = null
>     ssl.keystore.password = null
>     ssl.keystore.type = JKS
>     ssl.protocol = TLS
>     ssl.provider = null
>     ssl.secure.random.implementation = null
>     ssl.trustmanager.algorithm = PKIX
>     ssl.truststore.location = null
>     ssl.truststore.password = null
>     ssl.truststore.type = JKS
>     value.deserializer = class my.dev.MessageUpdateCodec
>
>
> DEBUG | 21:59:30 | consumer.KafkaConsumer (KafkaConsumer.java:1617) - The Kafka consumer has closed.
>
> and the whole process repeats.
>
>
>
> Below you can find my startup code for Kafka Streams and the parameters
> I have configured for starting the Kafka Streams application.
>
>         private static Properties settings = new Properties();
>         settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "mykafkastreamsapplication");
>         settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
>         settings.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
>         settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
>         settings.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
>         settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
>         settings.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "60000");
>
>         KStreamBuilder builder = new KStreamBuilder();
>         KafkaStreams streams = new KafkaStreams(builder, settings);
>         builder.addSource(.....
>          .addProcessor  .............
>          .addProcessor  ........
>
>          .addStateStore(...................).persistent().build(),"myprocessor")
>          .addSink ..............
>          . addSink ..............
>           streams.start();
>
> and I am using a simple processor to implement my logic:
>
> public class InfoProcessor extends AbstractProcessor<Key, Update> {
> private static Logger logger = Logger.getLogger(InfoProcessor.class);
> private ProcessorContext context;
> private KeyValueStore<Key, Info> infoStore;
>
> @Override
> @SuppressWarnings("unchecked")
> public void init(ProcessorContext context) {
>     this.context = context;
>     this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
>     infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
> }
>
> @Override
> public void process(Key key, Update update) {
>     try {
>         if (key != null && update != null) {
>             Info info = infoStore.get(key);
>             // merge logic
>             infoStore.put(key, info);
>         }
>
>     } catch (Exception e) {
>         logger.error(e.getMessage(), e);
>     } finally {
>     }
>     context.commit();
> }
>
> @Override
> public void punctuate(long timestamp) {
>     try {
>         KeyValueIterator<Key, Info> iter = this.infoStore.all();
>         while (iter.hasNext()) {
>             // processing logic
>
>         }
>         iter.close();
>         context.commit();
>     } catch (Exception e) {
>         logger.error(e.getMessage(), e);
>     }
> }
>
>
>
>
>
