Hi Guozhang,

Thanks for looking into this. Below are the Streams config values.

INFO  2018-02-02 08:33:25.708 [main] org.apache.kafka.streams.StreamsConfig - StreamsConfig values:
application.id = cv-v1
application.server =
bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 104857600
client.id = I2
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
key.serde = null
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = DEBUG
metrics.sample.window.ms = 30000
num.standby.replicas = 1
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 2
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /mnt/store/kafka-streams
timestamp.extractor = null
value.serde = null
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =
INFO  2018-02-02 08:33:25.870 [main] org.apache.kafka.streams.StreamsConfig - StreamsConfig values:
application.id = pe-v1
application.server =
bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 2147483648
client.id = I2
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
key.serde = null
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = DEBUG
metrics.sample.window.ms = 30000
num.standby.replicas = 1
num.stream.threads = 3
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 2
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /mnt/store/kafka-streams
timestamp.extractor = null
value.serde = null
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =

Please note that there are two Streams applications running.
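
In case it helps when comparing the two dumps: the only values that differ
are application.id, cache.max.bytes.buffering and num.stream.threads. Below is
a rough Java sketch of what the cache settings work out to. It assumes, as I
understand the Streams memory management docs, that the cache is split evenly
across the stream threads of an instance; the class and method names are made
up for illustration only.

```java
public class CacheBudget {
    public static final long MIB = 1024L * 1024L;

    /** Approximate per-thread cache share, ASSUMING an even split across stream threads. */
    public static long perThreadBytes(long cacheMaxBytes, int numStreamThreads) {
        if (numStreamThreads <= 0) {
            throw new IllegalArgumentException("numStreamThreads must be positive");
        }
        return cacheMaxBytes / numStreamThreads;
    }

    public static void main(String[] args) {
        // Values copied from the two StreamsConfig dumps above.
        long cvCache = 104857600L;   // cv-v1: cache.max.bytes.buffering, num.stream.threads = 1
        long peCache = 2147483648L;  // pe-v1: cache.max.bytes.buffering, num.stream.threads = 3

        System.out.println("cv-v1: " + cvCache / MIB + " MiB total, "
                + perThreadBytes(cvCache, 1) / MIB + " MiB per thread");
        System.out.println("pe-v1: " + peCache / MIB + " MiB total, "
                + perThreadBytes(peCache, 3) / MIB + " MiB per thread");

        // The pe-v1 value is one more than Integer.MAX_VALUE, so it only fits in a long.
        System.out.println("pe-v1 cache exceeds Integer.MAX_VALUE: "
                + (peCache > Integer.MAX_VALUE));
    }
}
```

So pe-v1 is configured with roughly twenty times the cache of cv-v1 on the
same instance, which may be worth keeping in mind when looking at memory usage.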

Apart from the broker connectivity issue, I also see the logs below.
What do they indicate, and do they have any impact on processing?

DEBUG 2018-02-05 11:06:07.369 [I2-StreamThread-5] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-5] processing latency 6694 < commit time 30000 for 10000 records. Adjusting up recordsProcessedBeforeCommit=44816
DEBUG 2018-02-05 11:06:07.518 [I2-StreamThread-3] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-3] processing latency 3684 < commit time 30000 for 10000 records. Adjusting up recordsProcessedBeforeCommit=81433
DEBUG 2018-02-05 11:06:07.562 [I2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-4] processing latency 3620 < commit time 30000 for 10000 records. Adjusting up recordsProcessedBeforeCommit=82872
DEBUG 2018-02-05 11:06:09.838 [I2-StreamThread-7] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-7] processing latency 3843 < commit time 30000 for 10000 records. Adjusting up recordsProcessedBeforeCommit=78064
DEBUG 2018-02-05 11:06:10.073 [I2-StreamThread-3] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-3] processing latency 2552 < commit time 30000 for 7193 records. Adjusting up recordsProcessedBeforeCommit=84557
DEBUG 2018-02-05 11:06:10.209 [I2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-4] processing latency 2645 < commit time 30000 for 7161 records. Adjusting up recordsProcessedBeforeCommit=81221
DEBUG 2018-02-05 11:06:11.940 [I2-StreamThread-5] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-5] processing latency 4568 < commit time 30000 for 7583 records. Adjusting up recordsProcessedBeforeCommit=49800
DEBUG 2018-02-05 11:06:12.538 [I2-StreamThread-7] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [I2-StreamThread-7] processing latency 2698 < commit time 30000 for 6880 records. Adjusting up recordsProcessedBeforeCommit=76501
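
Looking at the numbers, each logged recordsProcessedBeforeCommit equals
(records * commit time) / processing latency, rounded down. For example,
10000 * 30000 / 6694 = 44816. So my guess is that the thread is simply
re-estimating how many records fit into one commit interval. A small sketch of
that arithmetic is below; it is inferred from the log lines only, not taken
from the Kafka source, and the class and method names are mine.

```java
public class CommitBatchEstimate {
    /**
     * Scale the processed record count so that a batch would take roughly one
     * commit interval. Uses integer division, which matches the logged values.
     */
    public static long estimate(long recordsProcessed, long commitTimeMs, long processLatencyMs) {
        if (processLatencyMs <= 0) {
            throw new IllegalArgumentException("processLatencyMs must be positive");
        }
        return Math.max(1, (commitTimeMs * recordsProcessed) / processLatencyMs);
    }

    public static void main(String[] args) {
        // Inputs taken from three of the DEBUG lines above.
        System.out.println(estimate(10000, 30000, 6694)); // logged: 44816
        System.out.println(estimate(7193, 30000, 2552));  // logged: 84557
        System.out.println(estimate(6880, 30000, 2698));  // logged: 76501
    }
}
```

If that reading is right, these messages only say that processing is well
within the 30 s commit interval, but I would like to confirm that.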


Thanks,
Tony

On Tue, Feb 6, 2018 at 3:21 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Tony,
>
>
> Could you share your Streams config values so that people can help
> investigate your issue further?
>
>
> Guozhang
>
>
> On Mon, Feb 5, 2018 at 12:00 AM, Tony John <tonyjohnant...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > I have been running a Streams application for some time. The application
> > runs fine for a while, but after a day or two I see the log below being
> > printed continuously to the console.
> >
> > WARN  2018-02-05 02:50:04.060 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available.
> >
> > WARN  2018-02-05 02:50:04.160 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available.
> >
> > WARN  2018-02-05 02:50:04.261 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available.
> >
> > WARN  2018-02-05 02:50:04.311 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available.
> >
> > WARN  2018-02-05 02:50:04.361 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available.
> >
> > WARN  2018-02-05 02:50:04.411 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available.
> >
> > At this time, although the application is still able to process messages,
> > I can see lag building up in the consumers, and the processing time for a
> > batch has increased 15-fold.
> >
> > I am using a single ZooKeeper instance with 2 brokers and 4 application
> > instances. I checked the broker and ZooKeeper status, and as far as I can
> > see they are all running fine. I have also verified the connectivity
> > between the application and broker instances using telnet, and it seems
> > intact. The Kafka broker and Streams/client versions are 0.11.0.2. The
> > broker status reported by ZooKeeper is below.
> >
> >
> > [root@app100 kafka]# echo dump | nc localhost 2181
> >
> > SessionTracker dump:
> >
> > Session Sets (3):
> >
> > 0 expire at Mon Feb 05 06:16:39 UTC 2018:
> >
> > 1 expire at Mon Feb 05 06:16:42 UTC 2018:
> >
> > 0x161562860970001
> >
> > 1 expire at Mon Feb 05 06:16:45 UTC 2018:
> >
> > 0x161562860970000
> >
> > ephemeral nodes dump:
> >
> > Sessions with Ephemerals (2):
> >
> > 0x161562860970000:
> >
> > /brokers/ids/0
> >
> > /controller
> >
> > 0x161562860970001:
> >
> > /brokers/ids/1
> >
> > [root@app100 kafka]# ./kafka_2.11-0.11.0.2/bin/zookeeper-shell.sh
> > localhost:2181 <<< "get /brokers/ids/0"
> >
> > Connecting to localhost:2181
> >
> > Welcome to ZooKeeper!
> >
> > JLine support is disabled
> >
> >
> > WATCHER::
> >
> >
> > WatchedEvent state:SyncConnected type:None path:null
> >
> > {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://172.31.10.35:9092"],"jmx_port":55555,"host":"172.31.10.35","timestamp":"1517569007467","port":9092,"version":4}
> >
> > cZxid = 0x1c
> >
> > ctime = Fri Feb 02 10:56:47 UTC 2018
> >
> > mZxid = 0x1c
> >
> > mtime = Fri Feb 02 10:56:47 UTC 2018
> >
> > pZxid = 0x1c
> >
> > cversion = 0
> >
> > dataVersion = 0
> >
> > aclVersion = 0
> >
> > ephemeralOwner = 0x161562860970000
> >
> > dataLength = 197
> >
> > numChildren = 0
> >
> > [root@app100 kafka]# ./kafka_2.11-0.11.0.2/bin/zookeeper-shell.sh
> > localhost:2181 <<< "get /brokers/ids/1"
> >
> > Connecting to localhost:2181
> >
> > Welcome to ZooKeeper!
> >
> > JLine support is disabled
> >
> >
> > WATCHER::
> >
> >
> > WatchedEvent state:SyncConnected type:None path:null
> >
> > {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://172.31.14.8:9092"],"jmx_port":55555,"host":"172.31.14.8","timestamp":"1517569016562","port":9092,"version":4}
> >
> > cZxid = 0x21
> >
> > ctime = Fri Feb 02 10:56:56 UTC 2018
> >
> > mZxid = 0x21
> >
> > mtime = Fri Feb 02 10:56:56 UTC 2018
> >
> > pZxid = 0x21
> >
> > cversion = 0
> >
> > dataVersion = 0
> >
> > aclVersion = 0
> >
> > ephemeralOwner = 0x161562860970001
> >
> > dataLength = 195
> >
> > numChildren = 0
> >
> > Could you please shed some light on what could be going wrong here?
> >
> > Thanks,
> > Tony
> >
>
>
>
> --
> -- Guozhang
>
