Re: Worker dies (bolt)

2014-06-03 Thread Margusja

Hi

I have made a new test and discovered that in my environment even a very
simple bolt dies after around 2500 cycles.


Bolt's code:

package storm;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.UUID;

public class DummyBolt extends BaseBasicBolt
{
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        count++;
        System.out.println("Dummy count: " + count);
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
    }
}
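
For context, the bolt sits behind the Kafka spout roughly like this. This is only a minimal wiring sketch: KafkaConsumerSpout is assumed to be the same spout class from my earlier mails, and the component names, parallelism hints, worker count and topology name below are illustrative, not the exact values from the test.

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class DummyTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // the spout that reads from the Kafka topic
        builder.setSpout("KafkaConsumerSpout", new KafkaConsumerSpout(), 1);
        // the simple bolt above, fed by shuffle grouping from the spout
        builder.setBolt("dummybolt", new DummyBolt(), 1)
               .shuffleGrouping("KafkaConsumerSpout");

        Config conf = new Config();
        conf.setDebug(true);   // debug output on, as in the worker log excerpts later in the thread
        conf.setNumWorkers(1);

        StormSubmitter.submitTopology("dummytest", conf, builder.createTopology());
    }
}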

After around 2500 cycles there is no more output from the execute method.
What I do after this:
[root@dlvm2 sysconfig]# ls /var/lib/storm/workers/*/pids
[root@dlvm2 sysconfig]# kill -9 4179
After that a new worker comes up, runs again for around 2500 cycles, stops,
and I have to kill the PID again.


Any ideas?

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)

On 02/06/14 13:36, Margusja wrote:

Hi

I am using apache-storm-0.9.1-incubating.
I have a simple topology: a spout reads from a Kafka topic and a bolt writes
the lines from the spout to HBase.


Recently we did a test: we sent 300 000 000 messages over kafka-rest ->
Kafka queue -> Storm topology -> HBase.


I noticed that after around one hour and around 2500 messages the worker died.
The PID is there and the process is up, but the bolt's execute method hangs.


The bolt's code is:
package storm;

import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HBaseWriterBolt extends BaseBasicBolt
{
    HTableInterface usersTable;
    HTablePool pool;
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.defaults.for.version", "0.96.0.2.0.6.0-76-hadoop2");
        conf.set("hbase.defaults.for.version.skip", "true");
        conf.set("hbase.zookeeper.quorum", "vm24,vm37,vm38");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://vm38:8020/user/hbase/data");
        //conf.set("zookeeper.znode.parent", "/hbase-unsecure");

        pool = new HTablePool(conf, 1);
        usersTable = pool.getTable("kafkademo1");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
        p.add(Bytes.toBytes("info"), Bytes.toBytes(line), Bytes.toBytes(line));

        try {
            usersTable.put(p);
            count++;
            System.out.println("Count: " + count);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
        try {
            usersTable.close();

Re: Worker dies (bolt)

2014-06-03 Thread Margusja

Some new information.
I set debug to true, and in the active worker's log I can see the following.

While the worker is OK:

2014-06-03 11:04:55 b.s.d.task [INFO] Emitting: hbasewriter __ack_ack [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: hbasewriter:3, stream: __ack_ack, id: {}, [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting direct: 1; __acker __ack_ack [7197822474056634252]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: KafkaConsumerSpout:1, stream: default, id: {4344988213623161794=-5214435544383558411}, my message...


and after the worker dies there are only rows about the spout, like:

2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout __ack_init [3399515592775976300 5357635772515085965 1]
2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout default

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)


Re: Worker dies (bolt)

2014-06-03 Thread Margusja

OK, got more info.
Looks like the problem is related to the spout.

I changed the spout's open() from:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    props.put("zookeeper.session.timeout.ms", "500");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}

to

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    //props.put("zookeeper.session.timeout.ms", "500");
    //props.put("zookeeper.sync.time.ms", "250");
    //props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}

and nextTuple() from:
public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            // System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            this.collector.emit(new Values(line), line);
        }
    }
    if (consumer != null)
        consumer.shutdown();
}

to

public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            // System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            //this.collector.emit(new Values(line), line);
            this.collector.emit(new Values(line));
        }
    }
    if (consumer != null)
        consumer.shutdown();
}


And now it is running.

Strange, because when the worker died I could still see log rows from the
spout. But I think it is somehow related to Storm's internals.
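
For what it's worth, the only functional difference between the two nextTuple() versions above is the second argument to emit: collector.emit(new Values(line), line) passes line as a message ID, so Storm tracks the tuple tree through the acker and eventually calls ack() or fail() back on the spout, while collector.emit(new Values(line)) emits the tuple untracked. Below is a minimal sketch (not code from my actual spout, just an illustration) of the callbacks that tracking turns on:

// Illustration only: these overrides are NOT in my real KafkaConsumerSpout.
// When a spout emits with a message ID, Storm's acker tracks the tuple tree
// and calls one of these methods back on the spout when the tree completes,
// fails, or times out.
import backtype.storm.topology.base.BaseRichSpout;

public abstract class TrackedSpoutSketch extends BaseRichSpout {

    // Called once every bolt in the tuple tree has acked the tuple
    // that was emitted with this message ID.
    @Override
    public void ack(Object msgId) {
        System.out.println("acked: " + msgId);
    }

    // Called when the tuple tree fails or hits topology.message.timeout.secs
    // (30 seconds by default).
    @Override
    public void fail(Object msgId) {
        System.out.println("failed: " + msgId);
    }
}

With tracking on, every emitted tuple stays pending until it is acked or times out, which at least matches the __ack_init / __ack_ack rows in the worker log from my earlier mail.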


Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)
