Re: Worker dies (bolt)
Ok, I got more info. It looks like the problem is related to the spout. I changed the spout's open() method from:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    props.put("zookeeper.session.timeout.ms", "500");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}

to:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    //props.put("zookeeper.session.timeout.ms", "500");
    //props.put("zookeeper.sync.time.ms", "250");
    //props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}

and nextTuple() from:

public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            // System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            this.collector.emit(new Values(line), line);
        }
    }
    if (consumer != null)
        consumer.shutdown();
}

to:

public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            // System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            //this.collector.emit(new Values(line), line);
            this.collector.emit(new Values(line));
        }
    }
    if (consumer != null)
        consumer.shutdown();
}

And now it is running. It is strange, because while the worker was dead I could still see log rows from the spout. I think it is somehow related to Storm's internals.

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"

On 03/06/14 11:09, Margusja wrote:
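The change that matters above is the emit call: `emit(new Values(line), line)` passes a message id, which makes the tuple tracked by Storm's reliability (acking) machinery, while `emit(new Values(line))` is untracked, so nothing ever waits for downstream acks. A minimal sketch of that difference, using toy classes (ToyCollector is illustrative, not Storm's real API):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of spout-side reliability bookkeeping (illustrative names,
// not Storm's real classes): emitting WITH a message id registers the
// tuple as pending until acked; emitting WITHOUT an id is fire-and-forget.
public class EmitSketch {
    static class ToyCollector {
        final Map<Object, String> pending = new HashMap<>();

        // reliable emit: tuple is tracked until ack(msgId) arrives
        void emit(String value, Object msgId) { pending.put(msgId, value); }

        // unreliable emit: nothing is tracked, no ack expected
        void emit(String value) { }

        void ack(Object msgId) { pending.remove(msgId); }
    }

    public static void main(String[] args) {
        ToyCollector c = new ToyCollector();
        c.emit("line-1", "id-1");   // anchored: stays pending until acked
        c.emit("line-2");           // unanchored: never tracked
        if (c.pending.size() != 1) throw new AssertionError();
        c.ack("id-1");
        if (!c.pending.isEmpty()) throw new AssertionError();
        System.out.println("ok");
    }
}
```

With the message id removed, the spout no longer accumulates pending tuples when acks stop flowing downstream, which may be why the topology keeps running after the change.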
Re: Worker dies (bolt)
Some new information. I set debug to true, and in the active worker's log I can see the following.

While the worker is OK:

2014-06-03 11:04:55 b.s.d.task [INFO] Emitting: hbasewriter __ack_ack [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: hbasewriter:3, stream: __ack_ack, id: {}, [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting direct: 1; __acker __ack_ack [7197822474056634252]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: KafkaConsumerSpout:1, stream: default, id: {4344988213623161794=-5214435544383558411}, my message...

After the worker dies there are only rows about the spout, like:

2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout __ack_init [3399515592775976300 5357635772515085965 1]
2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout default

On 03/06/14 09:58, Margusja wrote:
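For background on the __ack_init and __ack_ack lines in the log above: Storm's acker keeps a single 64-bit value per spout tuple and XORs every anchored or acked tuple id into it; the tuple tree counts as fully processed when the value returns to zero, at which point the spout's ack() is called. A toy sketch of that bookkeeping (AckerSketch is an illustrative name; the ids are taken from the log):

```java
// Toy version of Storm's acker bookkeeping: XOR each tuple id in once
// when the tuple is created (anchored) and once when it is acked. Every
// id appears exactly twice, so a fully acked tree XORs back to zero.
public class AckerSketch {
    long ackVal = 0;

    void xorIn(long tupleId) { ackVal ^= tupleId; }

    boolean treeComplete() { return ackVal == 0; }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        long spoutTuple = 7197822474056634252L;
        long boltTuple  = -608920652033678418L;

        acker.xorIn(spoutTuple);   // __ack_init: root tuple emitted
        acker.xorIn(boltTuple);    // bolt anchors a child tuple
        acker.xorIn(spoutTuple);   // bolt acks the root tuple
        acker.xorIn(boltTuple);    // child tuple acked downstream
        if (!acker.treeComplete()) throw new AssertionError();
        System.out.println("tree complete");
    }
}
```

Seen through this model, __ack_init lines without matching __ack_ack traffic would mean tuple trees are being opened but never completed.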
Re: Worker dies (bolt)
Hei

I have made a new test and discovered that in my environment even a very simple bolt dies too, after around 2500 cycles.

The bolt's code:

package storm;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.UUID;

public class DummyBolt extends BaseBasicBolt
{
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        count++;
        System.out.println("Dummy count: " + count);
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
    }
}

After around 2500 cycles there is no more output from the execute method. What I do after this:

[root@dlvm2 sysconfig]# ls /var/lib/storm/workers/*/pids
[root@dlvm2 sysconfig]# kill -9 4179

After that a new worker comes up, works again for around 2500 cycles, then stops, and I have to kill the PID again. Any ideas?

On 02/06/14 13:36, Margusja wrote:
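One hypothesis that would fit "runs about 2500 cycles, then silence, process still alive" (my speculation only; the thread does not confirm the cause): if the spout's tuples are anchored but acks stop being processed, the spout gets throttled once a pending-tuple cap (e.g. topology.max.spout.pending, or an internal buffer) fills up, so the worker looks hung rather than crashed. A toy model with a hypothetical cap of 2500:

```java
import java.util.ArrayDeque;

// Toy model of a spout throttled by a pending-tuple cap (the cap of 2500
// is hypothetical, chosen to mirror the ~2500 cycles observed): if the
// downstream never acks, emission stops once the cap is reached -- the
// process stays alive but produces no further output.
public class PendingCapSketch {
    public static int emittedBeforeStall(int cap, boolean acksArrive) {
        ArrayDeque<Integer> pending = new ArrayDeque<>();
        int emitted = 0;
        for (int i = 0; i < 1_000_000; i++) {
            if (pending.size() >= cap) break;   // spout throttled: stall
            pending.add(i);
            emitted++;
            if (acksArrive) pending.poll();     // healthy topology: acks drain
        }
        return emitted;
    }

    public static void main(String[] args) {
        if (emittedBeforeStall(2500, false) != 2500) throw new AssertionError();
        if (emittedBeforeStall(2500, true) != 1_000_000) throw new AssertionError();
        System.out.println("stalls at cap without acks");
    }
}
```

Under this model, kill -9 only resets the pending state; the new worker fills the same cap again, which would explain why the stall repeats at the same count.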
Worker dies (bolt)
Hi

I am using apache-storm-0.9.1-incubating. I have a simple topology: a spout reads from a Kafka topic and a bolt writes the lines from the spout to HBase.

Recently we did a test: we sent 300 000 000 messages over kafka-rest -> kafka-queue -> storm topology -> hbase. I noticed that after around one hour and around 2500 messages the worker died. The PID is there and the process is up, but the bolt's execute method hangs.

The bolt's code is:

package storm;

import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HBaseWriterBolt extends BaseBasicBolt
{
    HTableInterface usersTable;
    HTablePool pool;
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.defaults.for.version", "0.96.0.2.0.6.0-76-hadoop2");
        conf.set("hbase.defaults.for.version.skip", "true");
        conf.set("hbase.zookeeper.quorum", "vm24,vm37,vm38");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://vm38:8020/user/hbase/data");
        //conf.set("zookeeper.znode.parent", "/hbase-unsecure");

        pool = new HTablePool(conf, 1);
        usersTable = pool.getTable("kafkademo1");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
        p.add(Bytes.toBytes("info"), Bytes.toBytes("line"), Bytes.toBytes(line));

        try {
            usersTable.put(p);
            count++;
            System.out.println("Count: " + count);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
        try {
            usersTable.close();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

The line System.out.println("Count: " + count); in the execute method was added for debugging, to see in the log that the bolt is running. In the spout's nextTuple() method I added a debug line: System.out.println("Message from the Topic ...");

After some time, around 50 minutes, I can see in the log file that the spout is working but the bolt has died. Any ideas?
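A side note on the try/catch in execute() above: BaseBasicBolt acks the input tuple automatically when execute() returns normally, so swallowing the HBase exception means a failed put is still acked and will never be replayed; letting an exception propagate (e.g. Storm's FailedException) would mark the tuple failed instead. A toy sketch of that wrapper behavior (BasicBoltAckSketch and run() are illustrative stand-ins, not Storm's real classes):

```java
// Toy illustration of BaseBasicBolt semantics: the framework acks the
// input tuple when execute() returns normally and fails it when execute()
// throws. A bolt that catches and swallows its write error therefore
// still acks tuples whose writes were lost.
public class BasicBoltAckSketch {
    interface Bolt { void execute(String tuple) throws Exception; }

    /** Mimics the framework wrapper: ack on normal return, fail on throw. */
    static String run(Bolt bolt, String tuple) {
        try {
            bolt.execute(tuple);
            return "acked";
        } catch (Exception e) {
            return "failed";
        }
    }

    public static void main(String[] args) {
        // execute() swallows the write error itself -> tuple is still acked
        String swallowed = run(t -> {
            try { throw new Exception("hbase put failed"); }
            catch (Exception e) { /* swallowed, like e.printStackTrace() */ }
        }, "line");
        // execute() lets the error propagate -> tuple is failed and replayable
        String propagated = run(t -> { throw new Exception("hbase put failed"); }, "line");
        if (!swallowed.equals("acked")) throw new AssertionError();
        if (!propagated.equals("failed")) throw new AssertionError();
        System.out.println(swallowed + " / " + propagated);
    }
}
```

This does not explain the hang by itself, but it does mean that during the 300M-message test any puts that threw were silently dropped rather than retried.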