Some new information: I set debug to true, and in the active worker's log I can see the following.
While the worker is OK:
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting: hbasewriter __ack_ack [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: hbasewriter:3, stream: __ack_ack, id: {}, [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting direct: 1; __acker __ack_ack [7197822474056634252]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: KafkaConsumerSpout:1, stream: default, id: {4344988213623161794=-5214435544383558411}, my message...
After the worker dies, there are only rows about the spout, like:
2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout __ack_init [3399515592775976300 5357635772515085965 1]
2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout default
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"
On 03/06/14 09:58, Margusja wrote:
Hi
I have made a new test and discovered that in my environment even a very simple bolt also dies after around 2,500 cycles.
The bolt's code:
package storm;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.UUID;

public class DummyBolt extends BaseBasicBolt
{
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        count++;
        System.out.println("Dummy count: " + count);
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
    }
}
After around 2,500 cycles there is no more output from the execute method. What I do after this:
[root@dlvm2 sysconfig]# ls /var/lib/storm/workers/*/pids
[root@dlvm2 sysconfig]# kill -9 4179
After that a new worker comes up and works again for around 2,500 cycles, then stops, and I have to kill the PID again.
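Before force-killing a hung worker, it can help to capture a thread dump, which shows where execute() is blocked. A minimal sketch, assuming a JDK (for jstack) is installed on the box and the pid-file layout shown above; the /tmp output path is my own choice:

```shell
# Dump the threads of every running worker before restarting it
# (assumes /var/lib/storm/workers/*/pids holds one file per worker pid).
for pidfile in /var/lib/storm/workers/*/pids/*; do
    [ -e "$pidfile" ] || continue          # glob matched nothing
    pid=$(basename "$pidfile")
    echo "Dumping threads of worker pid $pid"
    jstack "$pid" > "/tmp/worker-$pid.jstack" || true
done
```

Comparing a dump taken while the worker is healthy with one taken after it hangs usually narrows the problem down to a single blocked thread.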
Any ideas?
On 02/06/14 13:36, Margusja wrote:
Hi
I am using apache-storm-0.9.1-incubating.
I have a simple topology: a spout reads from a Kafka topic and a bolt writes the lines from the spout to HBase.
Recently we did a test: we sent 300 000 000 messages over kafka-rest -> kafka-queue -> storm topology -> hbase.
I noticed that after around one hour and around 2,500 messages the worker died. The PID is there and the process is up, but the bolt's execute method hangs.
The bolt's code is:
package storm;

import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HBaseWriterBolt extends BaseBasicBolt
{
    HTableInterface usersTable;
    HTablePool pool;
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.defaults.for.version", "0.96.0.2.0.6.0-76-hadoop2");
        conf.set("hbase.defaults.for.version.skip", "true");
        conf.set("hbase.zookeeper.quorum", "vm24,vm37,vm38");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://vm38:8020/user/hbase/data");
        //conf.set("zookeeper.znode.parent", "/hbase-unsecure");

        pool = new HTablePool(conf, 1);
        usersTable = pool.getTable("kafkademo1");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
        p.add(Bytes.toBytes("info"), Bytes.toBytes("line"), Bytes.toBytes(line));

        try {
            usersTable.put(p);
            count++;
            System.out.println("Count: " + count);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
        try {
            usersTable.close();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The line System.out.println("Count: " + count); in the execute method was added for debugging, to see in the log that the bolt is running.
In the spout's nextTuple() method I added the debug line System.out.println("Message from the Topic ...");
After some time, around 50 minutes, the log file shows that the spout is still working but the bolt has died.
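One quick way to see this from the worker log is to count the two debug lines. A sketch, where the log path is an assumption (point it at the actual worker log file):

```shell
# Compare how often the spout and bolt debug lines appear in the worker log.
# The path below is an assumption; adjust it to the real worker log location.
LOG=/var/log/storm/worker-6700.log
if [ -f "$LOG" ]; then
    echo "bolt lines:  $(grep -c 'Count:' "$LOG")"
    echo "spout lines: $(grep -c 'Message from the Topic' "$LOG")"
fi
```

If the spout count keeps growing while the bolt count stays flat, it is the bolt, not the spout, that has stopped.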
Any ideas?