Worker dies (bolt)

2014-06-02 Thread Margusja

Hi

I am using apache-storm-0.9.1-incubating.
I have a simple topology: a Spout reads from a Kafka topic and a Bolt writes the lines from the Spout to HBase.


Recently we ran a test: we sent 300 000 000 messages over kafka-rest -> kafka queue -> storm topology -> hbase.


I noticed that after around one hour and around 2500 messages the worker died. The PID is there and the process is up, but the bolt's execute method hangs.
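(While it hangs, I suppose a thread dump would show where execute() is stuck; something like:

jstack <worker-pid> > worker.jstack

where the worker PID can be read from /var/lib/storm/workers/*/pids.)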


The bolt's code is:
package storm;

import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HBaseWriterBolt extends BaseBasicBolt
{
    HTableInterface usersTable;
    HTablePool pool;
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context)
    {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.defaults.for.version", "0.96.0.2.0.6.0-76-hadoop2");
        conf.set("hbase.defaults.for.version.skip", "true");
        conf.set("hbase.zookeeper.quorum", "vm24,vm37,vm38");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://vm38:8020/user/hbase/data");
        //conf.set("zookeeper.znode.parent", "/hbase-unsecure");

        pool = new HTablePool(conf, 1);
        usersTable = pool.getTable("kafkademo1");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        // Row key is a random UUID; the line goes into the info:line column.
        Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
        p.add(Bytes.toBytes("info"), Bytes.toBytes("line"), Bytes.toBytes(line));

        try {
            usersTable.put(p);
            count++;
            System.out.println("Count: " + count);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
        try {
            usersTable.close();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
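For context, the topology is wired up roughly like this (a sketch from memory; KafkaDemoTopology is a made-up name, the component names are the ones that show up in my worker log):

package storm;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class KafkaDemoTopology
{
    public static void main(String[] args) throws Exception
    {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("KafkaConsumerSpout", new KafkaConsumerSpout(), 1);
        builder.setBolt("hbasewriter", new HBaseWriterBolt(), 1)
               .shuffleGrouping("KafkaConsumerSpout");

        Config conf = new Config();
        StormSubmitter.submitTopology("kafkademo1", conf, builder.createTopology());
    }
}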

The line System.out.println("Count: " + count); in the execute method was added for debugging, so I can see in the log that the bolt is running.


Into the Spout's nextTuple() method I added a debug line: System.out.println("Message from the Topic ...");

After some time, around 50 minutes, I can see in the log file that the Spout is still working but the Bolt has died.


Any ideas?

--
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"



Re: Worker dies (bolt)

2014-06-02 Thread Margusja

Hei

I have made a new test and discovered that in my environment even a very simple bolt dies after around 2500 cycles.


Bolt's code:

package storm;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DummyBolt extends BaseBasicBolt
{
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context)
    {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        count++;
        System.out.println("Dummy count: " + count);
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
    }
}
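I watch the bolt's output by tailing the worker log, something like this (the log path and slot port are assumptions about a default setup; adjust to your storm.yaml):

tail -f /var/log/storm/worker-6700.log | grep "Dummy count"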

After around 2500 cycles there is no more output from the execute method.
What I do after this:

[root@dlvm2 sysconfig]# ls /var/lib/storm/workers/*/pids
[root@dlvm2 sysconfig]# kill -9 4179

After that a new worker comes up, works again for around 2500 cycles, then stops, and I have to kill the PID again.
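The kill step as a one-liner (assuming the files under pids/ are named after the worker PID, which is how it looks here):

for f in /var/lib/storm/workers/*/pids/*; do kill -9 "$(basename "$f")"; done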


Any ideas?

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"


Re: Worker dies (bolt)

2014-06-03 Thread Margusja

Some new information.
I set debug to true, and in the active worker's log I can see the following.

While the worker is OK:
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting: hbasewriter __ack_ack [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: hbasewriter:3, stream: __ack_ack, id: {}, [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting direct: 1; __acker __ack_ack [7197822474056634252]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: KafkaConsumerSpout:1, stream: default, id: {4344988213623161794=-5214435544383558411},  my message...


and after the worker dies there are only rows about the spout, like:
2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout __ack_init [3399515592775976300 5357635772515085965 1]
2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout default

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"


Re: Worker dies (bolt)

2014-06-03 Thread Margusja

OK, I got more info.
It looks like the problem is related to the spout.

I changed the spout's open() method from:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    props.put("zookeeper.session.timeout.ms", "500");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}

to

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    //props.put("zookeeper.session.timeout.ms", "500");
    //props.put("zookeeper.sync.time.ms", "250");
    //props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}

and the nextTuple() method from:
public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            //System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            this.collector.emit(new Values(line), line);
        }
    }
    if (consumer != null)
        consumer.shutdown();
}

to

public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            //System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            //this.collector.emit(new Values(line), line);
            this.collector.emit(new Values(line)); // changed: emit without a message id
        }
    }
    if (consumer != null)
        consumer.shutdown();
}


And now it is running.

Strange, because when the worker died I could still see log rows from the spout. But I think it is somehow related to the internal workings of Storm.
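My reading of the difference (just my understanding, not something I have confirmed in the docs): emit(new Values(line), line) attaches a message id, so Storm tracks every tuple through the ackers until the spout's ack() or fail() is called, while emit(new Values(line)) is untracked. If the anchored form is wanted, I believe the spout has to implement ack()/fail(), use unique message ids, and nextTuple() should return quickly instead of looping forever over the consumer iterator. A minimal sketch of that pattern (an assumption, not the code I am running; the in-memory queue stands in for the Kafka consumer, which would feed it from a separate thread):

package storm;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class ReliableLineSpout extends BaseRichSpout
{
    private SpoutOutputCollector collector;
    // stands in for messages arriving from Kafka
    private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<String>();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {
        this.collector = collector;
        // a consumer thread would offer() incoming messages into "pending" here
    }

    @Override
    public void nextTuple()
    {
        // nextTuple() must not block: emit at most one tuple and return
        String line = pending.poll();
        if (line == null) {
            Utils.sleep(10);
            return;
        }
        // the message id makes Storm track the tuple until ack()/fail()
        collector.emit(new Values(line), UUID.randomUUID().toString());
    }

    @Override
    public void ack(Object msgId)
    {
        // tuple fully processed downstream; safe to commit its offset
    }

    @Override
    public void fail(Object msgId)
    {
        // processing failed or timed out; re-queue the message for msgId
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }
}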


Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"
