OK, I got some more info.
It looks like the problem is related to the spout.

I changed the spout's open() method from:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    props.put("zookeeper.session.timeout.ms", "500");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}

to

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    //props.put("zookeeper.session.timeout.ms", "500");
    //props.put("zookeeper.sync.time.ms", "250");
    //props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}
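A side note on the commented-out values: a 500 ms ZooKeeper session timeout and a 250 ms sync time look very aggressive to me. If I ever set them explicitly again, I would try something in this order of magnitude - the numbers below are only my guess, not values I have verified against the consumer defaults:

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    props.put("zookeeper.session.timeout.ms", "6000");   // guess: whole seconds instead of half a second
    props.put("zookeeper.sync.time.ms", "2000");         // guess
    props.put("auto.commit.interval.ms", "10000");       // guess
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));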

and I changed nextTuple() from:
public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            // System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            this.collector.emit(new Values(line), line);
        }
    }
    if (consumer != null)
        consumer.shutdown();
}

to

public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            // System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            //this.collector.emit(new Values(line), line);
            this.collector.emit(new Values(line));
        }
    }
    if (consumer != null)
        consumer.shutdown();
}
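While changing this I also noticed that nextTuple() both blocks inside hasNext() and recreates the consumer streams on every call. At some point I will probably restructure it roughly like this - only a sketch, untested, with consumerIte promoted to a field of the spout:

    private ConsumerIterator<byte[], byte[]> consumerIte;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {
        this.collector = collector;
        this.topic = "kafkademo1";

        Properties props = new Properties();
        props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
        props.put("group.id", "testgroup");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Build the message stream once here, not on every nextTuple() call
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                consumer.createMessageStreams(topicCount);
        this.consumerIte = consumerStreams.get(topic).get(0).iterator();
    }

    public void nextTuple()
    {
        // hasNext() on the high-level consumer blocks until a message arrives
        // unless consumer.timeout.ms is set, so this is still only an
        // approximation of a non-blocking nextTuple()
        if (consumerIte.hasNext()) {
            String line = new String(consumerIte.next().message());
            this.collector.emit(new Values(line));
        }
    }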


And with that emit change it is now running.

Strange, because when the worker died I could still see log rows from the spout. But I think it is somehow related to Storm's internals.
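My best guess about why dropping the message id from emit() made a difference (and I may be wrong here): a tuple emitted with a message id is anchored, and Storm keeps it pending until the whole tuple tree is acked or it times out; if topology.max.spout.pending is set and the pending count reaches it, the spout gets throttled, which would look exactly like the topology freezing after a few thousand tuples. If I go back to anchored emits I will set that cap and the timeout explicitly and handle the spout-side callbacks - roughly like this, just a sketch, not tested:

    // In the topology setup
    Config conf = new Config();
    conf.setMaxSpoutPending(1000);       // topology.max.spout.pending - value is only an example
    conf.setMessageTimeoutSecs(30);      // topology.message.timeout.secs

    // In the spout: Storm calls these back for tuples emitted with a message id
    @Override
    public void ack(Object msgId) {
        // tuple tree for msgId completed - safe to forget / commit this message
    }

    @Override
    public void fail(Object msgId) {
        // tuple timed out or failed downstream - decide whether to re-emit it
    }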

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"

On 03/06/14 11:09, Margusja wrote:
Some new information.
I set debug to true, and in the active worker's log I can see the following.

When the worker is OK:
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting: hbasewriter __ack_ack [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: hbasewriter:3, stream: __ack_ack, id: {}, [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting direct: 1; __acker __ack_ack [7197822474056634252]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: KafkaConsumerSpout:1, stream: default, id: {4344988213623161794=-5214435544383558411}, my message...

And after the worker dies there are only rows about the spout, like:
2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout __ack_init [3399515592775976300 5357635772515085965 1]
2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout default
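For reference, the debug output above comes from turning topology debug on, something along the lines of:

    Config conf = new Config();
    conf.setDebug(true);   // logs every emit / received message in the worker log, as shown above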

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"

On 03/06/14 09:58, Margusja wrote:
Hi

I have made a new test and discovered that in my environment even a very simple bolt dies after around 2500 cycles.

Bolt's code:

package storm;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.UUID;

public class DummyBolt extends BaseBasicBolt
{
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        count++;
        System.out.println("Dummy count: " + count);
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
    }
}

After around 2500 cycles there is no more output from the execute method.
What I do after this:
[root@dlvm2 sysconfig]# ls /var/lib/storm/workers/*/pids
[root@dlvm2 sysconfig]# kill -9 4179
After that a new worker comes up, works again for around 2500 cycles, then stops, and I have to kill the PID again.
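Next time it stops I will also take a thread dump of the worker JVM to see where the execute thread is stuck (assuming jstack from the JDK is available on the box), e.g. for the same PID as above:
[root@dlvm2 sysconfig]# jstack 4179 > /tmp/worker-4179-threads.txt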

Any ideas?

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"

On 02/06/14 13:36, Margusja wrote:
Hi

I am using apache-storm-0.9.1-incubating.
I have a simple topology: a spout reads from a Kafka topic and a bolt writes the lines from the spout to HBase.

Recently we did a test - we sent 300 000 000 messages over kafka-rest -> kafka-queue -> storm topology -> hbase.

I noticed that after around one hour and around 2500 messages the worker died. The PID is there and the process is up, but the bolt's execute method hangs.
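For context, the topology is wired roughly like this - a sketch from memory, the class/topology names and parallelism hints are approximate, not copied from my code:

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class KafkaDemoTopology {   // class and topology names here are only placeholders
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("KafkaConsumerSpout", new KafkaConsumerSpout(), 1);
        builder.setBolt("hbasewriter", new HBaseWriterBolt(), 1)
               .shuffleGrouping("KafkaConsumerSpout");

        Config conf = new Config();
        StormSubmitter.submitTopology("kafkademo1", conf, builder.createTopology());
    }
}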

The bolt's code is:
package storm;

import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HBaseWriterBolt extends BaseBasicBolt
{
    HTableInterface usersTable;
    HTablePool pool;
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.defaults.for.version", "0.96.0.2.0.6.0-76-hadoop2");
        conf.set("hbase.defaults.for.version.skip", "true");
        conf.set("hbase.zookeeper.quorum", "vm24,vm37,vm38");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://vm38:8020/user/hbase/data");
        //conf.set("zookeeper.znode.parent", "/hbase-unsecure");

        pool = new HTablePool(conf, 1);
        usersTable = pool.getTable("kafkademo1");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
        p.add(Bytes.toBytes("info"), Bytes.toBytes("line"), Bytes.toBytes(line));

        try {
            usersTable.put(p);
            count++;
            System.out.println("Count: " + count);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
        try {
            usersTable.close();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

The line System.out.println("Count: " + count); in the execute method was added for debugging purposes, so I can see in the log that the bolt is running.

Into the spout's nextTuple() method I added the debug line: System.out.println("Message from the Topic ...");

After some time, around 50 minutes, I can see in the log file that the spout is still working but the bolt has died.

Any ideas?



