Re: Worker dies (bolt)
Hi

I have made a new test and discovered that in my environment even a very simple bolt dies after around 2500 cycles. The bolt's code:

package storm;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.UUID;

public class DummyBolt extends BaseBasicBolt
{
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        count++;
        System.out.println("Dummy count: " + count);
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
    }
}

After around 2500 cycles there is no more output from the execute method. What I do after this:

[root@dlvm2 sysconfig]# ls /var/lib/storm/workers/*/pids
[root@dlvm2 sysconfig]# kill -9 4179

After that a new worker comes up, works again for around 2500 cycles, stops, and I have to kill the pid again. Any ideas?

Best regards,
Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)

On 02/06/14 13:36, Margusja wrote:
Hi

I am using apache-storm-0.9.1-incubating. I have a simple topology: a spout reads from a Kafka topic and a bolt writes the lines from the spout to HBase. Recently we ran a test - we sent 300 000 000 messages over kafka-rest -> kafka queue -> storm topology -> hbase. I noticed that after around one hour and around 2500 messages the worker died. The PID is there and the process is up, but the bolt's execute method hangs.
The bolt's code is:

package storm;

import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HBaseWriterBolt extends BaseBasicBolt
{
    HTableInterface usersTable;
    HTablePool pool;
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.defaults.for.version", "0.96.0.2.0.6.0-76-hadoop2");
        conf.set("hbase.defaults.for.version.skip", "true");
        conf.set("hbase.zookeeper.quorum", "vm24,vm37,vm38");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://vm38:8020/user/hbase/data");
        //conf.set("zookeeper.znode.parent", "/hbase-unsecure");

        pool = new HTablePool(conf, 1);
        usersTable = pool.getTable("kafkademo1");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
        p.add(Bytes.toBytes("info"), Bytes.toBytes(line), Bytes.toBytes(line));

        try {
            usersTable.put(p);
            count++;
            System.out.println("Count: " + count);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
        try {
            usersTable.close();
Re: Worker dies (bolt)
Some new information. I set debug to true, and in the active worker log I can see the following.

While the worker is OK:

2014-06-03 11:04:55 b.s.d.task [INFO] Emitting: hbasewriter __ack_ack [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: hbasewriter:3, stream: __ack_ack, id: {}, [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting direct: 1; __acker __ack_ack [7197822474056634252]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: KafkaConsumerSpout:1, stream: default, id: {4344988213623161794=-5214435544383558411}, my message...

After the worker dies there are only rows about the spout, like:

2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout __ack_init [3399515592775976300 5357635772515085965 1]
2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout default

Best regards,
Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)
RE: Storm with RDBMS
Thanks Alex and Ncleung for your inputs. Both options look valid given the data size. I am now thinking that even an RDBMS might scale well for this scenario if the bolt only performs read operations against the database. I will update later on the approach taken.

Regards
Balakrishna

From: alex kamil [mailto:alex.ka...@gmail.com]
Sent: Monday, June 02, 2014 10:18 PM
To: user@storm.incubator.apache.org
Subject: Re: Storm with RDBMS

For parallel reads of massive historical data and high-volume writes you could use a distributed DB with a SQL layer, such as Apache HBase + Phoenix (http://phoenix.incubator.apache.org/). I think it might complement Storm nicely.

On Mon, Jun 2, 2014 at 10:19 AM, Nathan Leung ncle...@gmail.com wrote:
Something like memcached is commonly used for this scenario. Is memcached poorly suited for your goals or data access patterns?

On Mon, Jun 2, 2014 at 10:06 AM, Balakrishna R balakrishn...@spanservices.com wrote:
Hi,

We are evaluating Apache Storm for one of our business use cases. In this use case, the incoming transactions/stream should be processed by a set of rules or logic, and that processing also needs to consider historical data (maybe two weeks or a month old). We understand that Storm will give better performance for processing the incoming transactions in real time. What if we have to read the historical data from an RDBMS and use that data in the bolts? Will this degrade the performance of the whole cluster, since the RDBMS might add delay under the high read load coming from the bolts running in parallel? Any suggestions on handling this situation? Please share.

Thanks
Balakrishna
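[A sketch added for context, not from the thread: one way to keep RDBMS reads out of the per-tuple path, along the lines discussed above, is to load or periodically refresh the historical data inside the bolt's prepare(). The class name, JDBC URL, query, and refresh interval below are made-up placeholders.]

package storm.example;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

// Hypothetical bolt: caches historical reference data from an RDBMS in memory
// and refreshes it at a fixed interval instead of querying per tuple.
public class RuleLookupBolt extends BaseBasicBolt {
    private transient Map<String, String> historyCache;
    private transient long lastRefreshMs;
    private static final long REFRESH_INTERVAL_MS = 10 * 60 * 1000L; // assumption: 10 minutes

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        historyCache = new HashMap<String, String>();
        refreshCache();
    }

    private void refreshCache() {
        // JDBC URL, credentials and query are placeholders, not from the original thread.
        try (Connection c = DriverManager.getConnection("jdbc:mysql://dbhost/history", "user", "pass");
             Statement s = c.createStatement();
             ResultSet rs = s.executeQuery("SELECT tx_key, rule_state FROM history_summary")) {
            Map<String, String> fresh = new HashMap<String, String>();
            while (rs.next()) {
                fresh.put(rs.getString(1), rs.getString(2));
            }
            historyCache = fresh;
            lastRefreshMs = System.currentTimeMillis();
        } catch (SQLException e) {
            e.printStackTrace(); // keep serving the stale cache on failure
        }
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        if (System.currentTimeMillis() - lastRefreshMs > REFRESH_INTERVAL_MS) {
            refreshCache();
        }
        String key = tuple.getString(0);
        String historical = historyCache.get(key); // in-memory lookup, no DB round trip per tuple
        collector.emit(new Values(key, historical));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("key", "historical"));
    }
}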
Fwd: Can storm use the jar from Eclipse?
Hi,

Right now I have my storm cluster set up (three machines, respectively for zookeeper, nimbus and supervisor). Before submitting a topology to the storm cluster, I usually test the storm program in local mode with Eclipse. Then I use Eclipse to package the storm project and send it to the nimbus machine to submit. But whatever I package from Eclipse (the entire project or the target folder), it shows an error that it cannot find the class.

The following experiments are what I did on Storm. Getting-Started.jar is the entire project jar packaged from Eclipse. I use jar -tf Getting-Started.jar to list its contents; you can see the paths:

Getting-Started/.classpath
Getting-Started/.gitignore
Getting-Started/.project
Getting-Started/.settings/org.eclipse.jdt.core.prefs
Getting-Started/.settings/org.eclipse.m2e.core.prefs
Getting-Started/.settings/org.maven.ide.eclipse.prefs
Getting-Started/Readme.txt
Getting-Started/bin/data/
Getting-Started/bin/src/main/java/dataApi/
Getting-Started/bin/src/main/java/topology/bolts/
Getting-Started/bin/src/main/java/topology/data/
Getting-Started/bin/src/main/java/topology/spouts/
Getting-Started/bin/src/main/resources/
Getting-Started/bin/src/test/java/
Getting-Started/bin/target/classes/META-INF/maven/storm.book/
Getting-Started/bin/target/classes/dataApi/
Getting-Started/bin/target/classes/topology/bolts/
Getting-Started/bin/target/classes/topology/data/
Getting-Started/bin/target/classes/topology/spouts/
Getting-Started/data/
Getting-Started/pom.html
Getting-Started/pom.xml
Getting-Started/src/main/java/dataApi/DataConnector.java
Getting-Started/src/main/java/dataApi/DataOutputController.java
Getting-Started/src/main/java/dataApi/FinancialDataStream.java
Getting-Started/src/main/java/dataApi/QuoteInterface.java
Getting-Started/src/main/java/topology/FinancialDataOperationTopology.java
Getting-Started/src/main/java/topology/RandomFinancialDataTopology.java
Getting-Started/src/main/java/topology/bolts/DataNormalizerBolt.java
Getting-Started/src/main/java/topology/bolts/QuoteAveragerBolt.java
Getting-Started/src/main/java/topology/data/FinancialData.java
Getting-Started/src/main/java/topology/data/Symbol.java
Getting-Started/src/main/java/topology/spouts/FinancialDataSpout.java
Getting-Started/src/main/java/topology/spouts/RandomFinancialDataSpout.java
Getting-Started/src/main/resources/financialdata.txt
Getting-Started/src/test/java/
Getting-Started/target/classes/META-INF/MANIFEST.MF
Getting-Started/target/classes/META-INF/maven/storm.book/Getting-Started/pom.properties
Getting-Started/target/classes/META-INF/maven/storm.book/Getting-Started/pom.xml
Getting-Started/target/classes/dataApi/DataConnector.class
Getting-Started/target/classes/dataApi/DataOutputController.class
Getting-Started/target/classes/dataApi/FinancialDataStream.class
Getting-Started/target/classes/dataApi/QuoteInterface.class
Getting-Started/target/classes/financialdata.txt
Getting-Started/target/classes/topology/FinancialDataOperationTopology.class
Getting-Started/target/classes/topology/RandomFinancialDataTopology.class
Getting-Started/target/classes/topology/bolts/DataNormalizerBolt.class
Getting-Started/target/classes/topology/bolts/QuoteAveragerBolt.class
Getting-Started/target/classes/topology/data/FinancialData.class
Getting-Started/target/classes/topology/data/Symbol.class
Getting-Started/target/classes/topology/spouts/FinancialDataSpout.class
Getting-Started/target/classes/topology/spouts/RandomFinancialDataSpout.class
Getting-Started/target/get.jar
Getting-Started/target/test-classes/

I see the topology path in this jar is: Getting-Started/target/classes/topology/RandomFinancialDataTopology.class

Then I use the storm jar command:

/usr/local/storm/bin/storm jar Getting-Started.jar topology.RandomFinancialDataTopology datatopology

It shows this error:

Exception in thread "main" java.lang.NoClassDefFoundError: Getting-Started/target/classes/topology/RandomFinancialDataTopology (wrong name: topology/RandomFinancialDataTopology)
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
Could not find the main class: Getting-Started.target.classes.topology.RandomFinancialDataTopology. Program will exit.

Then I tried the maven command mvn package on the nimbus command line to package and submit the topology, and it works. When
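[A note added for context, not part of the original message: the "wrong name" in the stack trace is the key detail. Inside the Eclipse-built jar the class sits at Getting-Started/target/classes/topology/RandomFinancialDataTopology.class, so the JVM cannot load it as topology.RandomFinancialDataTopology. A jar usable with storm jar needs the package directories at the jar root, which is what the mvn package build produces. As a rough illustration of what the listing should look like (jar name illustrative, listing abbreviated):]

jar -tf target/Getting-Started-0.0.1.jar
META-INF/MANIFEST.MF
topology/RandomFinancialDataTopology.class
topology/FinancialDataOperationTopology.class
topology/spouts/RandomFinancialDataSpout.class
topology/bolts/QuoteAveragerBolt.class
financialdata.txt
...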
MultiLang (Python) bolt gives error
Hi,

I'm using a python bolt which is in the resources directory, but storm is giving me this error:

7226 [Thread-19-exclaim1] INFO backtype.storm.daemon.executor - Preparing bolt exclaim1:(3)
7231 [Thread-10] INFO backtype.storm.daemon.executor - Loading executor exclaim1:[4 4]
7232 [Thread-19-exclaim1] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: Error when launching multilang subprocess
    at backtype.storm.task.ShellBolt.prepare(ShellBolt.java:105) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    at backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183.invoke(executor.clj:689) ~[na:na]
    at backtype.storm.util$async_loop$fn__390.invoke(util.clj:431) ~[na:na]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
Caused by: java.io.IOException: Cannot run program "python" (in directory "/tmp/5f3c8318-14da-4999-a974-584a9d200fdb/supervisor/stormdist/mongo_20140528_02-1-1401786854/resources"): error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1041) ~[na:1.7.0_55]
    at backtype.storm.utils.ShellProcess.launch(ShellProcess.java:50) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    at backtype.storm.task.ShellBolt.prepare(ShellBolt.java:102) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    ... 4 common frames omitted
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method) ~[na:1.7.0_55]
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:135) ~[na:1.7.0_55]
    at java.lang.ProcessImpl.start(ProcessImpl.java:130) ~[na:1.7.0_55]
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1022) ~[na:1.7.0_55]
    ... 6 common frames omitted
7233 [Thread-10] INFO backtype.storm.daemon.task - Emitting: exclaim1 __system ["startup"]
7233 [Thread-10] INFO backtype.storm.daemon.executor - Loaded executor tasks exclaim1:[4 4]
7233 [Thread-19-exclaim1] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: Error when launching multilang subprocess
    at backtype.storm.task.ShellBolt.prepare(ShellBolt.java:105) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    at backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183.invoke(executor.clj:689) ~[na:na]
    at backtype.storm.util$async_loop$fn__390.invoke(util.clj:431) ~[na:na]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
Caused by: java.io.IOException: Cannot run program "python" (in directory "/tmp/5f3c8318-14da-4999-a974-584a9d200fdb/supervisor/stormdist/mongo_20140528_02-1-1401786854/resources"): error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1041) ~[na:1.7.0_55]
    at backtype.storm.utils.ShellProcess.launch(ShellProcess.java:50) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    at backtype.storm.task.ShellBolt.prepare(ShellBolt.java:102) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    ... 4 common frames omitted
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method) ~[na:1.7.0_55]
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:135) ~[na:1.7.0_55]
    at java.lang.ProcessImpl.start(ProcessImpl.java:130) ~[na:1.7.0_55]
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1022) ~[na:1.7.0_55]
    ... 6 common frames omitted
7235 [Thread-10] INFO backtype.storm.daemon.executor - Finished loading executor exclaim1:[4 4]
7235 [Thread-21-exclaim1] INFO backtype.storm.daemon.executor - Preparing bolt exclaim1:(4)
7237 [Thread-21-exclaim1] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: Error when launching multilang subprocess
    at backtype.storm.task.ShellBolt.prepare(ShellBolt.java:105) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    at backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183.invoke(executor.clj:689) ~[na:na]
    at backtype.storm.util$async_loop$fn__390.invoke(util.clj:431) ~[na:na]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
Caused by: java.io.IOException: Cannot run program "python" (in directory "/tmp/5f3c8318-14da-4999-a974-584a9d200fdb/supervisor/stormdist/mongo_20140528_02-1-1401786854/resources"): error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1041) ~[na:1.7.0_55]
    at backtype.storm.utils.ShellProcess.launch(ShellProcess.java:50) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    at backtype.storm.task.ShellBolt.prepare(ShellBolt.java:102) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    ... 4 common frames omitted
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method) ~[na:1.7.0_55]
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:135) ~[na:1.7.0_55]at
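[For reference, not from the original message: the error means the worker could not start the "python" subprocess, typically because python is not on the supervisor's PATH or the multilang resources directory was not packaged into the topology jar. The usual way to wire a Python bolt is to extend ShellBolt and point it at a script under a top-level resources/ directory in the jar. A minimal sketch; SplitSentenceBolt and splitsentence.py are assumed names:]

package storm.example;

import backtype.storm.task.ShellBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;

import java.util.Map;

// Hypothetical multilang bolt: runs resources/splitsentence.py via the
// supervisor's "python" executable. The script must end up inside the
// topology jar under a top-level resources/ directory.
public class SplitSentenceBolt extends ShellBolt implements IRichBolt {

    public SplitSentenceBolt() {
        super("python", "splitsentence.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}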
Kafka-storm - trident transactional topology - bad perf
I have a simple Trident transactional topology that does something like the following:

kafka transactional spout (~3000 rec/sec, 6 partitions, thus paraHint=6) -> aggregate with a reducerAggregator (paraHint=20) -> transactional state (I tried MemoryMapState, MemcachedMapState and CassandraMapState) -> new stream -> print new values

I tried to tune the topology by first setting maxSpoutPending=1 and the batch emit interval to a large value (1 sec), and then iteratively improving those values. I ended up with maxSpoutPending=20 and batchEmitInterval=150ms. However, I observed two things.

1/ Delay in the topology keeps increasing

Even with those fine-tuned values, or smaller ones, it seems that some transactions fail and that Trident replays them (transactional state). However, this replaying process seems to delay the processing of new incoming data, and Storm never seems to catch up after replaying. The result is that after a few minutes the processing is clearly not real time anymore (the aggregates printed in the logs are those from a few minutes before, and the lag keeps increasing), even though I don't see a particular bottleneck in the computation (bolt capacity and latency are ok). Is this behavior normal? Does it come from the Kafka transactional spout? From Trident's transactional mechanism?

2/ There is an unavoidable bottleneck on $spoutcoord-spout0

Because small failures keep accumulating, Trident replays more and more transactions. spout0's performance is impacted (more work), but that can be scaled with more Kafka partitions. However, $spoutcoord-spout0 is always a single thread in Trident, whatever spout we provide, and I clearly observed that the capacity of $spoutcoord-spout0 goes above 1 after some minutes (and its latency is above 10 seconds or so). Is there a way to improve this? Or is this an unavoidable consequence of Trident's transactional logic that can't be addressed?
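[For readers who want to see where the two knobs mentioned above live: they are normally set on the topology Config at submit time. A sketch with the final values from the message; the class name is made up and this is not a recommendation of those values:]

package storm.example;

import backtype.storm.Config;

// Sketch of the tuning values described above (maxSpoutPending=20,
// batch emit interval=150 ms), applied to the topology Config before submit.
public class TridentTuning {
    public static Config tunedConfig() {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);                                   // batches in flight per spout task
        conf.put("topology.trident.batch.emit.interval.millis", 150);  // ms between batch emits
        return conf;
    }
}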
Re: Worker dies (bolt)
Ok, I got more info. It looks like the problem is related to the spout. I changed the spout's open() from:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    props.put("zookeeper.session.timeout.ms", "500");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}

to:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    //props.put("zookeeper.session.timeout.ms", "500");
    //props.put("zookeeper.sync.time.ms", "250");
    //props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}

and nextTuple() from:

public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            // System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            this.collector.emit(new Values(line), line);
        }
    }
    if (consumer != null)
        consumer.shutdown();
}

to:

public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            // System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            //this.collector.emit(new Values(line), line);
            this.collector.emit(new Values(line));
        }
    }
    if (consumer != null)
        consumer.shutdown();
}

And now it is running. Strange, because when the worker died I still saw log rows from the spout. But I think it is related somehow to Storm's internals.

Best regards,
Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)
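[An observation added for context, not from the thread: nextTuple() is expected to return quickly, but the spout above loops forever over the Kafka consumer iterator, so the spout executor never leaves that one nextTuple() call and cannot process ack/fail events for anchored tuples. A common pattern is to have a background consumer thread fill a queue and have nextTuple() emit at most one message per call. A rough sketch; class and field names are made up:]

package storm.example;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of a spout whose nextTuple() returns quickly: a separate consumer
// thread (not shown) reads from Kafka and offers lines into the queue.
public class QueueBackedSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        // start the Kafka consumer thread here and have it call queue.offer(line)
    }

    @Override
    public void nextTuple() {
        String line = queue.poll();                  // non-blocking: return immediately if empty
        if (line != null) {
            collector.emit(new Values(line), line);  // anchored so ack/fail can be tracked
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}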
RE: Workers constantly restarted due to session timeout
Thank you Derek for the explanation of :disallowed versus :timed-out. That was extremely helpful in understanding what decisions Storm is making.

I increased both timeouts to 5 minutes and returned the zookeeper session timeouts to their default values. This made it plain to see periods in time where the Uptime column for the busiest component's worker would not update (for 1-2 minutes, potentially never, resulting in a worker restart).

ZK logs report constant disconnects and reconnects while the Uptime is not updating:

16:28:30,440 - INFO NIOServerCnxn@1001 - Closed socket connection for client /10.49.21.151:54004 which has sessionid 0x1464f1fddc1018f
16:31:18,364 - INFO NIOServerCnxnFactory@197 - Accepted socket connection from /10.49.21.151:34419
16:31:18,365 - WARN ZookeeperServer@793 - Connection request from old client /10.49.21.151:34419; will be dropped if server is in r-o mode
16:31:18,365 - INFO ZookeeperServer@832 - Client attempting to renew session 0x264f1fddc4021e at /10.49.21.151:34419
16:31:18,365 - INFO Learner@107 - Revalidating client: 0x264f1fddc4021e
16:31:18,366 - INFO ZooKeeperServer@588 - Invalid session 0x264f1fddc4021e for client /10.49.21.151:34419, probably expired
16:31:18,366 - NIOServerCnxn@1001 - Closed socket connection for client /10.49.21.151:34419 which had sessionid 0x264f1fddc4021e
16:31:18,378 - INFO NIOServerCnxnFactory@197 - Accepted socket connection from /10.49.21.151:34420
16:31:18,391 - WARN ZookeeperServer@793 - Connection request from old client /10.49.21.151:34420; will be dropped if server is in r-o mode
16:31:18,392 - INFO ZookeeperServer@839 - Client attempting to establish new session at /10.49.21.151:34420
16:31:18,394 - INFO ZookeeperServer@595 - Established session 0x1464fafddc10218 with negotiated timeout 2 for client /10.49.21.151:34420
16:31:44,002 - INFO NIOServerCnxn@1001 - Closed socket connection for /10.49.21.151:34420 which had sessionid 0x1464fafddc10218
16:32:48,055 - INFO NIOServerCnxnFactory@197 - Accepted socket connection from /10.49.21.151:34432
16:32:48,056 - WARN ZookeeperServer@793 - Connection request from old client /10.49.21.151:34432; will be dropped if server is in r-o mode
16:32:48,056 - INFO ZookeeperServer@832 - Client attempting to renew session 0x2464fafddc4021f at /10.49.21.151:34432
16:32:48,056 - INFO Learner@107 - Revalidating client: 0x2464fafddc4021f
16:32:48,057 - INFO ZooKeeperServer@588 - Invalid session 0x2464fafddc4021f for client /10.49.21.151:34432, probably expired
16:32:48,057 - NIOServerCnxn@1001 - Closed socket connection for client /10.49.21.151:34432 which had sessionid 0x2464fafddc4021f

...and so on, until Storm has had enough and restarts the worker, resulting in this:

16:47:20,706 - NIOServerCnxn@349 - Caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x3464f20777e01cf, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:220)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)

1) Is it appropriate to run Zookeeper in parallel on the same node with the storm services?
2) We have zookeeper 3.4.5 installed. I see Storm uses zookeeper-3.3.3 as its client. Should we downgrade our installation?
Date: Sat, 31 May 2014 13:50:57 -0500
From: der...@yahoo-inc.com
To: user@storm.incubator.apache.org
Subject: Re: Workers constantly restarted due to session timeout

Are you certain that nimbus.task.timeout.secs is the correct config? That config controls the length of time before nimbus thinks a worker has timed out.
https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/daemon/nimbus.clj#L369-L372
Its default is 30 seconds.
https://github.com/apache/incubator-storm/blob/master/conf/defaults.yaml#L45

storm.zookeeper.connection.timeout: 30
storm.zookeeper.session.timeout: 30

So these will make the situation worse while workers are losing connections to ZK, since they cause the workers to wait longer before reconnecting. They could wait until nimbus thinks the worker is dead before trying to reconnect.

supervisor:
2014-05-23 20:17:30 INFO supervisor:0 - Shutting down and clearing state for id 94349373-74ec-484b-a9f8-a5076e17d474. Current supervisor time: 1400876250. State: :disallowed, Heartbeat: #backtype.storm.daemon.common.WorkerHeartbeat{{:time-secs 1400876249, :storm-id test-46-1400863199, :executors #{[-1 -1]}, :port 6700}

Here, if the State is :disallowed, then that means it is Nimbus that de-scheduled the worker on that node--very probably in this case because it thought it was dead. When the supervisor sees this, it will kill the worker. (A state of :timed-out means instead that the worker did not heartbeat to its supervisor in time.)

If the CPU load on the worker was
Re: Workers constantly restarted due to session timeout
1) Is it appropriate to run Zookeeper in parallel on the same node with the storm services?

I recommend keeping them separate, and even then pointing ZK storage at a path on its own disk device if possible. ZK is a bottleneck for storm, and when it is too slow lots of bad things can happen. Some folks use shared hosts (with or without VMs) in which to run ZK. In those situations, VMs or processes owned by other users doing unrelated things can put load on the disk, and that will dramatically slow down ZK.

2) We have zookeeper 3.4.5 installed. I see Storm uses zookeeper-3.3.3 as its client. Should we downgrade our installation?

I am not sure about that, since we've been running with ZK 3.4.5 in storm (and on the server). It might work very well, but I have not tried it. I do not remember if anyone on this list has identified any issues with this 3.3.3 + 3.4.5 combo.

One setting we changed to dramatically improve performance with ZK was setting the system property '-Dzookeeper.forceSync=no' on the server. Normally, ZK will sync to disk on every write, and that causes two seeks: one for the data and one for the data log. It gets really expensive with all of the workers heartbeating in through ZK. Be warned that with only one ZK server, an outage could leave you in an inconsistent state.

You might check to see if the ZK server is keeping up. There are tools like iotop that can give information about disk load.

-- Derek
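[For reference on the '-Dzookeeper.forceSync=no' setting mentioned above (an illustration, not part of Derek's message, and assuming a stock ZooKeeper 3.4.x layout where bin/zkServer.sh picks up conf/java.env): the property can be passed to the server JVM like this.]

# conf/java.env on the ZooKeeper server -- illustrative, verify against your ZK packaging
export SERVER_JVMFLAGS="-Dzookeeper.forceSync=no"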
java.lang.IllegalArgumentException: timeout value is negative
I'm getting this exception:

2014-06-03 19:59:13 STDIO [ERROR][id:] Jun 03, 2014 7:59:13 PM org.jboss.netty.channel.DefaultChannelPipeline
WARNING: An exception was thrown by a user handler while handling an exception event ([id: 0xdcf3be42] EXCEPTION: java.net.ConnectException: Connection refused)
java.lang.IllegalArgumentException: timeout value is negative
    at java.lang.Thread.sleep(Native Method)
    at backtype.storm.messaging.netty.Client.reconnect(Client.java:94)
    at backtype.storm.messaging.netty.StormClientHandler.exceptionCaught(StormClientHandler.java:118)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:377)

Here's my storm.yaml (from the nimbus/zookeeper-only server, hence the supervisor ports are commented out):

storm.zookeeper.servers:
    - ma-app05.corp.mydomain.com
storm.zookeeper.port: 2181
storm.zookeeper.root: /storm
storm.zookeeper.session.timeout: 2
storm.zookeeper.connection.timeout: 15000
storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 3
nimbus.host: ma-app05.corp.mydomain.com
nimbus.thrift.port: 6627
nimbus.thrift.max_buffer_size: 1048576
nimbus.childopts: -Xmx1024m
nimbus.task.timeout.secs: 30
nimbus.supervisor.timeout.secs: 60
nimbus.monitor.freq.secs: 10
nimbus.cleanup.inbox.freq.secs: 600
nimbus.inbox.jar.expiration.secs: 3600
nimbus.task.launch.secs: 120
nimbus.reassign: true
nimbus.file.copy.expiration.secs: 600
nimbus.topology.validator: backtype.storm.nimbus.DefaultTopologyValidator
#No supervisor in Nimbus. Pls. note that storm might still be loading the default conf, if it requires.
#supervisor.slots.ports:
#    - 9951
#    - 9952
#    - 9953
#    - 9954
#    - 9955
#supervisor.worker.start.timeout.secs: 120
#supervisor.worker.timeout.secs: 30
#supervisor.monitor.frequency.secs: 3
#supervisor.heartbeat.frequency.secs: 5
#supervisor.enable: true
ui.port: 8181
ui.childopts: -Xmx256m
logviewer.port: 8000
logviewer.childopts: -Xmx128m
logviewer.appender.name: A1
worker.childopts: -Xmx512m -XX:MaxPermSize=256m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc -Xloggc:/app/local/var/logs/jvm-stat/gc-storm-worker-%ID%.log
worker.heartbeat.frequency.secs: 1
task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10
#zmq.threads: 1
#zmq.linger.millis: 5000
#zmq.hwm: 0
storm.local.dir: /ngs/app/isdbd/local/var/run/storm/data
storm.local.mode.zmq: false
storm.cluster.mode: distributed
storm.thrift.transport: backtype.storm.security.auth.SimpleTransportPlugin
storm.messaging.transport: backtype.storm.messaging.netty.Context
storm.messaging.transport: backtype.storm.messaging.netty.Context
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 500
java.library.path: /usr/local/lib:/opt/local/lib:/usr/lib
topology.enable.message.timeouts: true
topology.debug: false
topology.optimize: true
topology.workers: 5
topology.acker.executors: null
topology.tasks: null
topology.message.timeout.secs: 30
topology.skip.missing.kryo.registrations: false
topology.max.task.parallelism: null
topology.max.spout.pending: 20
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: true
topology.worker.childopts: -Xmx512m
topology.executor.receive.buffer.size: 1024
topology.executor.send.buffer.size: 1024
topology.receiver.buffer.size: 8
topology.transfer.buffer.size: 1024
topology.tick.tuple.freq.secs: null
topology.worker.shared.thread.pool.size: 4
topology.disruptor.wait.strategy: com.lmax.disruptor.BlockingWaitStrategy
topology.spout.wait.strategy: backtype.storm.spout.SleepSpoutWaitStrategy
topology.sleep.spout.wait.strategy.time.ms: 1
topology.error.throttle.interval.secs: 10
topology.max.error.report.per.interval: 5
topology.kryo.factory: backtype.storm.serialization.DefaultKryoFactory
topology.tuple.serializer: backtype.storm.serialization.types.ListDelegateSerializer
topology.trident.batch.emit.interval.millis: 500
#dev.zookeeper.path: /tmp/dev-storm-zookeeper
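[An observation added for context, not from the original post: the sleep that fails is the reconnect backoff inside the Netty messaging client, which is derived from the storm.messaging.netty.min_wait_ms / max_wait_ms / max_retries values above; with max_retries set to 100, the computed backoff can apparently overflow to a negative number in 0.9.1, which Thread.sleep() then rejects. The underlying trigger is still the 'Connection refused' (a worker it is trying to reach is not up), so that is worth fixing first. As a guessed workaround, lowering the retry settings, e.g.:]

storm.messaging.netty.max_retries: 30
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100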
How does PersistentAggregate distribute the DB calls?
How does PersistentAggregate distribute the database calls across all the worker nodes? Does it do the global aggregation, then choose a single host to do a multiget/multiput to the external DB?

Thanks
--
Raphael Hsieh
Re: How does PersistentAggregate distribute the DB calls?
Thanks for your quick reply, Nathan.

So I'm doing some debugging of my topology, and I've removed all the logic from my MultiPut function, replacing it with a single System.out.println(). Then I am monitoring my logs to check when this gets printed out. It looks like every single one of my hosts (workers) hits this. Does this indicate that I am processing many, many partitions that each hit this multiPut and print out?

Thanks.

On Tue, Jun 3, 2014 at 3:29 PM, Nathan Marz nat...@nathanmarz.com wrote:
When possible it will do as much aggregation as it can Storm-side, so as to minimize how much it needs to interact with the database. So if you do a persistent global count, for example, it will compute the count for the batch (in parallel), and then the task that finishes the global count will do a single get/update/put to the database.

On Tue, Jun 3, 2014 at 3:08 PM, Raphael Hsieh raffihs...@gmail.com wrote:
How does PersistentAggregate distribute the database calls across all the worker nodes? Does it do the global aggregation, then choose a single host to do a multiget/multiput to the external DB?
Thanks
--
Raphael Hsieh

--
Twitter: @nathanmarz
http://nathanmarz.com

--
Raphael Hsieh
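[For readers unfamiliar with the API being discussed, a persistent global count in Trident looks roughly like the sketch below. The spout and class names are placeholders, and MemoryMapState is the in-memory state variant mentioned elsewhere in this digest; in the thread above it would be replaced by an external-DB state factory.]

package storm.example;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.MemoryMapState;

// Sketch: Trident counts tuples per batch in parallel, then the task that
// finalizes the global aggregate issues the multiGet/multiPut against the
// backing state (here an in-memory map, in the thread above an external DB).
public class PersistentCountSketch {
    public static TridentState buildState(TridentTopology topology) {
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("line"), 3,
                new Values("a"), new Values("b"), new Values("c"));
        return topology.newStream("lines", spout)
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
    }
}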
If I register a metrics object with a bolt/spout, will it run in the same thread as the bolt/spout?
Or do I need to worry about synchronization issues? Thanks in advance!
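[For context (an illustration, not from the original message), "registering a metrics object" here means something like the following in the bolt's prepare():]

package storm.example;

import backtype.storm.metric.api.CountMetric;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

import java.util.Map;

// Sketch: a bolt registering a CountMetric. Storm later calls the metric's
// getValueAndReset() on its metrics tick, which is the synchronization
// question being asked in this thread.
public class MeteredBolt extends BaseBasicBolt {
    private transient CountMetric processed;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // report the metric every 60 seconds
        processed = context.registerMetric("processed", new CountMetric(), 60);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        processed.incr();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}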
If I register a metrics object with a bolt/spout task, will it run in the same thread as the task?
Sorry, changed the title to make it more accurate.

On Tue, Jun 3, 2014 at 11:12 PM, Xueming Li james.xueming...@gmail.com wrote:
Or do I need to worry about synchronization issues? Thanks in advance!