[ https://issues.apache.org/jira/browse/STORM-963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14648578#comment-14648578 ]
Jungtaek Lim commented on STORM-963:
------------------------------------

Could you reproduce this issue every time? When the circumstance appears again, could you dump each worker process, including subprocesses? (Dumping a worker process can be done with jstack <PID>; dumping a Python process can be done with kill -SIGABRT <PID>.) You may not be able to get the multilang subprocess's dump since it is redirected to the ShellBolt worker's stdin. The dump logs could help us.

> Frozen topology (KafkaSpout + Multilang bolt)
> ---------------------------------------------
>
>                 Key: STORM-963
>                 URL: https://issues.apache.org/jira/browse/STORM-963
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 0.9.4, 0.9.5, 0.9.6
>         Environment: - VMware ESX 5.5
> - Ubuntu Server 14.04 LTS (kernel 3.16.0-41-generic)
> - Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
> - Python 2.7.6 (default, Jun 22 2015, 17:58:13)
> - Zookeeper 3.4.6
>            Reporter: Alex Sobrino
>              Labels: multilang
>
> Hi,
> We've got a pretty simple topology running with Storm 0.9.5 (also tried with 0.9.4 and 0.9.6-INCUBATING) on a 3-machine cluster:
> {code}kafkaSpout (3) -----> processBolt (12){code}
> Some info:
> - kafkaSpout reads from a topic with 3 partitions and 2 replications
> - processBolt iterates through the message and saves the results in MongoDB
> - processBolt is implemented in Python and has a storm.log("I'm doing something") just to add a simple debug message to the logs
> - The messages can be quite big (~25-40 MB) and are in JSON format
> - The Kafka topic has a retention of 2 hours
> - We use the same ZooKeeper cluster for both Kafka and Storm
> The topology becomes frozen after several hours (not days) of running. We don't see any messages in the logs... In fact, the periodic messages from s.k.KafkaUtils and s.k.ZkCoordinator disappear. As you can imagine, the messages from the Bolt also disappear. Logs are copy/pasted further on.
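The dump procedure suggested in the comment above can be sketched as a small shell snippet. The PIDs are placeholders (28364 is the pid shown in the ShellLog lines below; the Python subprocess pid would come from ps or pgrep), and the commands are echoed rather than executed so this is a dry run:

```shell
# Placeholder PIDs -- substitute the real ones, e.g. from the
# "ShellLog pid:28364" lines in the worker log, or via pgrep/ps.
WORKER_PID=${WORKER_PID:-28364}
PY_PID=${PY_PID:-28365}

# JVM thread dump of the ShellBolt worker (remove 'echo' to run):
echo "jstack $WORKER_PID > worker-$WORKER_PID.jstack"

# Force the Python multilang subprocess to abort with a traceback:
echo "kill -SIGABRT $PY_PID"
```

As the comment notes, the subprocess dump may not be retrievable because its output is piped to the worker rather than to a log file.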
> If we redeploy the topology, everything starts to work again until it becomes frozen again.
> Our kafkaSpout config is:
> {code}
> ZkHosts zkHosts = new ZkHosts("zkhost01:2181,zkhost02:2181,zkhost03:2181");
> SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "topic", "/topic/ourclientid", "ourclientid");
> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> kafkaConfig.fetchSizeBytes = 50*1024*1024;
> kafkaConfig.bufferSizeBytes = 50*1024*1024;
> {code}
> We've also tried setting the following options:
> {code}
> kafkaConfig.forceFromStart = true;
> kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // Also with kafka.api.OffsetRequest.LatestTime();
> kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
> {code}
> Right now the topology is running without acking the messages, since there's a bug in kafkaSpout with failed messages and deleted offsets in Kafka.
> This is what can be seen in the logs on one of the workers:
> {code}
> 2015-07-23T12:37:38.008+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
> 2015-07-23T12:37:39.079+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
> 2015-07-23T12:37:51.013+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
> 2015-07-23T12:37:51.091+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
> 2015-07-23T12:38:02.684+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
> 2015-07-23T12:38:02.687+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
> 2015-07-23T12:38:02.687+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2, partition=1}]
> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
> 2015-07-23T12:38:09.012+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
> 2015-07-23T12:38:41.878+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
> 2015-07-23T12:39:02.688+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
> 2015-07-23T12:39:02.691+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
> 2015-07-23T12:39:02.691+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
> 2015-07-23T12:40:02.692+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
> 2015-07-23T12:40:02.695+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
> 2015-07-23T12:40:02.695+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
> 2015-07-23T12:41:02.696+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
> 2015-07-23T12:41:02.699+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
> 2015-07-23T12:41:02.699+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
> 2015-07-23T12:42:02.735+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
> 2015-07-23T12:42:02.737+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
> 2015-07-23T12:42:02.737+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
> {code}
> and then it becomes frozen. Nothing is written into the nimbus log. We've checked the offsets in ZooKeeper and they're not updated:
> {code}
> {"topology":{"id":"218e58a5-6bfb-4b32-ae89-f3afa19306e1","name":"our-topology"},"offset":12047144,"partition":1,"broker":{"host":"kafka2","port":9092},"topic":"topic"}
> cZxid = 0x100028958
> ctime = Wed Jul 01 12:22:36 CEST 2015
> mZxid = 0x100518527
> mtime = Thu Jul 23 12:42:41 CEST 2015
> pZxid = 0x100028958
> cversion = 0
> dataVersion = 446913
> aclVersion = 0
> ephemeralOwner = 0x0
> dataLength = 183
> numChildren = 0
> {code}
> Any ideas about what we could be missing?
> PS: This was sent to the Storm users' mailing list and got 0 replies :\

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
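The offset znode payload quoted in the report is plain JSON, so the committed offset can be extracted and compared across reads; a frozen topology keeps returning the same "offset" value on every read. A minimal sketch in plain Python, using the payload copied verbatim from the znode dump above:

```python
import json

# Offset-state payload copied verbatim from the znode dump in the report.
payload = ('{"topology":{"id":"218e58a5-6bfb-4b32-ae89-f3afa19306e1",'
           '"name":"our-topology"},"offset":12047144,"partition":1,'
           '"broker":{"host":"kafka2","port":9092},"topic":"topic"}')

state = json.loads(payload)
# If repeated reads of this znode show the same offset while the topology
# is supposedly consuming, the spout has stopped committing progress.
print(state["offset"], state["partition"], state["broker"]["host"])
```

Polling this value a minute apart (e.g. via `zkCli.sh get` on the partition znode) would confirm whether the spout is still committing or has genuinely frozen.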