Hi
I am using apache-storm-0.9.1-incubating.
I have a simple topology: a spout reads from a Kafka topic and a bolt writes
the lines from the spout to HBase.
Recently we ran a test: we sent 300 000 000 messages through kafka-rest ->
kafka-queue -> storm topology -> hbase.
I noticed that after around one hour and around 2500 messages the worker
died. The PID is still there and the process is up, but the bolt's execute
method hangs.
The bolt's code is:
package storm;

import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HBaseWriterBolt extends BaseBasicBolt
{

    HTableInterface usersTable;
    HTablePool pool;
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.defaults.for.version", "0.96.0.2.0.6.0-76-hadoop2");
        conf.set("hbase.defaults.for.version.skip", "true");
        conf.set("hbase.zookeeper.quorum", "vm24,vm37,vm38");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://vm38:8020/user/hbase/data");
        //conf.set("zookeeper.znode.parent", "/hbase-unsecure");

        pool = new HTablePool(conf, 1);
        usersTable = pool.getTable("kafkademo1");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
        p.add(Bytes.toBytes("info"), Bytes.toBytes("line"), Bytes.toBytes(line));

        try {
            usersTable.put(p);
            count++;
            System.out.println("Count: " + count);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        collector.emit(new Values(line));

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
        try {
            usersTable.close();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

}
The line System.out.println("Count: "+ count); in the execute method was
added for debugging, so that the log shows the bolt is running.
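For a test of this size, printing on every tuple produces a very large log. A minimal sketch of a lighter progress counter follows, using only the JDK. The class name ProgressCounter and its methods are hypothetical and are not part of Storm or HBase; it only illustrates reporting every N-th call instead of every call.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical helper (not part of Storm or HBase): counts calls and
 * signals when the running total reaches a multiple of the interval.
 */
final class ProgressCounter {
    private final long interval;
    private final AtomicLong count = new AtomicLong();

    ProgressCounter(long interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    /** Increments the counter; returns true when the new total is a multiple of the interval. */
    boolean tick() {
        return count.incrementAndGet() % interval == 0;
    }

    /** Returns the number of calls to tick() so far. */
    long total() {
        return count.get();
    }
}
```

In execute this could be used as: if (progress.tick()) System.out.println("Count: " + progress.total()); where progress is an assumed field created with, for example, new ProgressCounter(1000).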
In the spout's nextTuple() method I added a similar debug line:
System.out.println("Message from the Topic ...");
After around 50 minutes I can see in the log file that the spout is still
working but the bolt has died.
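To find out where the bolt's thread is stuck, a thread dump of the worker JVM should help. Below is a minimal sketch using only the JDK's java.lang.management API. The class name ThreadDumper is hypothetical, and calling it from inside the worker (for example from a timer thread) is an assumption, not something I have tried in this topology.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

/** Hypothetical helper: renders the name, state and stack of every live JVM thread. */
final class ThreadDumper {

    static String dumpAllThreads() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // false, false: skip locked monitor and synchronizer details to keep the dump cheap
        for (ThreadInfo info : bean.dumpAllThreads(false, false)) {
            sb.append('"').append(info.getThreadName()).append('"')
              .append(" state=").append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }
}
```

Printing ThreadDumper.dumpAllThreads() while the bolt is hung would show the state and current stack frames of each thread, including the one running execute.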
Any ideas?
--
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"