Hi Guozhang,

Thanks for looking into this. Below are the Streams config values.
INFO 2018-02-02 08:33:25.708 [main] org.apache.kafka.streams.StreamsConfig - StreamsConfig values:
    application.id = cv-v1
    application.server =
    bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092]
    buffered.records.per.partition = 1000
    cache.max.bytes.buffering = 104857600
    client.id = I2
    commit.interval.ms = 30000
    connections.max.idle.ms = 540000
    default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
    default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    key.serde = null
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = DEBUG
    metrics.sample.window.ms = 30000
    num.standby.replicas = 1
    num.stream.threads = 1
    partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
    poll.ms = 100
    processing.guarantee = at_least_once
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    replication.factor = 2
    request.timeout.ms = 40000
    retry.backoff.ms = 100
    rocksdb.config.setter = null
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    state.cleanup.delay.ms = 600000
    state.dir = /mnt/store/kafka-streams
    timestamp.extractor = null
    value.serde = null
    windowstore.changelog.additional.retention.ms = 86400000
    zookeeper.connect =

INFO 2018-02-02 08:33:25.870 [main] org.apache.kafka.streams.StreamsConfig - StreamsConfig values:
    application.id = pe-v1
    application.server =
    bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092]
    buffered.records.per.partition = 1000
    cache.max.bytes.buffering = 2147483648
    client.id = I2
    commit.interval.ms = 30000
    connections.max.idle.ms = 540000
    default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
    default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    key.serde = null
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = DEBUG
    metrics.sample.window.ms = 30000
    num.standby.replicas = 1
    num.stream.threads = 3
    partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
    poll.ms = 100
    processing.guarantee = at_least_once
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    replication.factor = 2
    request.timeout.ms = 40000
    retry.backoff.ms = 100
    rocksdb.config.setter = null
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    state.cleanup.delay.ms = 600000
    state.dir = /mnt/store/kafka-streams
    timestamp.extractor = null
    value.serde = null
    windowstore.changelog.additional.retention.ms = 86400000
    zookeeper.connect =
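In case it's useful, this is roughly how the non-default values above are wired up, taking the first application (cv-v1) as an example; pe-v1 differs only in the cache size and the number of stream threads. This is a simplified sketch, not the exact application code, and the class/method names are just for illustration:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;

    public class CvStreamsProps {
        static Properties cvStreamsProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cv-v1");
            props.put(StreamsConfig.CLIENT_ID_CONFIG, "I2");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "172.31.10.35:9092,172.31.14.8:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                      Serdes.ByteArray().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                      Serdes.ByteArray().getClass());
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);   // 3 for pe-v1
            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
            props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
            // 2147483648L (2 GB) for pe-v1
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 104857600L);
            props.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/store/kafka-streams");
            return props;
        }
    }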
Please note that there are two Streams applications running. Apart from the broker connectivity issue, I also see the logs below. What do these indicate, and do they have any impact on processing?

DEBUG 2018-02-05 11:06:07.369 [I2-StreamThread-5] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-5] processing latency 6694 < commit time 30000 for 10000 records. Adjusting up recordsProcessedBeforeCommit=44816
DEBUG 2018-02-05 11:06:07.518 [I2-StreamThread-3] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-3] processing latency 3684 < commit time 30000 for 10000 records. Adjusting up recordsProcessedBeforeCommit=81433
DEBUG 2018-02-05 11:06:07.562 [I2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-4] processing latency 3620 < commit time 30000 for 10000 records. Adjusting up recordsProcessedBeforeCommit=82872
DEBUG 2018-02-05 11:06:09.838 [I2-StreamThread-7] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-7] processing latency 3843 < commit time 30000 for 10000 records. Adjusting up recordsProcessedBeforeCommit=78064
DEBUG 2018-02-05 11:06:10.073 [I2-StreamThread-3] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-3] processing latency 2552 < commit time 30000 for 7193 records. Adjusting up recordsProcessedBeforeCommit=84557
DEBUG 2018-02-05 11:06:10.209 [I2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-4] processing latency 2645 < commit time 30000 for 7161 records. Adjusting up recordsProcessedBeforeCommit=81221
DEBUG 2018-02-05 11:06:11.940 [I2-StreamThread-5] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-5] processing latency 4568 < commit time 30000 for 7583 records. Adjusting up recordsProcessedBeforeCommit=49800
DEBUG 2018-02-05 11:06:12.538 [I2-StreamThread-7] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-7] processing latency 2698 < commit time 30000 for 6880 records. Adjusting up recordsProcessedBeforeCommit=76501
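For what it's worth, the adjusted values look consistent with the threads scaling the batch size so that processing one batch takes roughly one commit interval. A sketch of the arithmetic as I read it from the log lines (not the literal StreamThread code):

    public class CommitBatchMath {
        public static void main(String[] args) {
            // Numbers from the first DEBUG entry above.
            long commitIntervalMs = 30000; // commit.interval.ms
            long processLatencyMs = 6694;  // time taken to process the last batch
            long recordsProcessed = 10000; // records in the last batch
            // Scale the batch up so processing it fills ~one commit interval.
            long recordsBeforeCommit =
                    recordsProcessed * commitIntervalMs / processLatencyMs;
            System.out.println(recordsBeforeCommit); // prints 44816, matching the log
        }
    }

The same formula reproduces the other entries too (e.g. 7193 * 30000 / 2552 = 84557), so it seems to be routine batch-size tuning rather than an error.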
Thanks,
Tony

On Tue, Feb 6, 2018 at 3:21 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Tony,
>
> Could you share your Streams config values so that people can help further
> investigate your issue?
>
> Guozhang
>
> On Mon, Feb 5, 2018 at 12:00 AM, Tony John <tonyjohnant...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > I have been running a Streams application for some time. The application
> > runs fine for a while, but after a day or two I see the log below being
> > printed continuously to the console.
> >
> > WARN 2018-02-05 02:50:04.060 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available.
> > WARN 2018-02-05 02:50:04.160 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available.
> > WARN 2018-02-05 02:50:04.261 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available.
> > WARN 2018-02-05 02:50:04.311 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available.
> > WARN 2018-02-05 02:50:04.361 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available.
> > WARN 2018-02-05 02:50:04.411 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available.
> >
> > At this time, though the application is still able to process messages, I
> > can also see lag building up on the consumers, and the processing time
> > for a batch has increased 15-fold.
> >
> > I am using a single ZooKeeper instance with 2 brokers and 4 application
> > instances. I checked the broker and ZooKeeper status; as far as I can
> > see, they are all running fine. I have also verified the connectivity
> > between the application and broker instances using telnet, and it seems
> > intact. The Kafka broker and streams/client versions are 0.11.0.2. The
> > results of the broker status queries against ZooKeeper are below.
> >
> > [root@app100 kafka]# echo dump | nc localhost 2181
> > SessionTracker dump:
> > Session Sets (3):
> > 0 expire at Mon Feb 05 06:16:39 UTC 2018:
> > 1 expire at Mon Feb 05 06:16:42 UTC 2018:
> >     0x161562860970001
> > 1 expire at Mon Feb 05 06:16:45 UTC 2018:
> >     0x161562860970000
> > ephemeral nodes dump:
> > Sessions with Ephemerals (2):
> > 0x161562860970000:
> >     /brokers/ids/0
> >     /controller
> > 0x161562860970001:
> >     /brokers/ids/1
> >
> > [root@app100 kafka]# ./kafka_2.11-0.11.0.2/bin/zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/0"
> > Connecting to localhost:2181
> > Welcome to ZooKeeper!
> > JLine support is disabled
> >
> > WATCHER::
> >
> > WatchedEvent state:SyncConnected type:None path:null
> > {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://172.31.10.35:9092"],"jmx_port":55555,"host":"172.31.10.35","timestamp":"1517569007467","port":9092,"version":4}
> > cZxid = 0x1c
> > ctime = Fri Feb 02 10:56:47 UTC 2018
> > mZxid = 0x1c
> > mtime = Fri Feb 02 10:56:47 UTC 2018
> > pZxid = 0x1c
> > cversion = 0
> > dataVersion = 0
> > aclVersion = 0
> > ephemeralOwner = 0x161562860970000
> > dataLength = 197
> > numChildren = 0
> >
> > [root@app100 kafka]# ./kafka_2.11-0.11.0.2/bin/zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/1"
> > Connecting to localhost:2181
> > Welcome to ZooKeeper!
> > JLine support is disabled
> >
> > WATCHER::
> >
> > WatchedEvent state:SyncConnected type:None path:null
> > {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://172.31.14.8:9092"],"jmx_port":55555,"host":"172.31.14.8","timestamp":"1517569016562","port":9092,"version":4}
> > cZxid = 0x21
> > ctime = Fri Feb 02 10:56:56 UTC 2018
> > mZxid = 0x21
> > mtime = Fri Feb 02 10:56:56 UTC 2018
> > pZxid = 0x21
> > cversion = 0
> > dataVersion = 0
> > aclVersion = 0
> > ephemeralOwner = 0x161562860970001
> > dataLength = 195
> > numChildren = 0
> >
> > Could you please shed some light on what could be going wrong here?
> >
> > Thanks,
> > Tony
>
> --
> -- Guozhang
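P.S. Besides telnet, I am also double-checking reachability from inside a JVM with a small AdminClient probe like the one below (a standalone snippet, not part of the application; the class name is just for illustration):

    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class BrokerCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "172.31.10.35:9092,172.31.14.8:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // Prints the brokers this client can reach over the Kafka
                // protocol; a timeout here would line up with the
                // "Broker may not be available" warnings above.
                System.out.println(admin.describeCluster().nodes().get());
            }
        }
    }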