Hi

I have made a new test and discovered that in my environment even a very simple bolt dies after around 2,500 cycles.

Bolt's code:

package storm;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class DummyBolt extends BaseBasicBolt
{
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        count++;
        // debug output, just to see in the worker log that the bolt is still running
        System.out.println("Dummy count: " + count);
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
    }
}

After around 2,500 cycles there is no more output from the execute method.
What I do after this:
[root@dlvm2 sysconfig]# ls /var/lib/storm/workers/*/pids
[root@dlvm2 sysconfig]# kill -9 4179
(4179 is the worker PID listed by the first command.)
After that a new worker comes up, works again for around 2,500 cycles, then stops, and I have to kill the PID again.

Any ideas?

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"

On 02/06/14 13:36, Margusja wrote:
Hi

I am using apache-storm-0.9.1-incubating.
I have a simple topology: a Spout reads from a Kafka topic and a Bolt writes the lines from the Spout to HBase.
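
To make the setup clearer, the topology is wired roughly like this (a simplified sketch; DemoTopology and KafkaLineSpout are only placeholder names for our topology class and the spout that reads the Kafka topic, and the parallelism hints are not the real ones):

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class DemoTopology
{
    public static void main(String[] args) throws Exception
    {
        TopologyBuilder builder = new TopologyBuilder();

        // KafkaLineSpout stands in for the spout that reads the Kafka topic
        builder.setSpout("kafkaSpout", new KafkaLineSpout(), 1);

        // HBaseWriterBolt is the bolt whose code is below
        builder.setBolt("hbaseWriter", new HBaseWriterBolt(), 1)
               .shuffleGrouping("kafkaSpout");

        Config conf = new Config();
        StormSubmitter.submitTopology("kafkademo1", conf, builder.createTopology());
    }
}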

Recently we did a test: we sent 300,000,000 messages over kafka-rest -> Kafka queue -> Storm topology -> HBase.

I noticed that after around one hour and around 2,500 messages the worker died. The PID is there and the process is up, but the bolt's execute method hangs.

The bolt's code is:
package storm;

import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HBaseWriterBolt extends BaseBasicBolt
{
    HTableInterface usersTable;
    HTablePool pool;
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.defaults.for.version", "0.96.0.2.0.6.0-76-hadoop2");
        conf.set("hbase.defaults.for.version.skip", "true");
        conf.set("hbase.zookeeper.quorum", "vm24,vm37,vm38");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://vm38:8020/user/hbase/data");
        //conf.set("zookeeper.znode.parent", "/hbase-unsecure");

        pool = new HTablePool(conf, 1);
        usersTable = pool.getTable("kafkademo1");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        // one row per message: random UUID as row key, message text in info:line
        Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
        p.add(Bytes.toBytes("info"), Bytes.toBytes("line"), Bytes.toBytes(line));

        try {
            usersTable.put(p);
            count++;
            // debug output, just to see in the worker log that the bolt is still running
            System.out.println("Count: " + count);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
        try {
            usersTable.close();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

The line System.out.println("Count: " + count); in the execute method was added for debugging, so I can see in the log that the bolt is running.

In the Spout, in the nextTuple() method, I added the debug line System.out.println("Message from the Topic ...");
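
For completeness, the spout looks roughly like this (again only a sketch: KafkaLineSpout is a placeholder name and the in-memory queue below stands in for the Kafka consumer we actually use):

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

public class KafkaLineSpout extends BaseRichSpout
{
    private SpoutOutputCollector collector;
    // stand-in for the Kafka consumer: in the real spout the lines come from the topic
    private final LinkedBlockingQueue<String> messages = new LinkedBlockingQueue<String>();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {
        this.collector = collector;
    }

    @Override
    public void nextTuple()
    {
        String line = messages.poll();
        if (line != null) {
            // the debug line mentioned above
            System.out.println("Message from the Topic ...");
            collector.emit(new Values(line));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }
}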

After some time, around 50 minutes, I can see in the log file that the Spout is still working but the Bolt has died.

Any ideas?

