Hello Folks,
I am using the high-level consumer, and it seems to drop connections
intermittently:

2014-11-01 13:34:40 SimpleConsumer [INFO] Reconnect due to socket error: Received -1 when reading from channel, socket has likely been closed.
2014-11-01 13:34:40 ConsumerFetcherThread [WARN] [ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 20220; ClientId: campaign_open_consumer_targeting_20141031-ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,18] -> PartitionFetchInfo(1681313989,4194304),[test_topic,21] -> PartitionFetchInfo(141266339,4194304)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect(Native Method)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
        at kafka.network.BlockingChannel.connect(Unknown Source)
        at kafka.consumer.SimpleConsumer.connect(Unknown Source)
        at kafka.consumer.SimpleConsumer.reconnect(Unknown Source)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(Unknown Source)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source)
        at kafka.metrics.KafkaTimer.time(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source)
        at kafka.metrics.KafkaTimer.time(Unknown Source)
        at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
        at kafka.server.AbstractFetcherThread.processFetchRequest(Unknown Source)
        at kafka.server.AbstractFetcherThread.doWork(Unknown Source)
        at kafka.utils.ShutdownableThread.run(Unknown Source)
2014-11-01 13:34:40 VerifiableProperties [INFO] Verifying properties

or sometimes:
2014-11-01 13:34:40 SimpleConsumer [INFO] Reconnect due to socket error: null
2014-11-01 13:34:40 ConsumerFetcherThread [WARN] [ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 20222; ClientId: campaign_open_consumer_targeting_20141031-ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,18] -> PartitionFetchInfo(1681313989,4194304)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect(Native Method)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
        at kafka.network.BlockingChannel.connect(Unknown Source)
        at kafka.consumer.SimpleConsumer.connect(Unknown Source)
        at kafka.consumer.SimpleConsumer.reconnect(Unknown Source)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(Unknown Source)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source)
        at kafka.metrics.KafkaTimer.time(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source)
        at kafka.metrics.KafkaTimer.time(Unknown Source)
        at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
        at kafka.server.AbstractFetcherThread.processFetchRequest(Unknown Source)
        at kafka.server.AbstractFetcherThread.doWork(Unknown Source)
        at kafka.utils.ShutdownableThread.run(Unknown Source)

The config I am using is:
kafka.config.fetch.message.max.bytes          4194304
kafka.config.group.id                         mygroupid
kafka.config.rebalance.backoff.ms             6000
kafka.config.rebalance.max.retries            6
kafka.config.zookeeper.connect                brokerlist
kafka.config.zookeeper.session.timeout.ms     60000

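
For reference, here is a minimal sketch of how these settings might end up as plain consumer properties. It assumes the `kafka.config.` prefix is an application-level namespace that gets stripped before the values reach the consumer; the post does not confirm this, and the class name `ConsumerProps` and method `strip` are hypothetical. The resulting key names (`group.id`, `zookeeper.connect`, and so on) are the ones the 0.8 high-level consumer reads. Note that `zookeeper.connect` is expected to hold ZooKeeper host:port pairs; the placeholder value from the post is kept as-is.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

class ConsumerProps {
    // Assumed prefix, taken from the key names shown in the post.
    static final String PREFIX = "kafka.config.";

    /** Copies every entry whose key starts with PREFIX into a Properties object, dropping the prefix. */
    static Properties strip(Map<String, String> conf) {
        Properties props = new Properties();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                props.setProperty(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        // Values copied from the post; "brokerlist" is the post's own placeholder.
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("kafka.config.fetch.message.max.bytes", "4194304");
        conf.put("kafka.config.group.id", "mygroupid");
        conf.put("kafka.config.rebalance.backoff.ms", "6000");
        conf.put("kafka.config.rebalance.max.retries", "6");
        conf.put("kafka.config.zookeeper.connect", "brokerlist");
        conf.put("kafka.config.zookeeper.session.timeout.ms", "60000");

        Properties props = strip(conf);
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```
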
There should not be any network connectivity issue, as all the ZooKeeper
nodes, brokers, and consumers are in the same cluster.
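
One way to double-check that assumption is to try a plain TCP connect from the consumer host to each broker, since "Connection refused" normally means nothing was listening on that host and port at that moment. The following is only a sketch: the class name `PortCheck` is hypothetical, and the default host and port (`localhost`, 9092) are stand-ins because the post does not give the broker addresses.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    /** Returns true if a TCP connection to host:port is accepted within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // ConnectException ("Connection refused"), timeouts, and
            // unresolvable host names all end up here.
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder defaults; pass the real broker host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```

Running this in a loop against each broker while the warnings appear would show whether a broker port is briefly unavailable (for example during a broker restart) rather than the network itself being at fault.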

What could be causing the "Connection refused" error? Is it because
ZooKeeper cannot talk to the broker to get the partitionInfo?
Thanks,
Chen
