Re: In kafka streams consumer seems to hang while retrieving the offsets

2017-04-09 Thread Sachin Mittal
Let me try to get the debug log when this error happens.

Right now we have three instances each with 4 threads consuming from 12
partition topic.
So one thread per partition.
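For reference, that layout corresponds to a Streams configuration along these lines (a hypothetical sketch; the application id and broker address are placeholders, only the thread count comes from the description above):

```properties
# Each of the 3 instances runs 4 stream threads; against a 12-partition
# input topic this yields one stream thread per partition across the cluster.
application.id=my-streams-app        # placeholder
bootstrap.servers=broker1:9092       # placeholder
num.stream.threads=4
```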

The application is running much better than before. It now usually runs for
a week, even during peak load.

Sometimes, out of the blue, either RocksDB throws an exception with a
single-character message (which I guess is a known RocksDB issue, fixed in
some later release), or the producer times out while committing a changelog
topic record. I had increased the timeout from 30 seconds to 180 seconds,
but it still throws an exception even at that setting.

Not sure if these are due to a VM issue or the network.

But whenever something like this happens, the application goes into a
rebalance and things soon take a turn for the worse. Some of the threads go
into a deadlock with the above stack trace, and the application ends up in
a perpetual rebalance state.

The only way to resolve this is to kill all instances with kill -9 and
restart them one by one.

So as long as we maintain a steady state of one thread per partition,
everything works fine. I am still working out a way to limit the changelog
topic size through more aggressive compaction; let me see if that makes
things better.
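For reference, per-topic compaction settings can be tightened with the kafka-configs tool. A hedged sketch — the changelog topic name and ZooKeeper address are placeholders, and the values are illustrative, not recommendations:

```shell
# Make log cleaning kick in sooner on a Streams changelog topic (illustrative
# values; Streams changelog topics are created with cleanup.policy=compact).
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-app-mystore-changelog \
  --add-config segment.ms=3600000,min.cleanable.dirty.ratio=0.1,delete.retention.ms=3600000
```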

I will try to get the logs when this happens next time.

Thanks
Sachin



On Sun, Apr 9, 2017 at 6:05 PM, Eno Thereska  wrote:

> Hi Sachin,
>
> It's not necessarily a deadlock. Do you have any debug traces from those
> nodes? Also would be useful to know the config (e.g., how many partitions
> do you have and how many app instances.)
>
> Thanks
> Eno
>
> > On 9 Apr 2017, at 04:45, Sachin Mittal  wrote:
> >
> > Hi,
> > In my streams applications cluster in one or more instances I see some
> > threads always waiting with the following stack.
> >
> > Every time I check on jstack I see the following trace.
> >
> > Is this some kind of new deadlock that we have failed to identify?
> >
> > Thanks
> > Sachin
> >
> > here is the stack trace:
> > --
> > "StreamThread-4" #20 prio=5 os_prio=0 tid=0x7fb814be3000 nid=0x19bf
> > runnable [0x7fb7cb4f6000]
> >    java.lang.Thread.State: RUNNABLE
> >         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> >         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> >         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> >         - locked <0x000701c50c98> (a sun.nio.ch.Util$3)
> >         - locked <0x000701c50c88> (a java.util.Collections$UnmodifiableSet)
> >         - locked <0x000701c4f6a8> (a sun.nio.ch.EPollSelectorImpl)
> >         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> >         at org.apache.kafka.common.network.Selector.select(Selector.java:489)
> >         at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> >         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> >         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> >         - locked <0x000701c5da48> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> >         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
> >         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
> >         at org.apache.kafka.clients.consumer.internals.Fetcher.retrieveOffsetsByTimes(Fetcher.java:422)
> >         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
> >         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:227)
> >         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1592)
> >         at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265)
> >         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:213)


Re: How to set offset for a consumer in Kafka 0.10.0.X

2017-04-09 Thread Glen Ogilvie
Hi Sudev, 

My version is pretty much the same as
https://github.com/goibibo/woof/tree/master/scripts/offset_commit_manual,
so there is little point in posting code that does the same thing. It's
also Python, although with a different method of parsing its parameters.

Sorry about the late reply; I forgot to check this mailing list for a
while.

Regards 
-- 
Glen Ogilvie 
Open Systems Specialists 
Level 1, 162 Grafton Road 
http://www.oss.co.nz/ 

Ph: +64 9 984 3000 
Mobile: +64 21 684 146 
GPG Key: ACED9C17 


From: "Sudev A C"  
To: "users"  
Sent: Thursday, 9 March, 2017 7:25:21 PM 
Subject: Re: How to set offset for a consumer in Kafka 0.10.0.X 

Hi Glen, 

Maybe have a look at this slightly different version. It doesn't do
anything more than what you have seen in the gist.

You should pass the broker URL as the first parameter and a JSON file
containing the desired offsets as the second parameter.

https://github.com/goibibo/woof/tree/master/scripts/offset_commit_manual 

As suggested in the previous mail, it is best to shut down all consumers
before resetting the offsets.

Thanks 
Sudev 
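For anyone landing here without the script, the approach described in this thread (shut the consumers down, then commit the desired offsets under the same group id) can be sketched roughly with the kafka-python client. This is a hedged sketch, not the actual script: the broker address, group name, and JSON shape are placeholders, and the kafka-python import is deferred so the pure parsing helper works on its own.

```python
import json
import sys

def parse_offsets(spec_json):
    """Parse a JSON spec like {"mytopic": {"0": 12345, "1": 67890}}
    into a list of (topic, partition, offset) tuples."""
    spec = json.loads(spec_json)
    return [(topic, int(part), int(off))
            for topic, parts in spec.items()
            for part, off in parts.items()]

def reset_offsets(bootstrap, group, spec_json):
    # Imported here so the helper above is usable without kafka-python.
    from kafka import KafkaConsumer, TopicPartition
    from kafka.structs import OffsetAndMetadata

    # Join with the SAME group id as the (shut-down) consumers.
    consumer = KafkaConsumer(bootstrap_servers=bootstrap, group_id=group,
                             enable_auto_commit=False)
    offsets = {TopicPartition(t, p): OffsetAndMetadata(o, None)
               for t, p, o in parse_offsets(spec_json)}
    consumer.commit(offsets)   # commit the desired positions for the group
    consumer.close()           # leave; the real consumers can now restart

if __name__ == "__main__" and len(sys.argv) == 4:
    # usage: reset_offsets.py <broker:port> <group> '<offsets-json>'
    reset_offsets(sys.argv[1], sys.argv[2], sys.argv[3])
```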
On Thu, 9 Mar 2017 at 11:41 AM, Manikumar  wrote: 

> A tool for resetting consumer group offsets is proposed here: 
> KIP-122: 
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
>  
> 
> KIP is currently in the voting stage. 
> 
> On Thu, Mar 9, 2017 at 7:35 AM, Jeff Widman  wrote: 
> 
> > Did you have to do anything different beyond what was already in the
> > Gist?
> >
> > I'd be curious to see the code, as I've considered putting together a
> > small repo of various Python scripts I've found useful when working with
> > Kafka.
> >
> > On Wed, Mar 8, 2017 at 1:11 PM, Glen Ogilvie  
> > wrote: 
> > 
> > > Thank you Jeff and Robert. 
> > > 
> > > I've had success in getting the offset position to seek to a position
> > > in the queue, when all other consumers in the group are off.
> > >
> > > Would anyone like me to tidy the tool up enough that it could be
> > > included in Kafka or the docs?
> > > 
> > > Regards 
> > > -- 
> > > Glen Ogilvie 
> > > Open Systems Specialists 
> > > Level 1, 162 Grafton Road 
> > > http://www.oss.co.nz/ 
> > > 
> > > Ph: +64 9 984 3000 
> > > Mobile: +64 21 684 146 
> > > GPG Key: ACED9C17 
> > > 
> > > 
> > > From: "Robert Quinlivan"  
> > > To: "users"  
> > > Sent: Thursday, 9 March, 2017 6:42:15 AM 
> > > Subject: Re: How to set offset for a consumer in Kafka 0.10.0.X 
> > > 
> > > The best approach would be: 
> > > - Have all consumers in your group shut down 
> > > - Have an offset reset tool join with the same group name as above 
> > > - Offset tool subscribes to all topic-partitions, seeks to the desired 
> > > offset, and commits. 
> > > - Offset tool shuts down 
> > > - Consumers then restart and re-join the consumer group, resuming at 
> the 
> > > offsets that were last committed for each topic-partition 
> > > 
> > > On Wed, Mar 8, 2017 at 10:51 AM, Jeff Widman  
> wrote: 
> > > 
> > > > Yeah, that gist looks like it *should* work. I haven't tested it,
> > > > so I can't guarantee it.
> > > >
> > > > On Tue, Mar 7, 2017 at 7:04 PM, Glen Ogilvie  > 
> > > > wrote: 
> > > > 
> > > > > Hi Jeff, 
> > > > > 
> > > > > Yes, the work I am doing is ops work. Logstash is consuming from
> > > > > the topic and consumer group, and I don't want it to start at the
> > > > > beginning but rather at a specific offset, so setting the offset
> > > > > for the consumer group externally and then starting up Logstash is
> > > > > my goal.
> > > > >
> > > > > I'm still a little unclear as to how to do this.
> > > > >
> > > > > Is this Python script: http://pastebin.com/tvxj1wTX
> > > > >
> > > > > the right way to go about getting the offset set to a specific
> > > > > value (12345678 in this example) for a specific consumer group?
> > > > >
> > > > > Regards 
> > > > > -- 
> > > > > Glen Ogilvie 
> > > > > Open Systems Specialists 
> > > > > Level 1, 162 Grafton Road 
> > > > > http://www.oss.co.nz/ 
> > > > > 
> > > > > Ph: +64 9 984 3000 
> > > > > Mobile: +64 21 684 146 
> > > > > GPG Key: ACED9C17 
> > > > > 
> > > > > 
> > > > > From: "Jeff Widman"  
> > > > > To: "users"  
> > > > > Sent: Wednesday, 8 March, 2017 1:41:17 PM 
> > > > > Subject: Re: How to set offset for a consumer in Kafka 0.10.0.X 
> > > > > 
> > > > > Offsets for modern Kafka consumers are stored in an internal
> > > > > Kafka topic, so they aren't as easy to change as they were in
> > > > > ZooKeeper.
> > > > >
> > > > > To set a consumer offset, you need a consumer within a consumer
> > > > > group to call commit() with your explicit offset. If needed, you
> > > > > can create a

Re: offset.storage.filename configuration in kafka-connect-hdfs

2017-04-09 Thread Ewen Cheslack-Postava
The offset file is marked as required so we can fail early & fast, but if
you only run sink connectors then offsets will be stored in Kafka's normal
offsets topic and the file will never need to be created. (And the HDFS
connector is even more unusual in that it doesn't even rely on Kafka
offsets because it stores offset information in HDFS to provide exactly
once delivery guarantees.)

-Ewen

On Tue, Mar 7, 2017 at 11:15 PM, FEI Aggie 
wrote:

> Hi,
> I'm running kafka-connect-hdfs 3.1.1.
> offset.storage.filename is a required configuration for a standalone
> connector. But when I set this parameter in the worker configuration file
> for kafka-connect-hdfs in standalone mode:
> offset.storage.file.filename=/mnt/data/connect.offsets
>
> It never works; the configured file is not generated.
> Does anyone know about this issue?
>
> Thanks!
> Aggie
>


Re: Kafka Connect behaves weird in case of zombie Kafka brokers. Also, zombie brokers?

2017-04-09 Thread Ewen Cheslack-Postava
Is that node the only bootstrap broker provided? If the Connect worker was
pinned to *only* that broker, it wouldn't have any chance of recovering
correct cluster information from the healthy brokers.

It sounds like there was a separate problem as well (the broker should have
figured out it was in a bad state wrt ZK), but normally we would expect
Connect to detect the issue, mark the coordinator dead (as it did) and then
refresh metadata to figure out which broker it should be talking to now.
There are some known edge cases around how initial cluster discovery works
which *might* be able to get you stuck in this situation.
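One mitigation for that edge case is to list several brokers in the worker's bootstrap configuration rather than a single discovery-provided address (hostnames below are hypothetical):

```properties
# Connect worker config: multiple bootstrap brokers, so initial cluster
# discovery does not hinge on one possibly-unhealthy node.
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
```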

-Ewen

On Tue, Mar 21, 2017 at 10:43 PM, Anish Mashankar 
wrote:

> Hello everyone,
> We are running a 5 broker Kafka v0.10.0.0 cluster on AWS. Also, the connect
> api is in v0.10.0.0.
> It was observed that the distributed kafka connector went into infinite
> loop of log message of
>
> (Re-)joining group connect-connect-elasticsearch-indexer.
>
> And after a little more digging. There was another infinite loop of set of
> log messages
>
> *Discovered coordinator 1.2.3.4:9092 (id:  rack: null) for group x*
>
> *Marking the coordinator 1.2.3.4:9092  (id: 
> rack:
> null) dead for group x*
>
> Restarting Kafka connect did not help.
>
> Looking at zookeeper, we realized that broker 1.2.3.4 had left the
> zookeeper cluster. It had happened due to a timeout when interacting with
> zookeeper. The broker was also the controller. Failover of controller
> happened, and the applications were fine, but a few days later we started
> facing the above-mentioned issue. To add to the surprise, the Kafka
> daemon was still running on the host but was not trying to contact
> ZooKeeper at all. Hence, a zombie broker.
>
> Also, a Connect cluster spread across multiple hosts was not working;
> however, a single connector worked.
>
> After replacing the EC2 instance for the broker 1.2.3.4, kafka connect
> cluster started working fine. (same broker ID)
>
> I am not sure if this is a bug, but Kafka Connect shouldn't keep trying
> the same broker if it is not able to establish a connection. We use
> Consul for service discovery. Since the broker process was running and
> port 9092 was active, Consul reported it as healthy and directed DNS
> queries to that broker when the connector tried to connect. We have
> disabled DNS caching in the Java options, so Kafka Connect should have
> tried to connect to a different host each time it queried, but instead it
> only tried the 1.2.3.4 broker.
>
> Does kafka connect internally cache DNS? Is there a debugging step I am
> missing here?
> --
> Anish Samir Mashankar
>


Re: kafka connector for mongodb as a source

2017-04-09 Thread Ewen Cheslack-Postava
There is some log noise in there from Reflections, but it does look like
your connector & task are being created:

[2017-03-27 18:33:00,057] INFO Instantiated task mongodb-0 with version
0.10.0.1 of type org.apache.kafka.connect.mongodb.MongodbSourceTask
(org.apache.kafka.connect.runtime.Worker:264)

And I see the producer configs for the source task's underlying producer
being logged. Then we see the following, suggesting some sort of connection
is being made successfully:

[2017-03-27 18:33:00,397] INFO Source task WorkerSourceTask{id=mongodb-0}
finished initialization and start
(org.apache.kafka.connect.runtime.WorkerSourceTask:138)
[2017-03-27 18:33:00,442] INFO No server chosen by
ReadPreferenceServerSelector{readPreference=primary} from cluster
description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE,
all=[ServerDescription{address=localhost:27017, type=UNKNOWN,
state=CONNECTING}]}. Waiting for 3 ms before timing out
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,455] INFO Opened connection
[connectionId{localValue:1, serverValue:4}] to localhost:27017
(org.mongodb.driver.connection:71)
[2017-03-27 18:33:00,457] INFO Monitor thread successfully connected to
server with description ServerDescription{address=localhost:27017,
type=STANDALONE, state=CONNECTED, ok=true,
version=ServerVersion{versionList=[3, 2, 12]}, minWireVersion=0,
maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=536169}
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,491] INFO Opened connection
[connectionId{localValue:2, serverValue:5}] to localhost:27017
(org.mongodb.driver.connection:71)

But then the logs stop. The framework should just be calling poll() on your
source task. Perhaps you could add some logging to your code to give some
hint as to where it is getting stuck? You could also try increasing the log
level for the framework to DEBUG or even TRACE.
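Raising the framework's log level is a change to the Connect log4j file. A hedged sketch — the logger names follow the package names visible in the log output above, and the file location depends on your installation:

```properties
# connect-log4j.properties (illustrative levels)
log4j.rootLogger=INFO, stdout
# More detail from the Connect framework and the source-task poll loop:
log4j.logger.org.apache.kafka.connect=DEBUG
log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask=TRACE
```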

-Ewen

On Mon, Mar 27, 2017 at 6:22 AM, VIVEK KUMAR MISHRA 13BIT0066 <
vivekkumar.mishra2...@vit.ac.in> wrote:

> Hi All,
>
> I am creating a Kafka connector for MongoDB as a source. My connector is
> starting and connecting to Kafka, but it is not committing any offsets.
>
> This is output after starting connector.
>
> [root@localhost kafka_2.11-0.10.1.1]# bin/connect-standalone.sh
> config/connect-standalone.properties config/mongodb.properties
> [2017-03-27 18:32:58,019] INFO StandaloneConfig values:
> rest.advertised.host.name = null
> task.shutdown.graceful.timeout.ms = 5000
> rest.host.name = null
> rest.advertised.port = null
> bootstrap.servers = [localhost:9092]
> offset.flush.timeout.ms = 5000
> offset.flush.interval.ms = 1
> rest.port = 8083
> internal.key.converter = class
> org.apache.kafka.connect.json.JsonConverter
> access.control.allow.methods =
> access.control.allow.origin =
> offset.storage.file.filename = /tmp/connect.offsets
> internal.value.converter = class
> org.apache.kafka.connect.json.JsonConverter
> value.converter = class org.apache.kafka.connect.json.
> JsonConverter
> key.converter = class org.apache.kafka.connect.json.JsonConverter
>  (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:178)
> [2017-03-27 18:32:58,162] INFO Logging initialized @609ms
> (org.eclipse.jetty.util.log:186)
> [2017-03-27 18:32:58,392] INFO Kafka Connect starting
> (org.apache.kafka.connect.runtime.Connect:52)
> [2017-03-27 18:32:58,392] INFO Herder starting
> (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:70)
> [2017-03-27 18:32:58,393] INFO Worker starting
> (org.apache.kafka.connect.runtime.Worker:113)
> [2017-03-27 18:32:58,393] INFO Starting FileOffsetBackingStore with file
> /tmp/connect.offsets
> (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
> [2017-03-27 18:32:58,398] INFO Worker started
> (org.apache.kafka.connect.runtime.Worker:118)
> [2017-03-27 18:32:58,398] INFO Herder started
> (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
> [2017-03-27 18:32:58,398] INFO Starting REST server
> (org.apache.kafka.connect.runtime.rest.RestServer:98)
> [2017-03-27 18:32:58,493] INFO jetty-9.2.15.v20160210
> (org.eclipse.jetty.server.Server:327)
> [2017-03-27 18:32:59,621] INFO HV01: Hibernate Validator 5.1.2.Final
> (org.hibernate.validator.internal.util.Version:27)
> Mar 27, 2017 6:32:59 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The
> (sub)resource method listConnectors in
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource
> contains
> empty path annotation.
> WARNING: The (sub)resource method createConnector in
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource
> contains
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource
> contains empty 

Re: Kafka producer and consumer within on sync execution

2017-04-09 Thread Hans Jespersen
You posted the same question to Stack Overflow so I answered it there 

https://stackoverflow.com/questions/43302857/handling-sync-api-call-rest-spring-and-async-message-kafka-in-the-same-execu/43312070#43312070
 


-hans




> On Apr 8, 2017, at 8:49 PM, Rams N <99ram...@gmail.com> wrote:
> 
> Hi,
> I have a use case where I need to respond to an API call from the client
> synchronously. But within the API execution, system A needs to publish a
> Kafka message to a different system B, which responds on another Kafka
> topic. The response must be consumed by A, which then responds to the
> client as the API response.
> 
> So this problem combines an async pub-sub model with sync API handling.
> 
> Any suggestions on implementing this highly appreciated.
> 
> thanks
> Rams



RE: Topic deletion

2017-04-09 Thread Adrian McCague
Thank you Mayuresh, that's an excellent write-up.

As you mention, reassignment blocks deletion; I think we are seeing an issue 
with our brokers and reassignment, so that could well be the root cause of the 
deletion issue we experienced. The ZkClient suggestion that I followed earlier 
resolved the issue in ZooKeeper, but the brokers never recovered, so we rebuilt 
the system (luckily this is in a staging environment).

Thanks
Adrian

-Original Message-
From: Mayuresh Gharat [mailto:gharatmayures...@gmail.com] 
Sent: 07 April 2017 18:56
To: users@kafka.apache.org
Subject: Re: Topic deletion

Hi Adrian,

When you delete a topic, it is marked under the /admin/delete_topics path in 
ZooKeeper. Once that is done, the controller gets a notification that a topic 
is marked for deletion. The controller then notifies the replicas of the 
topic-partitions to delete the data for the topic and the topic itself.
After KAFKA-1911, topic deletion is asynchronous and faster. On receiving the 
notification from the controller, the replicas rename the topic-partition 
directory to something like topic-partition.delete and report back to the 
controller that the delete is done. When the controller receives the ack from 
all the replicas that the topic has been deleted, it deletes the znode under 
the /admin/delete_topics path. Your topic deletion is successful at this stage. 
The renamed directories are removed asynchronously on the replica brokers at 
some later point in time.
Also remember: if your topic-partitions are being reassigned, topic deletion 
will not be triggered until the reassignment is finished.

We at LinkedIn have been using delete topic for a while now, and it has been 
working fine after KAFKA-1911.

Hope this helps.

Thanks,

Mayuresh

On Fri, Apr 7, 2017 at 2:15 AM, Adrian McCague 
wrote:

> Indeed, I wish it were explained somewhere what "marked for deletion" 
> does, and what process handles it. I will have to go and investigate the 
> source.
>
> I can confirm the zkCli did the trick, thanks for the hint!
>
> Adrian
>
> -Original Message-
> From: Akhilesh Pathodia [mailto:pathodia.akhil...@gmail.com]
> Sent: 07 April 2017 09:57
> To: users@kafka.apache.org
> Subject: Re: Topic deletion
>
> I am not sure, but the kafka-topics delete command does not actually 
> delete the topic; it only marks it for deletion. This is probably fixed 
> in a later version of Kafka.
>
> On Fri, Apr 7, 2017 at 2:14 PM, Adrian McCague 
> 
> wrote:
>
> > Hi Akhilesh,
> >
> > Why would this approach need to be taken over the kafka-topics tool, 
> > out of interest?
> >
> > Thanks
> > Adrian
> >
> > -Original Message-
> > From: Akhilesh Pathodia [mailto:pathodia.akhil...@gmail.com]
> > Sent: 07 April 2017 09:37
> > To: users@kafka.apache.org
> > Subject: Re: Topic deletion
> >
> > Hi Adrian,
> >
> > You will have to delete the topic directory from ZooKeeper. This can 
> > be done from the ZooKeeper CLI. Connect to it using the command below:
> >
> > zookeeper-client -server 
> >
> > Then run below command :
> >
> > rmr /brokers/topics/
> >
> > Thanks,
> > AKhilesh
> >
> > On Thu, Apr 6, 2017 at 11:03 PM, Adrian McCague 
> > 
> > wrote:
> >
> > > Hi all,
> > >
> > > I am trying to understand topic deletion in Kafka; there appears to 
> > > be very little documentation on how this works. Answers typically 
> > > just say to turn the feature on on the broker (in my case it is on).
> > >
> > > I executed:
> > > Kafka-topics.bat -delete -zookeeper keeperhere -topic mytopic
> > >
> > > Running this again yields:
> > > Topic mytopic is already marked for deletion.
> > >
> > > --describe yields:
> > > Topic: mytopic  PartitionCount:6  ReplicationFactor:3  Configs: retention.ms=0
> > > Topic: mytopic  Partition: 0  Leader: -1  Replicas: 1006,1001,1005  Isr:
> > > Topic: mytopic  Partition: 1  Leader: -1  Replicas: 1001,1005,1003  Isr:
> > > Topic: mytopic  Partition: 2  Leader: -1  Replicas: 1005,1003,1004  Isr:
> > > Topic: mytopic  Partition: 3  Leader: -1  Replicas: 1003,1004,1007  Isr:
> > > Topic: mytopic  Partition: 4  Leader: -1  Replicas: 1004,1007,1006  Isr:
> > > Topic: mytopic  Partition: 5  Leader: -1  Replicas: 1007,1006,1001  Isr:
> > >
> > > You can see that the deletion mark has meant that the Leader is -1.
> > > Also I read somewhere that retention needs to be set to something 
> > > low to trigger the deletion, hence the config of retention.ms=0
> > >
> > > Consumers (or streams in my case) no longer see the topic:
> > > org.apache.kafka.streams.errors.TopologyBuilderException: Invalid 
> > > topology building: stream-thread [StreamThread-1] Topic not found:
> > > mytopic
> > >
> > > And I can't create a new topic in its place:
> > > [2017-04-06 18:26:00,702] ERROR 

Re: In kafka streams consumer seems to hang while retrieving the offsets

2017-04-09 Thread Eno Thereska
Hi Sachin,

It's not necessarily a deadlock. Do you have any debug traces from those nodes? 
Also would be useful to know the config (e.g., how many partitions do you have 
and how many app instances.)

Thanks
Eno

> On 9 Apr 2017, at 04:45, Sachin Mittal  wrote:
> 
> Hi,
> In my streams applications cluster in one or more instances I see some
> threads always waiting with the following stack.
> 
> Every time I check on jstack I see the following trace.
> 
> Is this some kind of new deadlock that we have failed to identify?
> 
> Thanks
> Sachin
> 
> here is the stack trace:
> --
> "StreamThread-4" #20 prio=5 os_prio=0 tid=0x7fb814be3000 nid=0x19bf
> runnable [0x7fb7cb4f6000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>         - locked <0x000701c50c98> (a sun.nio.ch.Util$3)
>         - locked <0x000701c50c88> (a java.util.Collections$UnmodifiableSet)
>         - locked <0x000701c4f6a8> (a sun.nio.ch.EPollSelectorImpl)
>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>         at org.apache.kafka.common.network.Selector.select(Selector.java:489)
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>         - locked <0x000701c5da48> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.retrieveOffsetsByTimes(Fetcher.java:422)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:227)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1592)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265)
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:213)



In kafka streams consumer seems to hang while retrieving the offsets

2017-04-09 Thread Sachin Mittal
Hi,
In my streams applications cluster in one or more instances I see some
threads always waiting with the following stack.

Every time I check on jstack I see the following trace.

Is this some kind of new deadlock that we have failed to identify?

Thanks
Sachin

here is the stack trace:
--
"StreamThread-4" #20 prio=5 os_prio=0 tid=0x7fb814be3000 nid=0x19bf
runnable [0x7fb7cb4f6000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x000701c50c98> (a sun.nio.ch.Util$3)
        - locked <0x000701c50c88> (a java.util.Collections$UnmodifiableSet)
        - locked <0x000701c4f6a8> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
        - locked <0x000701c5da48> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
        at org.apache.kafka.clients.consumer.internals.Fetcher.retrieveOffsetsByTimes(Fetcher.java:422)
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:227)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1592)
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:213)