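Java thread pools are notorious for swallowing exceptions: a task submitted with ExecutorService.submit() that throws does not log anything, because the exception is stored in the returned Future. Could you add a try/catch in ConsumerTest around the loop that iterates through the messages, and log whatever it catches?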
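To illustrate the point, here is a minimal sketch with no Kafka dependency. ConsumerTest is not shown in this thread, so consumeMessages() below is a hypothetical stand-in for its message loop, and the class and method names are made up for the example. It only shows how submit() hides a failure and how a try/catch inside run() surfaces it.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SwallowedExceptionDemo {

    // Hypothetical stand-in for the loop in ConsumerTest.run() that iterates the stream.
    static void consumeMessages() {
        throw new IllegalStateException("simulated failure while iterating messages");
    }

    // No try/catch in run(): the worker prints nothing, and the exception is
    // only reachable by calling get() on the Future returned by submit().
    static Throwable submitUnguarded(ExecutorService executor) throws InterruptedException {
        Future<?> future = executor.submit(new Runnable() {
            @Override
            public void run() {
                consumeMessages();
            }
        });
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        }
    }

    // try/catch inside run(): the worker thread reports the failure itself.
    static Throwable submitGuarded(ExecutorService executor) throws Exception {
        final AtomicReference<Throwable> seen = new AtomicReference<Throwable>();
        Future<?> future = executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    consumeMessages();
                } catch (Throwable t) {
                    seen.set(t);
                    System.err.println("Consumer thread failed: " + t);
                }
            }
        });
        future.get(); // completes normally because run() handled the error
        return seen.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            System.out.println("unguarded, visible only via Future.get(): " + submitUnguarded(executor));
            System.out.println("guarded, logged by the worker: " + submitGuarded(executor));
        } finally {
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```

In the posted TestConsumer the Future returned by executor.submit(...) is discarded, so without a try/catch in ConsumerTest any exception thrown while consuming would never appear in the logs.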
Thanks,
Jun

On Mon, Mar 24, 2014 at 12:44 PM, Gufran Pathan <gufran.pat...@mu-sigma.com> wrote:

> Hi,
>
> I'm facing an issue exactly similar to the one issued by someone else a
> few days ago (see below for the previous thread transcript).
>
> I'm using a High Level Consumer java program to consume messages. The
> consumer ends after 10 seconds (exactly the same time as faced by the other
> user). I've tried increasing the "zookeeper.session.timeout.ms" to
> "40000" and the "zookeeper.sync.time.ms" to "20000" but still no
> difference. A fellow user suggested that the issue is due to GC and it
> should be tuned. Any other thoughts?
>
> Attaching the logs and the Consumer code herewith.
>
> Here's the *log info I get from the zookeeper side*:
>
> [2014-03-24 23:45:58,371] INFO Accepted socket connection from /172.25.2.122:55327 (org.apache.zookeeper.server.NIOServerCnxn)
> [2014-03-24 23:45:58,375] INFO Client attempting to establish new session at /172.25.2.122:55327 (org.apache.zookeeper.server.NIOServerCnxn)
> [2014-03-24 23:45:58,379] INFO Established session 0x144f5342bc60007 with negotiated timeout 40000 for client /172.25.2.122:55327 (org.apache.zookeeper.server.NIOServerCnxn)
> [2014-03-24 23:46:09,129] INFO Processed session termination for sessionid: 0x144f5342bc60007 (org.apache.zookeeper.server.PrepRequestProcessor)
> [2014-03-24 23:46:09,138] WARN EndOfStreamException: Unable to read additional data from client sessionid 0x144f5342bc60007, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn)
> [2014-03-24 23:46:09,139] INFO Closed socket connection for client /172.25.2.122:55327 which had sessionid 0x144f5342bc60007 (org.apache.zookeeper.server.NIOServerCnxn)
>
> *The Consumer logs*:
>
> INFO 2014-03-24 23:45:58,413 [main]
> kafka.utils.VerifiableProperties - Verifying properties
> INFO 2014-03-24 23:45:58,435 [main] kafka.utils.VerifiableProperties - Property auto.commit.interval.ms is overridden to 1000
> INFO 2014-03-24 23:45:58,435 [main] kafka.utils.VerifiableProperties - Property group.id is overridden to group1
> INFO 2014-03-24 23:45:58,435 [main] kafka.utils.VerifiableProperties - Property zookeeper.connect is overridden to 172.25.1.94:2181
> INFO 2014-03-24 23:45:58,436 [main] kafka.utils.VerifiableProperties - Property zookeeper.session.timeout.ms is overridden to 40000
> INFO 2014-03-24 23:45:58,436 [main] kafka.utils.VerifiableProperties - Property zookeeper.sync.time.ms is overridden to 20000
> INFO 2014-03-24 23:45:58,485 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], Connecting to zookeeper instance at 172.25.1.94:2181
> INFO 2014-03-24 23:45:58,493 [ZkClient-EventThread-9-172.25.1.94:2181] org.I0Itec.zkclient.ZkEventThread - Starting ZkClient event thread.
> INFO 2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper - Client environment:zookeeper.version=3.3.1-942149, built on 05/07/2010 17:14 GMT
> INFO 2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper - Client environment:host.name=LAPSZ0914.mu-sigma.local
> INFO 2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper - Client environment:java.version=1.7.0_51
> INFO 2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle Corporation
> INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - Client environment:java.home=C:\Program Files\Java\jre7
> INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - Client environment:java.class.path=C:\Users\*****\NewWorkSpace\KafkaNew\target\classes;D:\Modules\sl4j\slf4j-1.7.6\slf4j-log4j12-1.7.6.jar;C:\Users\*****\.m2\repository\org\apache\kafka\kafka_2.9.1\0.8.0-beta1\kafka_2.9.1-0.8.0-beta1.jar;C:\Users\*****\.m2\repository\org\scala-lang\scala-library\2.9.1\scala-library-2.9.1.jar;C:\Users\*****\.m2\repository\org\scala-lang\scala-compiler\2.9.1\scala-compiler-2.9.1.jar;C:\Users\*****\.m2\repository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;C:\Users\*****\.m2\repository\com\yammer\metrics\metrics-annotation\2.2.0\metrics-annotation-2.2.0.jar;C:\Users\*****\.m2\repository\com\101tec\zkclient\0.3\zkclient-0.3.jar;C:\Users\*****\.m2\repository\org\apache\zookeeper\zookeeper\3.3.1\zookeeper-3.3.1.jar;C:\Users\*****\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;C:\Users\*****\.m2\repository\log4j\log4j\1.2.14\log4j-1.2.14.jar;C:\Users\*****\.m2\repository\net\sf\jopt-simple\jopt-simple\3.2\jopt-simple-3.2.jar;C:\Users\*****\.m2\repository\junit\junit\3.8.1\junit-3.8.1.jar;C:\Users\*****\.m2\repository\org\slf4j\slf4j-api\1.7.6\slf4j-api-1.7.6.jar
> INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - Client environment:java.library.path=C:\Program Files\Java\jre7\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:/Program Files/Java/jre7/bin/client;C:/Program Files/Java/jre7/bin;C:/Program Files/Java/jre7/lib/i386;C:\Python27\;C:\Python27\Scripts;C:\Program Files\Common Files\Microsoft Shared\Microsoft Online Services;C:\Program Files\RSA SecurID Token Common;C:\Program Files\Intel\iCLS Client\;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program Files\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files\Intel\Intel(R) Management Engine Components\IPT;C:\Program Files\Intel\OpenCL SDK\2.0\bin\x86;c:\Program Files\Microsoft SQL Server\100\Tools\Binn\VSShell\Common7\IDE\;c:\Program Files\Microsoft SQL Server\100\Tools\Binn\;c:\Program Files\Microsoft SQL Server\100\DTS\Binn\;C:\Program Files\SAS\SharedFiles\Formats;C:\Program Files\Java\jdk1.7.0_51\bin;C:\Program Files\apache-maven-3.0.5\bin;C:\Program Files\Git\cmd; D:\Modules\Storm\apache-storm-0.9.1-incubating\bin;C:\Program Files\GnuWin32\bin;C:\Program Files\sbt\\bin;C:\Users\*****\AppData\Roaming\Python\Scripts;C:\Program Files\Apache Software Foundation\apache-maven-3.1.1\bin;D:\Installation Deck\Software-2\Eclipse\eclipse-jee-juno-win32\eclipse;;.
> INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=C:\Users\GUFRAN~1.PAT\AppData\Local\Temp\
> INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - Client environment:java.compiler=<NA>
> INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - Client environment:os.name=Windows 7
> INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - Client environment:os.arch=x86
> INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - Client environment:os.version=6.1
> INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - Client environment:user.name=*****
> INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - Client environment:user.home=C:\Users\*****
> INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - Client environment:user.dir=C:\Users\*****\NewWorkSpace\KafkaNew
> INFO 2014-03-24 23:45:58,501 [main] org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=172.25.1.94:2181 sessionTimeout=40000 watcher=org.I0Itec.zkclient.ZkClient@1d08c1b
> INFO 2014-03-24 23:45:58,520 [main-SendThread()] org.apache.zookeeper.ClientCnxn - Opening socket connection to server /172.25.1.94:2181
> INFO 2014-03-24 23:45:58,526 [main-SendThread(vm.centos.com:2181)] org.apache.zookeeper.ClientCnxn - Socket connection established to vm.centos.com/172.25.1.94:2181, initiating session
> INFO 2014-03-24 23:45:58,538 [main-SendThread(vm.centos.com:2181)] org.apache.zookeeper.ClientCnxn - Session establishment complete on server vm.centos.com/172.25.1.94:2181, sessionid = 0x144f5342bc60007, negotiated timeout = 40000
> INFO 2014-03-24 23:45:58,540 [main-EventThread] org.I0Itec.zkclient.ZkClient - zookeeper state changed (SyncConnected)
> INFO 2014-03-24 23:45:58,559 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], starting auto committer every 1000 ms
> INFO 2014-03-24 23:45:58,656 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], begin registering consumer group1_LAPSZ0914-1395684958481-7fc24c0a in ZK
> INFO 2014-03-24 23:45:58,680 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], end registering consumer group1_LAPSZ0914-1395684958481-7fc24c0a in ZK
> INFO 2014-03-24 23:45:58,682 [group1_LAPSZ0914-1395684958481-7fc24c0a_watcher_executor] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], starting watcher executor thread for consumer group1_LAPSZ0914-1395684958481-7fc24c0a
> INFO 2014-03-24 23:45:58,710 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], begin rebalancing consumer group1_LAPSZ0914-1395684958481-7fc24c0a try #0
> INFO 2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties - Verifying properties
> INFO 2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to vm:9092
> INFO 2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
> INFO 2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties - Property client.id is overridden to group1
> INFO 2014-03-24 23:45:58,888 [main] kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:vm,port:9092 with correlation id 0 for 1 topic(s) Set(partitioned)
> INFO 2014-03-24 23:45:58,893 [main] kafka.producer.SyncProducer - Connected to vm:9092 for producing
> INFO 2014-03-24 23:45:58,914 [main] kafka.producer.SyncProducer - Disconnecting from vm:9092
> INFO 2014-03-24 23:45:58,924 [main] kafka.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1395684958545] Stopping leader finder thread
> INFO 2014-03-24 23:45:58,925 [main] kafka.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1395684958545] Stopping all fetchers
> INFO 2014-03-24 23:45:58,926 [main] kafka.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1395684958545] All connections stopped
> INFO 2014-03-24 23:45:58,927 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], Cleared all relevant queues for this fetcher
> INFO 2014-03-24 23:45:58,928 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], Cleared the data chunks in all the consumer message iterators
> INFO 2014-03-24 23:45:58,928 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], Committing all offsets after clearing the fetcher queues
> INFO 2014-03-24 23:45:58,929 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], Releasing partition ownership
> INFO 2014-03-24 23:45:58,932 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], Consumer group1_LAPSZ0914-1395684958481-7fc24c0a rebalancing the following partitions: ArrayBuffer(0, 1, 2, 3, 4) for topic partitioned with consumers: List(group1_LAPSZ0914-1395684958481-7fc24c0a-0, group1_LAPSZ0914-1395684958481-7fc24c0a-1, group1_LAPSZ0914-1395684958481-7fc24c0a-2, group1_LAPSZ0914-1395684958481-7fc24c0a-3, group1_LAPSZ0914-1395684958481-7fc24c0a-4)
> INFO 2014-03-24 23:45:58,934 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], group1_LAPSZ0914-1395684958481-7fc24c0a-4 attempting to claim partition 4
> INFO 2014-03-24 23:45:58,939 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], group1_LAPSZ0914-1395684958481-7fc24c0a-3 attempting to claim partition 3
> INFO 2014-03-24 23:45:58,942 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], group1_LAPSZ0914-1395684958481-7fc24c0a-1 attempting to claim partition 1
> INFO 2014-03-24 23:45:58,949 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], group1_LAPSZ0914-1395684958481-7fc24c0a-2 attempting to claim partition 2
> INFO 2014-03-24 23:45:58,953 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], group1_LAPSZ0914-1395684958481-7fc24c0a-0 attempting to claim partition 0
> INFO 2014-03-24 23:45:58,967 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], group1_LAPSZ0914-1395684958481-7fc24c0a-1 successfully owned partition 1 for topic partitioned
> INFO 2014-03-24 23:45:58,970 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], group1_LAPSZ0914-1395684958481-7fc24c0a-2 successfully owned partition 2 for topic partitioned
> INFO 2014-03-24 23:45:58,973 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], group1_LAPSZ0914-1395684958481-7fc24c0a-0 successfully owned partition 0 for topic partitioned
> INFO 2014-03-24 23:45:58,977 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], group1_LAPSZ0914-1395684958481-7fc24c0a-4 successfully owned partition 4 for topic partitioned
> INFO 2014-03-24 23:45:58,980 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], group1_LAPSZ0914-1395684958481-7fc24c0a-3 successfully owned partition 3 for topic partitioned
> INFO 2014-03-24 23:45:58,981 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], Updating the cache
> INFO 2014-03-24 23:45:58,984 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], Consumer group1_LAPSZ0914-1395684958481-7fc24c0a selected partitions : partitioned:0: fetched offset = 1939: consumed offset = 1939,partitioned:1: fetched offset = 1969: consumed offset = 1969,partitioned:2: fetched offset = 30917: consumed offset = 30917,partitioned:3: fetched offset = 52147: consumed offset = 52147,partitioned:4: fetched offset = 1876: consumed offset = 1876
> INFO 2014-03-24 23:45:58,988 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], end rebalancing consumer group1_LAPSZ0914-1395684958481-7fc24c0a try #0
> INFO 2014-03-24 23:45:58,989 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] kafka.consumer.ConsumerFetcherManager$LeaderFinderThread - [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Starting
> INFO 2014-03-24 23:45:59,000 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] kafka.utils.VerifiableProperties - Verifying properties
> INFO 2014-03-24 23:45:59,000 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to vm:9092
> INFO 2014-03-24 23:45:59,000 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] kafka.utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
> INFO 2014-03-24 23:45:59,000 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] kafka.utils.VerifiableProperties - Property client.id is overridden to group1
> INFO 2014-03-24 23:45:59,001 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:vm,port:9092 with correlation id 0 for 1 topic(s) Set(partitioned)
> INFO 2014-03-24 23:45:59,004 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] kafka.producer.SyncProducer - Connected to vm:9092 for producing
> INFO 2014-03-24 23:45:59,043 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] kafka.producer.SyncProducer - Disconnecting from vm:9092
> INFO 2014-03-24 23:45:59,056 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] kafka.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1395684958545] Adding fetcher for partition [partitioned,2], initOffset 30917 to broker 0 with fetcherId 0
> INFO 2014-03-24 23:45:59,056 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] kafka.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1395684958545] Adding fetcher for partition [partitioned,3], initOffset 52147 to broker 0 with fetcherId 0
> INFO 2014-03-24 23:45:59,056 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] kafka.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1395684958545] Adding fetcher for partition [partitioned,1], initOffset 1969 to broker 0 with fetcherId 0
> INFO 2014-03-24 23:45:59,056 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] kafka.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1395684958545] Adding fetcher for partition [partitioned,0], initOffset 1939 to broker 0 with fetcherId 0
> INFO 2014-03-24 23:45:59,056 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] kafka.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1395684958545] Adding fetcher for partition [partitioned,4], initOffset 1876 to broker 0 with fetcherId 0
> INFO 2014-03-24 23:45:59,058 [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0] kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0], Starting
> INFO 2014-03-24 23:46:08,991 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], ZKConsumerConnector shutting down
> INFO 2014-03-24 23:46:08,992 [main] kafka.utils.KafkaScheduler - Forcing shutdown of Kafka scheduler
> INFO 2014-03-24 23:46:08,992 [main] kafka.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1395684958545] Stopping leader finder thread
> INFO 2014-03-24 23:46:08,992 [main] kafka.consumer.ConsumerFetcherManager$LeaderFinderThread - [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Shutting down
> INFO 2014-03-24 23:46:08,993 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] kafka.consumer.ConsumerFetcherManager$LeaderFinderThread - [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Stopped
> INFO 2014-03-24 23:46:08,993 [main] kafka.consumer.ConsumerFetcherManager$LeaderFinderThread - [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Shutdown completed
> INFO 2014-03-24 23:46:08,993 [main] kafka.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1395684958545] Stopping all fetchers
> INFO 2014-03-24 23:46:08,993 [main] kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0], Shutting down
> INFO 2014-03-24 23:46:09,257 [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0] kafka.consumer.SimpleConsumer - Reconnect due to socket error:
> java.nio.channels.ClosedByInterruptException
>     at java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown Source)
>     at sun.nio.ch.SocketChannelImpl.read(Unknown Source)
>     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(Unknown Source)
>     at sun.nio.ch.ChannelInputStream.read(Unknown Source)
>     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown Source)
>     at kafka.utils.Utils$.read(Utils.scala:394)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> INFO 2014-03-24 23:46:09,259 [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0] kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0], Stopped
> INFO 2014-03-24 23:46:09,259 [main] kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0], Shutdown completed
> INFO 2014-03-24 23:46:09,259 [main] kafka.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1395684958545] All connections stopped
> INFO 2014-03-24 23:46:09,283 [ZkClient-EventThread-9-172.25.1.94:2181] org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
> INFO 2014-03-24 23:46:09,293 [main] org.apache.zookeeper.ZooKeeper - Session: 0x144f5342bc60007 closed
> INFO 2014-03-24 23:46:09,294 [main] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], ZKConsumerConnector shut down completed
> INFO 2014-03-24 23:46:09,694 [group1_LAPSZ0914-1395684958481-7fc24c0a_watcher_executor] kafka.consumer.ZookeeperConsumerConnector - [group1_LAPSZ0914-1395684958481-7fc24c0a], stopping watcher executor thread for consumer group1_LAPSZ0914-1395684958481-7fc24c0a
>
> *My Consumer code*:
>
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
>
> public class TestConsumer {
>     private final ConsumerConnector consumer;
>     private final String topic;
>     private ExecutorService executor;
>
>     public TestConsumer(String a_zookeeper, String a_groupId, String a_topic) {
>         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
>                 createConsumerConfig(a_zookeeper, a_groupId));
>         this.topic = a_topic;
>     }
>
>     public void shutdown() {
>         if (consumer != null) consumer.shutdown();
>         if (executor != null) executor.shutdown();
>     }
>
>     public void run(int a_numThreads) {
>         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>         topicCountMap.put(topic, new Integer(a_numThreads));
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
>         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
>
>         // now launch all the threads
>         //
>         executor = Executors.newFixedThreadPool(a_numThreads);
>
>         // now create an object to consume the messages
>         //
>         int threadNumber = 0;
>         for (final KafkaStream stream : streams) {
>             executor.submit(new ConsumerTest(stream, threadNumber));
>             threadNumber++;
>         }
>     }
>
>     private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
>         Properties props = new Properties();
>         props.put("zookeeper.connect", a_zookeeper);
>         props.put("group.id", a_groupId);
>         props.put("zookeeper.session.timeout.ms", "40000");
>         props.put("zookeeper.sync.time.ms", "20000");
>         props.put("auto.commit.interval.ms", "1000");
>
>         return new ConsumerConfig(props);
>     }
>
>     public static void main(String[] args) {
>         String zooKeeper = "172.25.1.94:2181";
>         String groupId = "group1";
>         String topic = "partitioned";
>         int threads = 5;
>
>         TestConsumer example = new TestConsumer(zooKeeper, groupId, topic);
>         example.run(threads);
>
>         try {
>             Thread.sleep(10000);
>         } catch (InterruptedException ie) {
>
>         }
>         example.shutdown();
>     }
> }
>
> *From* Neha Narkhede <neha.narkh...@gmail.com>
> *Subject* Re: Kafka High Level Consumer Connector shuts down after 10 seconds
> *Date* Mon, 10 Mar 2014 16:48:31 GMT
>
> Session termination can happen either when client or zookeeper process
> pauses (due to GC) or when the client process terminates. A sustainable
> solution is to tune GC settings. For now, you can try increasing the
> zookeeper.session.timeout.ms.
>
> On Sun, Mar 9, 2014 at 3:44 PM, Ameya Bhagat <ameya.bha...@gmail.com> wrote:
>
> > I am using a high level consumer as described at:
> > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> >
> > I am noticing that my consumer does not run forever and ends after some
> > time (< 15s). At the zookeeper side, I see the following:
> >
> > INFO Processed session termination for sessionid: 0x144a4854325004d (org.apache.zookeeper.server.PrepRequestProcessor)
> > INFO Closed socket connection for client /127.0.0.1:59899 which had sessionid 0x144a4854325004d (org.apache.zookeeper.server.NIOServerCnxn)
> >
> > I am using default configurations. How do I make my consumer listen
> > forever?
> >
> > Thanks
> > Ameya
>
> --
> Thanks & Regards,
> Gufran Pathan | +91-9566811502 | www.mu-sigma.com |
>
> Disclaimer: http://www.mu-sigma.com/disclaimer.html