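Java thread pools are notorious for swallowing exceptions: a task passed to
executor.submit() keeps any exception inside the returned Future, so nothing
is logged unless get() is called. Could you add a try/catch in ConsumerTest
around the loop that iterates through the messages?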
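
As a rough illustration of that suggestion, here is a minimal sketch. It does
not use Kafka at all: the Worker class, the runAndCapture helper, and the list
of strings are hypothetical stand-ins for ConsumerTest and its stream
iteration, chosen only so the example runs on its own.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ExceptionLoggingDemo {

    // Hypothetical stand-in for ConsumerTest: it iterates over plain strings
    // instead of a KafkaStream so the sketch runs without Kafka.
    static class Worker implements Runnable {
        private final Iterator<String> messages;
        private final AtomicReference<Throwable> failure;

        Worker(Iterator<String> messages, AtomicReference<Throwable> failure) {
            this.messages = messages;
            this.failure = failure;
        }

        @Override
        public void run() {
            try {
                while (messages.hasNext()) {
                    String message = messages.next();
                    if (message.isEmpty()) {
                        // Simulated processing error.
                        throw new IllegalStateException("empty message");
                    }
                    System.out.println("Received: " + message);
                }
            } catch (Throwable t) {
                // Without this catch, submit() stores the exception in the
                // returned Future and nothing is printed unless get() is called.
                failure.set(t);
                System.err.println("Worker failed: " + t);
            }
        }
    }

    // Runs one worker on a pool and returns the exception it caught, or null.
    static Throwable runAndCapture(List<String> messages) throws InterruptedException {
        AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.submit(new Worker(messages.iterator(), failure));
        executor.shutdown(); // already-submitted tasks still run to completion
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return failure.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable t = runAndCapture(Arrays.asList("a", "", "b"));
        System.out.println("Captured: " + t);
    }
}
```

In the real ConsumerTest the same try/catch would wrap the loop over the
stream iterator, so any exception thrown while consuming shows up in the log
instead of disappearing inside the pool.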

Thanks,

Jun


On Mon, Mar 24, 2014 at 12:44 PM, Gufran Pathan
<gufran.pat...@mu-sigma.com> wrote:

>  Hi,
>
>
>
> I'm facing an issue very similar to the one reported by someone else a
> few days ago (see below for the previous thread transcript).
>
>
>
> I'm using a high-level consumer Java program to consume messages. The
> consumer ends after 10 seconds (exactly the same duration the other user
> reported). I've tried increasing "zookeeper.session.timeout.ms" to
> "40000" and "zookeeper.sync.time.ms" to "20000", but it made no
> difference. A fellow user suggested that the issue is due to GC and that
> GC should be tuned. Any other thoughts?
>
>
>
> Attaching the logs and the Consumer code herewith.
>
>
>
> Here's the *log info I get from the zookeeper side*:
>
>
>
> *[2014-03-24 23:45:58,371] INFO Accepted socket connection from
> /172.25.2.122:55327 (org.apache.zookeeper.server.NIOServerCnxn)*
>
> *[2014-03-24 23:45:58,375] INFO Client attempting to establish new session
> at /172.25.2.122:55327 (org.apache.zookeeper.server.NIOServerCnxn)*
>
> *[2014-03-24 23:45:58,379] INFO Established session 0x144f5342bc60007 with
> negotiated timeout 40000 for client /172.25.2.122:55327
> (org.apache.zookeeper.server.NIOServerCnxn)*
>
> *[2014-03-24 23:46:09,129] INFO Processed session termination for
> sessionid: 0x144f5342bc60007
> (org.apache.zookeeper.server.PrepRequestProcessor)*
>
> *[2014-03-24 23:46:09,138] WARN EndOfStreamException: Unable to read
> additional data from client sessionid 0x144f5342bc60007, likely client has
> closed socket (org.apache.zookeeper.server.NIOServerCnxn)*
>
> *[2014-03-24 23:46:09,139] INFO Closed socket connection for client
> /172.25.2.122:55327 which had sessionid 0x144f5342bc60007
> (org.apache.zookeeper.server.NIOServerCnxn)*
>
>
>
> *The Consumer logs*:
>
>
>
> *INFO  2014-03-24 23:45:58,413 [main] kafka.utils.VerifiableProperties  -
> Verifying properties*
>
> *INFO  2014-03-24 23:45:58,435 [main] kafka.utils.VerifiableProperties  -
> Property auto.commit.interval.ms is overridden to 1000*
>
> *INFO  2014-03-24 23:45:58,435 [main] kafka.utils.VerifiableProperties  -
> Property group.id is overridden to group1*
>
> *INFO  2014-03-24 23:45:58,435 [main] kafka.utils.VerifiableProperties  -
> Property zookeeper.connect is overridden to 172.25.1.94:2181*
>
> *INFO  2014-03-24 23:45:58,436 [main] kafka.utils.VerifiableProperties  -
> Property zookeeper.session.timeout.ms is overridden to 40000*
>
> *INFO  2014-03-24 23:45:58,436 [main] kafka.utils.VerifiableProperties  -
> Property zookeeper.sync.time.ms is overridden to 20000*
>
> *INFO  2014-03-24 23:45:58,485 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Connecting to zookeeper instance
> at 172.25.1.94:2181*
>
> *INFO  2014-03-24 23:45:58,493 [ZkClient-EventThread-9-172.25.1.94:2181]
> org.I0Itec.zkclient.ZkEventThread  - Starting ZkClient event thread.*
>
> *INFO  2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:zookeeper.version=3.3.1-942149, built on 05/07/2010
> 17:14 GMT*
>
> *INFO  2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:host.name=LAPSZ0914.mu-sigma.local*
>
> *INFO  2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:java.version=1.7.0_51*
>
> *INFO  2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:java.vendor=Oracle Corporation*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:java.home=C:\Program Files\Java\jre7*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client
> environment:java.class.path=C:\Users\*****\NewWorkSpace\KafkaNew\target\classes;D:\Modules\sl4j\slf4j-1.7.6\slf4j-log4j12-1.7.6.jar;C:\Users\*****\.m2\repository\org\apache\kafka\kafka_2.9.1\0.8.0-beta1\kafka_2.9.1-0.8.0-beta1.jar;C:\Users\*****\.m2\repository\org\scala-lang\scala-library\2.9.1\scala-library-2.9.1.jar;C:\Users\*****\.m2\repository\org\scala-lang\scala-compiler\2.9.1\scala-compiler-2.9.1.jar;C:\Users\*****\.m2\repository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;C:\Users\*****\.m2\repository\com\yammer\metrics\metrics-annotation\2.2.0\metrics-annotation-2.2.0.jar;C:\Users\*****\.m2\repository\com\101tec\zkclient\0.3\zkclient-0.3.jar;C:\Users\*****\.m2\repository\org\apache\zookeeper\zookeeper\3.3.1\zookeeper-3.3.1.jar;C:\Users\*****\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;C:\Users\*****\.m2\repository\log4j\log4j\1.2.14\log4j-1.2.14.jar;C:\Users\*****\.m2\repository\net\sf\jopt-simple\jopt-simple\3.2\jopt-simple-3.2.jar;C:\Users\*****\.m2\repository\junit\junit\3.8.1\junit-3.8.1.jar;C:\Users\*****\.m2\repository\org\slf4j\slf4j-api\1.7.6\slf4j-api-1.7.6.jar*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:java.library.path=C:\Program
> Files\Java\jre7\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:/Program
> Files/Java/jre7/bin/client;C:/Program Files/Java/jre7/bin;C:/Program
> Files/Java/jre7/lib/i386;C:\Python27\;C:\Python27\Scripts;C:\Program
> Files\Common Files\Microsoft Shared\Microsoft Online Services;C:\Program
> Files\RSA SecurID Token Common;C:\Program Files\Intel\iCLS
> Client\;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program
> Files\Intel\Intel(R) Management Engine Components\DAL;C:\Program
> Files\Intel\Intel(R) Management Engine Components\IPT;C:\Program
> Files\Intel\OpenCL SDK\2.0\bin\x86;c:\Program Files\Microsoft SQL
> Server\100\Tools\Binn\VSShell\Common7\IDE\;c:\Program Files\Microsoft SQL
> Server\100\Tools\Binn\;c:\Program Files\Microsoft SQL
> Server\100\DTS\Binn\;C:\Program Files\SAS\SharedFiles\Formats;C:\Program
> Files\Java\jdk1.7.0_51\bin;C:\Program
> Files\apache-maven-3.0.5\bin;C:\Program Files\Git\cmd;
> D:\Modules\Storm\apache-storm-0.9.1-incubating\bin;C:\Program
> Files\GnuWin32\bin;C:\Program
> Files\sbt\\bin;C:\Users\*****\AppData\Roaming\Python\Scripts;C:\Program
> Files\Apache Software Foundation\apache-maven-3.1.1\bin;D:\Installation
> Deck\Software-2\Eclipse\eclipse-jee-juno-win32\eclipse;;.*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:java.io.tmpdir=C:\Users\GUFRAN~1.PAT\AppData\Local\Temp\*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:java.compiler=<NA>*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:os.name=Windows 7*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:os.arch=x86*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:os.version=6.1*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:user.name=******
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:user.home=C:\Users\******
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:user.dir=C:\Users\*****\NewWorkSpace\KafkaNew*
>
> *INFO  2014-03-24 23:45:58,501 [main] org.apache.zookeeper.ZooKeeper  -
> Initiating client connection, connectString=172.25.1.94:2181
> sessionTimeout=40000 watcher=org.I0Itec.zkclient.ZkClient@1d08c1b*
>
> *INFO  2014-03-24 23:45:58,520 [main-SendThread()]
> org.apache.zookeeper.ClientCnxn  - Opening socket connection to server
> /172.25.1.94:2181*
>
> *INFO  2014-03-24 23:45:58,526 [main-SendThread(vm.centos.com:2181)]
> org.apache.zookeeper.ClientCnxn  - Socket connection established to
> vm.centos.com/172.25.1.94:2181, initiating session*
>
> *INFO  2014-03-24 23:45:58,538 [main-SendThread(vm.centos.com:2181)]
> org.apache.zookeeper.ClientCnxn  - Session establishment complete on server
> vm.centos.com/172.25.1.94:2181, sessionid = 0x144f5342bc60007,
> negotiated timeout = 40000*
>
> *INFO  2014-03-24 23:45:58,540 [main-EventThread]
> org.I0Itec.zkclient.ZkClient  - zookeeper state changed (SyncConnected)*
>
> *INFO  2014-03-24 23:45:58,559 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], starting auto committer every
> 1000 ms*
>
> *INFO  2014-03-24 23:45:58,656 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], begin registering consumer
> group1_LAPSZ0914-1395684958481-7fc24c0a in ZK*
>
> *INFO  2014-03-24 23:45:58,680 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], end registering consumer
> group1_LAPSZ0914-1395684958481-7fc24c0a in ZK*
>
> *INFO  2014-03-24 23:45:58,682
> [group1_LAPSZ0914-1395684958481-7fc24c0a_watcher_executor]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], starting watcher executor thread
> for consumer group1_LAPSZ0914-1395684958481-7fc24c0a*
>
> *INFO  2014-03-24 23:45:58,710 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], begin rebalancing consumer
> group1_LAPSZ0914-1395684958481-7fc24c0a try #0*
>
> *INFO  2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties  -
> Verifying properties*
>
> *INFO  2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties  -
> Property metadata.broker.list is overridden to vm:9092*
>
> *INFO  2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties  -
> Property request.timeout.ms is overridden to 30000*
>
> *INFO  2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties  -
> Property client.id is overridden to group1*
>
> *INFO  2014-03-24 23:45:58,888 [main] kafka.client.ClientUtils$  -
> Fetching metadata from broker id:0,host:vm,port:9092 with correlation id 0
> for 1 topic(s) Set(partitioned)*
>
> *INFO  2014-03-24 23:45:58,893 [main] kafka.producer.SyncProducer  -
> Connected to vm:9092 for producing*
>
> *INFO  2014-03-24 23:45:58,914 [main] kafka.producer.SyncProducer  -
> Disconnecting from vm:9092*
>
> *INFO  2014-03-24 23:45:58,924 [main]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Stopping leader finder thread*
>
> *INFO  2014-03-24 23:45:58,925 [main]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Stopping all fetchers*
>
> *INFO  2014-03-24 23:45:58,926 [main]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] All connections stopped*
>
> *INFO  2014-03-24 23:45:58,927 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Cleared all relevant queues for
> this fetcher*
>
> *INFO  2014-03-24 23:45:58,928 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Cleared the data chunks in all
> the consumer message iterators*
>
> *INFO  2014-03-24 23:45:58,928 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Committing all offsets after
> clearing the fetcher queues*
>
> *INFO  2014-03-24 23:45:58,929 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Releasing partition ownership*
>
> *INFO  2014-03-24 23:45:58,932 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Consumer
> group1_LAPSZ0914-1395684958481-7fc24c0a rebalancing the following
> partitions: ArrayBuffer(0, 1, 2, 3, 4) for topic partitioned with
> consumers: List(group1_LAPSZ0914-1395684958481-7fc24c0a-0,
> group1_LAPSZ0914-1395684958481-7fc24c0a-1,
> group1_LAPSZ0914-1395684958481-7fc24c0a-2,
> group1_LAPSZ0914-1395684958481-7fc24c0a-3,
> group1_LAPSZ0914-1395684958481-7fc24c0a-4)*
>
> *INFO  2014-03-24 23:45:58,934 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-4 attempting to claim partition 4*
>
> *INFO  2014-03-24 23:45:58,939 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-3 attempting to claim partition 3*
>
> *INFO  2014-03-24 23:45:58,942 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-1 attempting to claim partition 1*
>
> *INFO  2014-03-24 23:45:58,949 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-2 attempting to claim partition 2*
>
> *INFO  2014-03-24 23:45:58,953 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-0 attempting to claim partition 0*
>
> *INFO  2014-03-24 23:45:58,967 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-1 successfully owned partition 1
> for topic partitioned*
>
> *INFO  2014-03-24 23:45:58,970 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-2 successfully owned partition 2
> for topic partitioned*
>
> *INFO  2014-03-24 23:45:58,973 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-0 successfully owned partition 0
> for topic partitioned*
>
> *INFO  2014-03-24 23:45:58,977 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-4 successfully owned partition 4
> for topic partitioned*
>
> *INFO  2014-03-24 23:45:58,980 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-3 successfully owned partition 3
> for topic partitioned*
>
> *INFO  2014-03-24 23:45:58,981 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Updating the cache*
>
> *INFO  2014-03-24 23:45:58,984 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Consumer
> group1_LAPSZ0914-1395684958481-7fc24c0a selected partitions :
> partitioned:0: fetched offset = 1939: consumed offset = 1939,partitioned:1:
> fetched offset = 1969: consumed offset = 1969,partitioned:2: fetched offset
> = 30917: consumed offset = 30917,partitioned:3: fetched offset = 52147:
> consumed offset = 52147,partitioned:4: fetched offset = 1876: consumed
> offset = 1876*
>
> *INFO  2014-03-24 23:45:58,988 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], end rebalancing consumer
> group1_LAPSZ0914-1395684958481-7fc24c0a try #0*
>
> *INFO  2014-03-24 23:45:58,989
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Starting *
>
> * INFO  2014-03-24 23:45:59,000
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.utils.VerifiableProperties  - Verifying properties*
>
> *INFO  2014-03-24 23:45:59,000
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.utils.VerifiableProperties  - Property metadata.broker.list is
> overridden to vm:9092*
>
> *INFO  2014-03-24 23:45:59,000
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.utils.VerifiableProperties  - Property request.timeout.ms is
> overridden to 30000*
>
> *INFO  2014-03-24 23:45:59,000
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.utils.VerifiableProperties  - Property client.id is overridden to
> group1*
>
> *INFO  2014-03-24 23:45:59,001
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.client.ClientUtils$  - Fetching metadata from broker
> id:0,host:vm,port:9092 with correlation id 0 for 1 topic(s)
> Set(partitioned)*
>
> *INFO  2014-03-24 23:45:59,004
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.producer.SyncProducer  - Connected to vm:9092 for producing*
>
> *INFO  2014-03-24 23:45:59,043
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.producer.SyncProducer  - Disconnecting from vm:9092*
>
> *INFO  2014-03-24 23:45:59,056
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Adding fetcher for partition
> [partitioned,2], initOffset 30917 to broker 0 with fetcherId 0*
>
> *INFO  2014-03-24 23:45:59,056
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Adding fetcher for partition
> [partitioned,3], initOffset 52147 to broker 0 with fetcherId 0*
>
> *INFO  2014-03-24 23:45:59,056
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Adding fetcher for partition
> [partitioned,1], initOffset 1969 to broker 0 with fetcherId 0*
>
> *INFO  2014-03-24 23:45:59,056
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Adding fetcher for partition
> [partitioned,0], initOffset 1939 to broker 0 with fetcherId 0*
>
> *INFO  2014-03-24 23:45:59,056
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Adding fetcher for partition
> [partitioned,4], initOffset 1876 to broker 0 with fetcherId 0*
>
> *INFO  2014-03-24 23:45:59,058
> [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0]
> kafka.consumer.ConsumerFetcherThread  -
> [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0],
> Starting *
>
> * INFO  2014-03-24 23:46:08,991 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], ZKConsumerConnector shutting
> down*
>
> *INFO  2014-03-24 23:46:08,992 [main] kafka.utils.KafkaScheduler  -
> Forcing shutdown of Kafka scheduler*
>
> *INFO  2014-03-24 23:46:08,992 [main]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Stopping leader finder thread*
>
> *INFO  2014-03-24 23:46:08,992 [main]
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Shutting
> down*
>
> *INFO  2014-03-24 23:46:08,993
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Stopped *
>
> * INFO  2014-03-24 23:46:08,993 [main]
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Shutdown
> completed*
>
> *INFO  2014-03-24 23:46:08,993 [main]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Stopping all fetchers*
>
> *INFO  2014-03-24 23:46:08,993 [main]
> kafka.consumer.ConsumerFetcherThread  -
> [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0],
> Shutting down*
>
> *INFO  2014-03-24 23:46:09,257
> [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0]
> kafka.consumer.SimpleConsumer  - Reconnect due to socket error: *
>
> * java.nio.channels.ClosedByInterruptException*
>
> *                at
> java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown Source)*
>
> *                at sun.nio.ch.SocketChannelImpl.read(Unknown Source)*
>
> *                at
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(Unknown Source)*
>
> *                at sun.nio.ch.ChannelInputStream.read(Unknown Source)*
>
> *                at
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown Source)*
>
> *                at kafka.utils.Utils$.read(Utils.scala:394)*
>
> *                at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)*
>
> *                at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)*
>
> *                at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)*
>
> *                at
> kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)*
>
> *                at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)*
>
> *                at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)*
>
> *                at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)*
>
> *                at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)*
>
> *                at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)*
>
> *                at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)*
>
> *                at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)*
>
> *                at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)*
>
> *                at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)*
>
> *                at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)*
>
> *                at
> kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)*
>
> *                at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)*
>
> *                at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)*
>
> *                at
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)*
>
> *INFO  2014-03-24 23:46:09,259
> [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0]
> kafka.consumer.ConsumerFetcherThread  -
> [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0],
> Stopped *
>
> * INFO  2014-03-24 23:46:09,259 [main]
> kafka.consumer.ConsumerFetcherThread  -
> [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0],
> Shutdown completed*
>
> *INFO  2014-03-24 23:46:09,259 [main]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] All connections stopped*
>
> *INFO  2014-03-24 23:46:09,283 [ZkClient-EventThread-9-172.25.1.94:2181]
> org.I0Itec.zkclient.ZkEventThread  - Terminate ZkClient event thread.*
>
> *INFO  2014-03-24 23:46:09,293 [main] org.apache.zookeeper.ZooKeeper  -
> Session: 0x144f5342bc60007 closed*
>
> *INFO  2014-03-24 23:46:09,294 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], ZKConsumerConnector shut down
> completed*
>
> *INFO  2014-03-24 23:46:09,694
> [group1_LAPSZ0914-1395684958481-7fc24c0a_watcher_executor]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], stopping watcher executor thread
> for consumer group1_LAPSZ0914-1395684958481-7fc24c0a*
>
>
>
> *My Consumer code*:
>
>
>
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
>
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
>
> public class TestConsumer {
>     private final ConsumerConnector consumer;
>     private final String topic;
>     private ExecutorService executor;
>
>     public TestConsumer(String a_zookeeper, String a_groupId, String a_topic) {
>         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
>                 createConsumerConfig(a_zookeeper, a_groupId));
>         this.topic = a_topic;
>     }
>
>     public void shutdown() {
>         if (consumer != null) consumer.shutdown();
>         if (executor != null) executor.shutdown();
>     }
>
>     public void run(int a_numThreads) {
>         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>         topicCountMap.put(topic, new Integer(a_numThreads));
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>                 consumer.createMessageStreams(topicCountMap);
>         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
>
>         // now launch all the threads
>         //
>         executor = Executors.newFixedThreadPool(a_numThreads);
>
>         // now create an object to consume the messages
>         //
>         int threadNumber = 0;
>         for (final KafkaStream stream : streams) {
>             executor.submit(new ConsumerTest(stream, threadNumber));
>             threadNumber++;
>         }
>     }
>
>     private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
>         Properties props = new Properties();
>         props.put("zookeeper.connect", a_zookeeper);
>         props.put("group.id", a_groupId);
>         props.put("zookeeper.session.timeout.ms", "40000");
>         props.put("zookeeper.sync.time.ms", "20000");
>         props.put("auto.commit.interval.ms", "1000");
>
>         return new ConsumerConfig(props);
>     }
>
>     public static void main(String[] args) {
>         String zooKeeper = "172.25.1.94:2181";
>         String groupId = "group1";
>         String topic = "partitioned";
>         int threads = 5;
>
>         TestConsumer example = new TestConsumer(zooKeeper, groupId, topic);
>         example.run(threads);
>
>         try {
>             Thread.sleep(10000);
>         } catch (InterruptedException ie) {
>
>         }
>         example.shutdown();
>     }
> }
>
>
>
>
>
> From: Neha Narkhede <neha.narkh...@gmail.com>
> Subject: Re: Kafka High Level Consumer Connector shuts down after 10 seconds
> Date: Mon, 10 Mar 2014 16:48:31 GMT
>
> Session termination can happen either when the client or the ZooKeeper
> process pauses (due to GC) or when the client process terminates. A
> sustainable solution is to tune GC settings. For now, you can try
> increasing zookeeper.session.timeout.ms.
>
>
>
>
>
>
>
>
>
> On Sun, Mar 9, 2014 at 3:44 PM, Ameya Bhagat <ameya.bha...@gmail.com>
> wrote:
>
>
>
> > I am using a high level consumer as described at:
> > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> >
> > I am noticing that my consumer does not run forever and ends after some
> > time (< 15s). At the zookeeper side, I see the following:
> >
> > INFO Processed session termination for sessionid: 0x144a4854325004d
> > (org.apache.zookeeper.server.PrepRequestProcessor)
> > INFO Closed socket connection for client /127.0.0.1:59899 which had
> > sessionid 0x144a4854325004d (org.apache.zookeeper.server.NIOServerCnxn)
> >
> > I am using default configurations. How do I make my consumer listen
> > forever?
> >
> > Thanks
> > Ameya
>
> >
>
>
>
>
>
> --
>
> Thanks & Regards,
>
> Gufran Pathan *| *+91-9566811502 *| *www.mu-sigma.com *|*
>
>
>  Disclaimer: http://www.mu-sigma.com/disclaimer.html
>
