Hi Jason,

No, I didn't bring down any ZooKeeper server. I even tried with 3 ZooKeeper
servers, one of them as an 'Observer', but I saw the same issue.

Here is the server log from one of the nodes in my other datacenter:

[2016-09-01 01:25:19,221] INFO Truncating log TEST3-0 to offset 0. (kafka.log.Log)
[2016-09-01 01:25:19,257] INFO [ReplicaFetcherThread-0-3], Starting (kafka.server.ReplicaFetcherThread)
[2016-09-01 01:25:19,258] INFO [ReplicaFetcherManager on broker 4] Added fetcher for partitions List([[TEST3,0], initOffset 0 to broker BrokerEndPoint(3,psaq3-wc.sys.comcast.net,61616)] ) (kafka.server.ReplicaFetcherManager)
[2016-09-01 01:26:14,154] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@6618a925 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 3 was disconnected before the response was read
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
        at scala.Option.foreach(Option.scala:257)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
        at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
        at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
        at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:16,189] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@6e7e2578 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
        at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
        at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
        at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:18,198] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@5adea8fb (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
        at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
        at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
        at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:20,223] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@4c159cc3 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
        at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
        at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
        at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:22,246] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@611ed1e9 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
        at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
        at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
        at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:22,736] INFO [ReplicaFetcherManager on broker 4] Removed fetcher for partitions [TEST3,0] (kafka.server.ReplicaFetcherManager)
[2016-09-01 01:26:22,752] INFO [ReplicaFetcherThread-0-3], Shutting down (kafka.server.ReplicaFetcherThread)
[2016-09-01 01:26:22,755] INFO [ReplicaFetcherThread-0-3], Stopped (kafka.server.ReplicaFetcherThread)
[2016-09-01 01:26:22,756] INFO [ReplicaFetcherThread-0-3], Shutdown completed (kafka.server.ReplicaFetcherThread)
[2016-09-01 01:26:48,025] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-09-01 01:26:48,034] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-09-01 01:26:48,035] INFO 4 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-09-01 01:26:48,726] INFO New leader is 4 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-09-01 01:26:54,837] INFO Partition [TEST3,0] on broker 4: Shrinking ISR for partition [TEST3,0] from 4,5,6,1 to 4,5,6 (kafka.cluster.Partition)
[2016-09-01 01:33:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 01:43:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 01:53:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 02:03:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 02:13:04,928] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 02:23:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 02:33:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)


Why is it trying to connect to node 3 of my local datacenter, and why is it
throwing an IOException?

Thanks
Achintya

-----Original Message-----
From: Jason Gustafson [mailto:ja...@confluent.io] 
Sent: Wednesday, August 31, 2016 10:26 PM
To: us...@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Re: Kafka consumers unable to process message

Hi Achintya,

Just to clarify, you did not take down either of the zookeepers in this test, 
right? Having only two zookeepers in the ensemble would mean that if either one 
of them failed, zookeeper wouldn't be able to reach quorum.
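The quorum arithmetic can be made concrete with a small Java sketch. The class and method names are illustrative only and are not part of ZooKeeper's API. The sketch assumes the standard majority rule: a quorum needs more than half of the voting members. Because ZooKeeper observers do not vote, an ensemble of 3 servers with one observer still has only 2 voters. By the same rule, 3 voters split 2+1 across two datacenters lose quorum if the 2-voter side fails.

```java
// Illustrative model of ZooKeeper's majority rule (not ZooKeeper's own API).
// Only voting members count; observers are deliberately excluded.
final class ZkQuorum {
    // Smallest number of voters that is a strict majority.
    static int quorumSize(int voters) {
        return voters / 2 + 1;
    }

    // True if the voters that are still up form a majority of all voters.
    static boolean hasQuorum(int voters, int votersUp) {
        return votersUp >= quorumSize(voters);
    }

    // How many voters may fail while a majority is still reachable.
    static int toleratedFailures(int voters) {
        return voters - quorumSize(voters);
    }

    public static void main(String[] args) {
        // Two voters, one per datacenter: losing either datacenter loses quorum.
        System.out.println("2 voters, 1 up -> quorum: " + hasQuorum(2, 1));
        // Three servers with one observer: still two voters, same result.
        System.out.println("2 voters + 1 observer, 1 voter up -> quorum: " + hasQuorum(2, 1));
        // Three voters split 2+1 across two datacenters: losing the 2-voter side loses quorum.
        System.out.println("3 voters, 1 up -> quorum: " + hasQuorum(3, 1));
        System.out.println("3 voters tolerate " + toleratedFailures(3) + " failure(s)");
    }
}
```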

I'm not entirely sure why this would happen. One possibility is that the 
consumer is failing to find the new coordinator, which might happen if all the 
replicas for one of the __consumer_offsets partitions were located in the 
"failed" datacenter. Perhaps you can enable DEBUG logging and post some logs so 
we can see what it's actually doing during poll().

By the way, I noticed that your consumer configuration settings seem a little 
mixed up. The new consumer doesn't actually communicate with Zookeeper, so 
there's no need for those settings. And you don't need to include the 
"offsets.storage" option since Kafka is the only choice. Also, I don't think 
"consumer.timeout.ms" is an option.

-Jason
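With the configuration suggestions above applied, the consumer settings quoted further down might reduce to something like this sketch. It uses only `java.util.Properties`, so it compiles without Kafka on the classpath. The keys and values are copied from the original settings. Dropping `consumer.timeout.ms` follows Jason's hedged remark that it is not an option for the new consumer.

```java
import java.util.Properties;

// Sketch of the new-consumer settings without the options Jason flagged:
// no zookeeper.* keys (the new consumer talks only to brokers), no
// offsets.storage (offsets always go to Kafka), and no consumer.timeout.ms.
final class ConsumerConfigSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,"
                + "psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,"
                + "psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
        props.put("group.id", "app-consumer");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "500");
        props.put("session.timeout.ms", "120000");
        props.put("request.timeout.ms", "150000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "com.comcast.ps.kafka.object.CustomMessageDeSer");
        return props;
    }
}
```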


On Wed, Aug 31, 2016 at 6:43 PM, Ghosh, Achintya (Contractor)
<achintya_gh...@comcast.com> wrote:

> Hi Jason,
>
> Thanks for your response.
>
> I know that is a known issue, and I resolved it by calling the wakeup method
> from another thread. But my problem here is different. Let me explain;
> it's very basic.
>
> I created one cluster with 6 nodes (3 in one datacenter and 3 in another,
> remote datacenter), kept the replication factor at 6, and used 2
> ZooKeeper servers, one in each datacenter. Then I brought down all 3
> nodes of my local datacenter and produced a few messages, and the
> producer works fine even though my local datacenter nodes are down. It
> successfully writes the messages to the other datacenter's nodes. But when
> I try to consume the messages, the consumer.poll method gets stuck
> because my local datacenter is down, even though the other datacenter's
> nodes are up.
>
> My question is: since the data has been written successfully to the other
> datacenter, why is the consumer part not working?
>
> Here are my producer settings:
>
> props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,"
>         + "psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,"
>         + "psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,"
>         + "psaq3-ab.sys.comcast.net:61617");
> props.put("acks", "1");
> props.put("max.block.ms", 1000);
> props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer", "com.comcast.ps.kafka.object.CustomMessageSer");
>
> and here are my consumer settings:
>
> props.put("group.id", "app-consumer");
> props.put("enable.auto.commit", "false");
> props.put("auto.offset.reset", "earliest");
> props.put("auto.commit.interval.ms", "500");
> props.put("session.timeout.ms", "120000");
> props.put("consumer.timeout.ms", "10000");
> props.put("zookeeper.session.timeout.ms", "120000");
> props.put("zookeeper.connection.timeout.ms", "60000");
> props.put("offsets.storage", "kafka");
> props.put("request.timeout.ms", "150000");
> props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,"
>         + "psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,"
>         + "psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,"
>         + "psaq3-ab.sys.comcast.net:61617");
> props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer", "com.comcast.ps.kafka.object.CustomMessageDeSer");
>
> Is it because the consumer is not able to get the broker metadata if it
> is trying to connect to the other datacenter's ZooKeeper server? I tried
> increasing the ZooKeeper session timeout and connection timeout, but no luck.
>
> Please help with this.
> Thanks
> Achintya
>
>
> -----Original Message-----
> From: Jason Gustafson [mailto:ja...@confluent.io]
> Sent: Wednesday, August 31, 2016 4:05 PM
> To: us...@kafka.apache.org
> Cc: dev@kafka.apache.org
> Subject: Re: Kafka consumers unable to process message
>
> Hi Achintya,
>
> We have a JIRA for this problem:
> https://issues.apache.org/jira/browse/KAFKA-3834. Do you expect the client
> to raise an exception in this case or do you just want to keep it from
> blocking indefinitely? If the latter, you could escape the poll from another
> thread using wakeup().
>
> Thanks,
> Jason
>
> On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor)
> <achintya_gh...@comcast.com> wrote:
>
> > Hi there,
> >
> > The Kafka consumer gets stuck in the consumer.poll() method if my current
> > datacenter is down and the replicated messages are in the remote datacenter.
> >
> > How can I solve this issue?
> >
> > Thanks
> > Achintya
> >
>
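The `wakeup()` workaround from Jason's first reply (KAFKA-3834) can be sketched as a small watchdog. If `poll()` has not returned by a deadline, the watchdog calls a wakeup action from another thread. `PollWatchdog` and its methods are hypothetical names. With a real `KafkaConsumer`, the action would be `consumer::wakeup`, which makes a blocked `poll()` throw `org.apache.kafka.common.errors.WakeupException`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: runs a wakeup action on a separate thread unless the
// caller cancels it before the deadline. Intended use (not compiled here):
//   ScheduledFuture<?> alarm = dog.arm(consumer::wakeup, 30_000);
//   try { records = consumer.poll(1000); }
//   catch (WakeupException e) { /* poll() blocked past the deadline */ }
//   finally { alarm.cancel(false); }
final class PollWatchdog implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "poll-watchdog");
                t.setDaemon(true); // do not keep the JVM alive for the watchdog
                return t;
            });

    // Schedules the wakeup action; cancel the returned future once poll() returns.
    ScheduledFuture<?> arm(Runnable wakeup, long timeoutMs) {
        return scheduler.schedule(wakeup, timeoutMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

If the wakeup fires after `poll()` has already returned but before the future is cancelled, the next blocking consumer call throws `WakeupException`, so the poll loop needs to handle it there as well. This only unblocks the caller. It does not address why `poll()` is blocked in the first place.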
