Re: Worker dies (bolt)

2014-06-03 Thread Margusja

Hi

I have made a new test and discovered that in my environment a very 
simple bolt will die too after around 2500 cycles.


Bolt's code:

package storm;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DummyBolt extends BaseBasicBolt
{
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        count++;
        System.out.println("Dummy count: " + count);
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
    }
}

After around 2500 cycles there is no output from the execute method.
What I do after this:
[root@dlvm2 sysconfig]# ls /var/lib/storm/workers/*/pids
[root@dlvm2 sysconfig]# kill -9 4179
After that a new worker comes up, works again for around 2500 cycles, 
stops, and I have to kill the PID again.


Any ideas?

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)

On 02/06/14 13:36, Margusja wrote:

Hi

I am using apache-storm-0.9.1-incubating.
I have a simple topology: a spout reads from a Kafka topic and a bolt writes 
the lines to HBase.


Recently we did a test: we sent 300 000 000 messages through kafka-rest 
-> Kafka queue -> Storm topology -> HBase.


I noticed that after around one hour and around 2500 messages the worker died. 
The PID is there and the process is up, but the bolt's execute method hangs.
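
For reference, a rough sketch of how such a topology might be wired and submitted.
The main class, parallelism and worker count are assumptions; HBaseWriterBolt is the
bolt shown below, and the component names match the ones seen in the worker logs
later in this thread:

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class KafkaToHBaseTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // KafkaConsumerSpout is the user's spout class (assumed name);
        // HBaseWriterBolt is the bolt whose code follows.
        builder.setSpout("KafkaConsumerSpout", new KafkaConsumerSpout(), 1);
        builder.setBolt("hbasewriter", new HBaseWriterBolt(), 1)
               .shuffleGrouping("KafkaConsumerSpout");

        Config conf = new Config();
        conf.setNumWorkers(1);
        StormSubmitter.submitTopology("kafkademo1-topology", conf, builder.createTopology());
    }
}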


Bolt's code is:

package storm;

import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HBaseWriterBolt extends BaseBasicBolt
{
    HTableInterface usersTable;
    HTablePool pool;
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.defaults.for.version", "0.96.0.2.0.6.0-76-hadoop2");
        conf.set("hbase.defaults.for.version.skip", "true");
        conf.set("hbase.zookeeper.quorum", "vm24,vm37,vm38");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://vm38:8020/user/hbase/data");
        //conf.set("zookeeper.znode.parent", "/hbase-unsecure");

        pool = new HTablePool(conf, 1);
        usersTable = pool.getTable("kafkademo1");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        // Row key: random UUID; column family "info"; qualifier and value: the line itself
        Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
        p.add(Bytes.toBytes("info"), Bytes.toBytes(line), Bytes.toBytes(line));

        try {
            usersTable.put(p);
            count++;
            System.out.println("Count: " + count);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
        try {
            usersTable.close();

Re: Worker dies (bolt)

2014-06-03 Thread Margusja

Some new information.
I set debug to true, and in the active worker log I can see the following.
While the worker is OK:
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting: hbasewriter __ack_ack 
[7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message 
source: hbasewriter:3, stream: __ack_ack, id: {}, [7197822474056634252 
-608920652033678418]
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting direct: 1; __acker 
__ack_ack [7197822474056634252]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message 
source: KafkaConsumerSpout:1, stream: default, id: 
{4344988213623161794=-5214435544383558411},  my message...


and after the worker dies there are only rows about the spout, like:
2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout 
__ack_init [3399515592775976300 5357635772515085965 1]

2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout default

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)

On 03/06/14 09:58, Margusja wrote:

RE: Storm with RDBMS

2014-06-03 Thread Balakrishna R
Thanks Alex and Ncleung for your inputs.

Both options look to be valid based on the data size. I am now thinking that 
even an RDBMS might scale well for this scenario if the bolt uses only “read” 
operations on the database. However, I will update later on the approach taken.


Regards
Balakrishna
From: alex kamil [mailto:alex.ka...@gmail.com]
Sent: Monday, June 02, 2014 10:18 PM
To: user@storm.incubator.apache.org
Subject: Re: Storm with RDBMS

For parallel reads of massive historical data and high-volume writes you could 
use a distributed DB with a SQL layer such as Apache HBase + Phoenix 
(http://phoenix.incubator.apache.org/); I think it might 
complement Storm nicely.

On Mon, Jun 2, 2014 at 10:19 AM, Nathan Leung ncle...@gmail.com wrote:
Something like memcached is commonly used for this scenario.  Is memcached 
poorly suited for your goals or data access patterns?
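
As an illustration of the pattern being discussed (not code from this thread): a
minimal sketch of a bolt that keeps a small in-process cache in front of its RDBMS
reads, so repeated lookups of the same historical keys avoid hitting the database.
The JDBC URL, table and field names are invented:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HistoryLookupBolt extends BaseBasicBolt {
    private transient Connection conn;
    private transient Map<String, Double> cache;   // naive cache; an LRU or memcached would be used in practice

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            // Hypothetical connection string and credentials.
            conn = DriverManager.getConnection("jdbc:postgresql://dbhost:5432/history", "user", "pass");
            cache = new HashMap<String, Double>();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String key = tuple.getString(0);
        Double hist = cache.get(key);
        if (hist == null) {
            try {
                // Hypothetical table and column names.
                PreparedStatement ps = conn.prepareStatement(
                        "SELECT avg_value FROM history WHERE entity_id = ?");
                ps.setString(1, key);
                ResultSet rs = ps.executeQuery();
                hist = rs.next() ? rs.getDouble(1) : 0.0;
                rs.close();
                ps.close();
                cache.put(key, hist);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        collector.emit(new Values(key, hist));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("entity", "historical"));
    }
}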

On Mon, Jun 2, 2014 at 10:06 AM, Balakrishna R balakrishn...@spanservices.com wrote:
Hi,

We are evaluating ‘Apache Storm’ for one of our business use cases. In this use 
case, the incoming transactions/stream should be processed by a set of rules or 
logic. In this process, there is also a need to consider historical data (maybe 
2 weeks or a month old).

We understand that Storm will give better performance for processing the incoming 
transactions in real time. What if we have to read the historical data from an 
RDBMS and use that data in the bolts?
Will this degrade the performance of the whole cluster (as the RDBMS might add 
delay under the high read load coming from the bolts parallelized to achieve 
better performance)?

Any suggestion on solving this situation? Please share.


Thanks
Balakrishna



Fwd: Can storm use the jar from Eclipse?

2014-06-03 Thread Cui Qin
Hi,

Right now, I have my Storm cluster set up (three machines, respectively
for ZooKeeper, Nimbus, and a supervisor). Before submitting a topology to the
Storm cluster, I usually test the Storm program in local mode with Eclipse.
Then I use Eclipse to package the Storm project and send it to the Nimbus
machine to submit. But whatever I package from Eclipse (the entire project
or the target folder...), it shows an error that it cannot find the class. The
following experiments are what I did with Storm:

Getting-Started.jar is the entire project jar packaged from Eclipse. When I run
jar -tf Getting-Started.jar to list its contents, you can see the paths:

Getting-Started/.classpath
Getting-Started/.gitignore
Getting-Started/.project
Getting-Started/.settings/org.eclipse.jdt.core.prefs
Getting-Started/.settings/org.eclipse.m2e.core.prefs
Getting-Started/.settings/org.maven.ide.eclipse.prefs
Getting-Started/Readme.txt
Getting-Started/bin/data/
Getting-Started/bin/src/main/java/dataApi/
Getting-Started/bin/src/main/java/topology/bolts/
Getting-Started/bin/src/main/java/topology/data/
Getting-Started/bin/src/main/java/topology/spouts/
Getting-Started/bin/src/main/resources/
Getting-Started/bin/src/test/java/
Getting-Started/bin/target/classes/META-INF/maven/storm.book/
Getting-Started/bin/target/classes/dataApi/
Getting-Started/bin/target/classes/topology/bolts/
Getting-Started/bin/target/classes/topology/data/
Getting-Started/bin/target/classes/topology/spouts/
Getting-Started/data/
Getting-Started/pom.html
Getting-Started/pom.xml
Getting-Started/src/main/java/dataApi/DataConnector.java
Getting-Started/src/main/java/dataApi/DataOutputController.java
Getting-Started/src/main/java/dataApi/FinancialDataStream.java
Getting-Started/src/main/java/dataApi/QuoteInterface.java
Getting-Started/src/main/java/topology/FinancialDataOperationTopology.java
Getting-Started/src/main/java/topology/RandomFinancialDataTopology.java
Getting-Started/src/main/java/topology/bolts/DataNormalizerBolt.java
Getting-Started/src/main/java/topology/bolts/QuoteAveragerBolt.java
Getting-Started/src/main/java/topology/data/FinancialData.java
Getting-Started/src/main/java/topology/data/Symbol.java
Getting-Started/src/main/java/topology/spouts/FinancialDataSpout.java
Getting-Started/src/main/java/topology/spouts/RandomFinancialDataSpout.java
Getting-Started/src/main/resources/financialdata.txt
Getting-Started/src/test/java/
Getting-Started/target/classes/META-INF/MANIFEST.MF
Getting-Started/target/classes/META-INF/maven/storm.book/Getting-Started/pom.properties
Getting-Started/target/classes/META-INF/maven/storm.book/Getting-Started/pom.xml
Getting-Started/target/classes/dataApi/DataConnector.class
Getting-Started/target/classes/dataApi/DataOutputController.class
Getting-Started/target/classes/dataApi/FinancialDataStream.class
Getting-Started/target/classes/dataApi/QuoteInterface.class
Getting-Started/target/classes/financialdata.txt
Getting-Started/target/classes/topology/FinancialDataOperationTopology.class
Getting-Started/target/classes/topology/RandomFinancialDataTopology.class
Getting-Started/target/classes/topology/bolts/DataNormalizerBolt.class
Getting-Started/target/classes/topology/bolts/QuoteAveragerBolt.class
Getting-Started/target/classes/topology/data/FinancialData.class
Getting-Started/target/classes/topology/data/Symbol.class
Getting-Started/target/classes/topology/spouts/FinancialDataSpout.class
Getting-Started/target/classes/topology/spouts/RandomFinancialDataSpout.class
Getting-Started/target/get.jar
Getting-Started/target/test-classes/

I see the topology path in this jar is:
Getting-Started/target/classes/topology/RandomFinancialDataTopology.class
Then I use the storm jar command:
/usr/local/storm/bin/storm jar Getting-Started.jar topology.RandomFinancialDataTopology datatopology

It shows this error:
Exception in thread "main" java.lang.NoClassDefFoundError:
Getting-Started/target/classes/topology/RandomFinancialDataTopology (wrong
name: topology/RandomFinancialDataTopology)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
at
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
Could not find the main class:
Getting-Started.target.classes.topology.RandomFinancialDataTopology.
Program will exit.

Then I tried the Maven command mvn package on the nimbus command line to package
and submit the topology, and it works. When 
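
As an aside, the "wrong name" in the NoClassDefFoundError above means the class was
found at Getting-Started/target/classes/topology/... inside the jar, while its
package says it should live at topology/... at the jar root, which is the layout
the storm jar command expects (and what mvn package produces). A quick way to check
a jar before submitting, sketched here with the names from this thread:

import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Checks whether the topology class sits at the jar root (package path only),
// which is what a class loader expects for package "topology".
public class JarLayoutCheck {
    public static void main(String[] args) throws Exception {
        String jarPath = args.length > 0 ? args[0] : "Getting-Started.jar";
        String expected = "topology/RandomFinancialDataTopology.class";

        boolean found = false;
        JarFile jar = new JarFile(jarPath);
        for (Enumeration<JarEntry> e = jar.entries(); e.hasMoreElements();) {
            if (e.nextElement().getName().equals(expected)) {
                found = true;
            }
        }
        jar.close();
        System.out.println(found
                ? "OK: " + expected + " is at the jar root"
                : "Missing at jar root: " + expected + " (classes are probably nested under a project folder)");
    }
}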

MultiLang (Python) bolt gives error

2014-06-03 Thread Hamza Asad
Hi,
I'm using a Python bolt, which is in the resources directory, but Storm is giving
me this error:

7226 [Thread-19-exclaim1] INFO  backtype.storm.daemon.executor - Preparing bolt exclaim1:(3)
7231 [Thread-10] INFO  backtype.storm.daemon.executor - Loading executor exclaim1:[4 4]
7232 [Thread-19-exclaim1] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: Error when launching multilang subprocess
    at backtype.storm.task.ShellBolt.prepare(ShellBolt.java:105) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    at backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183.invoke(executor.clj:689) ~[na:na]
    at backtype.storm.util$async_loop$fn__390.invoke(util.clj:431) ~[na:na]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
Caused by: java.io.IOException: Cannot run program "python" (in directory "/tmp/5f3c8318-14da-4999-a974-584a9d200fdb/supervisor/stormdist/mongo_20140528_02-1-1401786854/resources"): error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1041) ~[na:1.7.0_55]
    at backtype.storm.utils.ShellProcess.launch(ShellProcess.java:50) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    at backtype.storm.task.ShellBolt.prepare(ShellBolt.java:102) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    ... 4 common frames omitted
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method) ~[na:1.7.0_55]
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:135) ~[na:1.7.0_55]
    at java.lang.ProcessImpl.start(ProcessImpl.java:130) ~[na:1.7.0_55]
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1022) ~[na:1.7.0_55]
    ... 6 common frames omitted
7233 [Thread-10] INFO  backtype.storm.daemon.task - Emitting: exclaim1 __system [startup]
7233 [Thread-10] INFO  backtype.storm.daemon.executor - Loaded executor tasks exclaim1:[4 4]
7233 [Thread-19-exclaim1] ERROR backtype.storm.daemon.executor - java.lang.RuntimeException: Error when launching multilang subprocess
    at backtype.storm.task.ShellBolt.prepare(ShellBolt.java:105) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    at backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183.invoke(executor.clj:689) ~[na:na]
    at backtype.storm.util$async_loop$fn__390.invoke(util.clj:431) ~[na:na]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
Caused by: java.io.IOException: Cannot run program "python" (in directory "/tmp/5f3c8318-14da-4999-a974-584a9d200fdb/supervisor/stormdist/mongo_20140528_02-1-1401786854/resources"): error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1041) ~[na:1.7.0_55]
    at backtype.storm.utils.ShellProcess.launch(ShellProcess.java:50) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    at backtype.storm.task.ShellBolt.prepare(ShellBolt.java:102) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    ... 4 common frames omitted
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method) ~[na:1.7.0_55]
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:135) ~[na:1.7.0_55]
    at java.lang.ProcessImpl.start(ProcessImpl.java:130) ~[na:1.7.0_55]
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1022) ~[na:1.7.0_55]
    ... 6 common frames omitted
7235 [Thread-10] INFO  backtype.storm.daemon.executor - Finished loading executor exclaim1:[4 4]
7235 [Thread-21-exclaim1] INFO  backtype.storm.daemon.executor - Preparing bolt exclaim1:(4)
7237 [Thread-21-exclaim1] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: Error when launching multilang subprocess
    at backtype.storm.task.ShellBolt.prepare(ShellBolt.java:105) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    at backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183.invoke(executor.clj:689) ~[na:na]
    at backtype.storm.util$async_loop$fn__390.invoke(util.clj:431) ~[na:na]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
Caused by: java.io.IOException: Cannot run program "python" (in directory "/tmp/5f3c8318-14da-4999-a974-584a9d200fdb/supervisor/stormdist/mongo_20140528_02-1-1401786854/resources"): error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1041) ~[na:1.7.0_55]
    at backtype.storm.utils.ShellProcess.launch(ShellProcess.java:50) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    at backtype.storm.task.ShellBolt.prepare(ShellBolt.java:102) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    ... 4 common frames omitted
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method) ~[na:1.7.0_55]
    at 
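
The exception above typically means the worker could not launch the python process
from the topology's extracted resources directory: either python is not on the
worker's PATH, or the scripts were not packaged into the jar so that they end up
under resources/. For reference, the usual way a multilang bolt is declared in Java
(the script name and output field are made up, following the storm-starter pattern):

import java.util.Map;

import backtype.storm.task.ShellBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;

// The Java class only declares how to launch the subprocess and what it emits;
// the logic lives in the script, which must be shipped inside the topology jar
// so that it lands in the resources/ directory (in storm-starter it sits under
// multilang/resources in the source tree). "mybolt.py" is a hypothetical name.
public class MyPythonBolt extends ShellBolt implements IRichBolt {

    public MyPythonBolt() {
        super("python", "mybolt.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}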

Kafka-storm - trident transactional topology - bad perf

2014-06-03 Thread Romain Leroux
I have a simple trident transactional topology that does something like the
following:

kafka transactional spout (~3000 rec/sec, 6 partitions thus paraHint=6) --
aggregate with reducerAggregator (paraHint=20) --
transactional state (I tried MemoryMapState, MemcachedMapState and
CassandraMapState) --
new Stream --
print new values

I tried to tune the topology by first setting maxSpoutPending=1 and the batch
emit interval to a large value (1 sec), and then iteratively improving those
values.
I ended up with maxSpoutPending=20 and batchEmitInterval=150ms.
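
For reference, a rough sketch of the shape of such a topology together with those
tuning values. The spout is passed in, the field names are invented, and Count
stands in for the actual reducerAggregator:

import backtype.storm.Config;
import backtype.storm.tuple.Fields;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Debug;
import storm.trident.spout.ITridentSpout;
import storm.trident.testing.MemoryMapState;

public class AggregationTopologySketch {

    // Tuning values mentioned above.
    public static Config buildConfig() {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 150);
        return conf;
    }

    // 'spout' stands in for the transactional Kafka spout described above.
    public static TridentTopology build(ITridentSpout spout) {
        TridentTopology topology = new TridentTopology();
        TridentState counts = topology
                .newStream("kafka", spout)
                .parallelismHint(6)                       // one per Kafka partition
                .groupBy(new Fields("key"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                .parallelismHint(20);
        // "new Stream -- print new values"
        counts.newValuesStream().each(new Fields("count"), new Debug());
        return topology;
    }
}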

However I observed 2 things

1/ Delay in the topology keeps increasing
Even with those fine-tuned values, or smaller ones, it seems that some
transactions fail and that Trident replays them (transactional state).
However, this replaying seems to delay the processing of new incoming data,
and Storm never seems to catch up after replaying.
The result is that after a few minutes processing is clearly not real time
anymore (the aggregates printed in the logs are those from a few minutes
before, and the lag keeps increasing), even though I don't hit a particular
bottleneck in the computation (bolt capacity and latency are OK).
Is this behavior normal? Does it come from KafkaTransactionalSpout? From the
Trident transactional mechanism?

2/ There is an unavoidable bottleneck on $spoutcoord-spout0
Because small failures keep accumulating, Trident replays more and more
transactions.
spout0's performance is impacted (more work), but that can be scaled with
more Kafka partitions.
However, $spoutcoord-spout0 is always a single thread in Trident, whatever
spout we provide, and I clearly observed that $spoutcoord-spout0's capacity
goes above 1 after some minutes (with latency above 10 seconds or so).
Is there a way to improve this? Or is this an unavoidable consequence of
Trident's transactional logic that can't be addressed?


Re: Worker dies (bolt)

2014-06-03 Thread Margusja

OK, I got more info.
It looks like the problem is related to the spout.

I changed the spout's open() from:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    props.put("zookeeper.session.timeout.ms", "500");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}

to

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    //props.put("zookeeper.session.timeout.ms", "500");
    //props.put("zookeeper.sync.time.ms", "250");
    //props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}

and nextTuple() from:
public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            // System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            this.collector.emit(new Values(line), line);
        }
    }
    if (consumer != null)
        consumer.shutdown();
}

to

public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            // System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            //this.collector.emit(new Values(line), line);
            this.collector.emit(new Values(line));
        }
    }
    if (consumer != null)
        consumer.shutdown();
}


And now it is running.

Strange, because when the worker died I still saw log rows from the spout. But I 
think it is somehow related to Storm's internals.
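
As general background rather than a diagnosis of this particular case: tuples
emitted with a message id (the two-argument emit above) are tracked by the ackers
and counted against topology.max.spout.pending, while emits without a message id
are not; if acks never come back, the spout executor stops calling nextTuple()
until the pending count drops again (for example when topology.message.timeout.secs
expires and the tuples are failed). A minimal sketch of a spout that uses anchored
emits and handles ack/fail, with the data source stubbed out:

import java.util.Map;
import java.util.UUID;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class AnchoredLineSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        String line = "some line";                 // placeholder for a real Kafka read
        String msgId = UUID.randomUUID().toString();
        collector.emit(new Values(line), msgId);   // anchored: tracked until ack/fail
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("acked " + msgId);
    }

    @Override
    public void fail(Object msgId) {
        System.out.println("failed " + msgId);     // a real spout would re-emit here
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}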


Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)

On 03/06/14 11:09, Margusja wrote:

RE: Workers constantly restarted due to session timeout

2014-06-03 Thread Michael Dev



Thank you, Derek, for the explanation of the difference between :disallowed and 
:timed-out. That was extremely helpful in understanding what decisions Storm is 
making. I increased both timeouts to 5 minutes and returned the zookeeper session 
timeouts to their default values. This made it plain to see periods in time where 
the Uptime column for the busiest component's worker would not update (1-2 minutes, 
potentially never resulting in a worker restart).
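
For anyone trying to reproduce this, a small sketch that prints the timeout values
the classpath actually resolves (defaults.yaml merged with the storm.yaml on the
classpath); the key constants are from backtype.storm.Config:

import java.util.Map;

import backtype.storm.Config;
import backtype.storm.utils.Utils;

// Prints the effective values of the timeout-related settings discussed in this thread.
public class ShowTimeouts {
    public static void main(String[] args) {
        Map conf = Utils.readStormConfig();
        String[] keys = {
            Config.NIMBUS_TASK_TIMEOUT_SECS,
            Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS,
            Config.SUPERVISOR_WORKER_TIMEOUT_SECS,
            Config.STORM_ZOOKEEPER_SESSION_TIMEOUT,
            Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT
        };
        for (String key : keys) {
            System.out.println(key + " = " + conf.get(key));
        }
    }
}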

ZK logs report constant disconnects and reconnects while the Uptime is not 
updating:
16:28:30,440 - INFO NIOServerCnxn@1001 - Closed socket connection for client 
/10.49.21.151:54004 which has sessionid 0x1464f1fddc1018f
16:31:18,364 - INFO NIOServerCnxnFactory@197 - Accepted socket connection from 
/10.49.21.151:34419
16.31:18,365 - WARN ZookeeperServer@793 - Connection request from old client 
/10.49.21.151:34419; will be dropped if server is in r-o mode
16:31:18,365 - INFO ZookeeperServer@832 - Client attempting to renew session 
0x264f1fddc4021e at /10.49.21.151:34419
16:31:18,365 - INFO Learner@107 - Revalidating client: 0x264f1fddc4021e 
16:31:18,366 - INFO ZooKeeperServer@588 - Invalid session 0x264f1fddc4021e for 
client /10.49.21.151:34419, probably expired
16:31:18,366 - NIOServerCnxn@1001 - Closed socket connection for client 
/10.49.21.151:34419 which had sessionid 0x264f1fddc4021e 
16:31:18,378 - INFO NIOServerCnxnFactory@197 - Accepted socket connection from  
/10.49.21.151:34420
16:31:18,391 - WARN ZookeeperServer@793 - Connection request from old 
client /10.49.21.151:34420; will be dropped if server is in r-o mode
16:31:18,392 - INFO ZookeeperServer@839 - Client attempting to establish new 
session at /10.49.21.151:34420
16:31:18,394 - INFO ZookeeperServer@595 - Established session 0x1464fafddc10218 
with negotiated timeout 2 for client /10.49.21.151:34420
16.31.44,002 - INFO NIOServerCnxn@1001 - Closed socket connection for 
/10.49.21.151:34420 which had sessionid 0x1464fafddc10218
16.32.48,055 - INFO NIOServerCxnFactory@197 - Accepted socket connection from 
/10.49.21.151:34432
16:32:48,056 - WARN ZookeeperServer@793 - Connection request from old 
client /10.49.21.151:34432; will be dropped if server is in r-o mode
16.32.48,056 - INFO ZookeeperServer@832 - Client attempting to renew session 
0x2464fafddc4021f at /10.49.21.151:34432
16:32:48,056 - INFO Learner@107 - Revalidating client: 0x2464fafddc4021f 
16:32:48,057 - INFO ZooKeeperServer@588 - Invalid session 0x2464fafddc4021f for 
client /10.49.21.151:34432, probably expired
16:32:48,057 - NIOServerCnxn@1001 - Closed socket connection for client 
/10.49.21.151:34432 which had sessionid 0x2464fafddc4021f 
...etc until Storm has had enough and restarts the worker resulting in this
16:47:20,706 - NIOServerCnxn@349 - Caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 
0x3464f20777e01cf, likely client has closed socket
   at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:220)
   at 
org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
   at java.lang.Thread.run(Thread.java:745)

1) Is it appropriate to run Zookeeper in parallel on the same node with the 
storm services?
2) We have zookeeper 3.4.5 installed. I see Storm uses zookeeper-3.3.3 as its 
client. Should we downgrade our installation?

 Date: Sat, 31 May 2014 13:50:57 -0500
 From: der...@yahoo-inc.com
 To: user@storm.incubator.apache.org
 Subject: Re: Workers constantly restarted due to session timeout
 
  Are you certain that nimbus.task.timeout.secs is the correct config?
 
 That config controls the length of time before nimbus thinks a worker has 
 timed out.
 
 https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/daemon/nimbus.clj#L369-L372
 
 Its default is 30 seconds.
 
 https://github.com/apache/incubator-storm/blob/master/conf/defaults.yaml#L45
 
 
  storm.zookeeper.connection.timeout: 30
  storm.zookeeper.session.timeout: 30
 
 So these will make the situation worse while workers are losing connections to 
 ZK, since they will cause the workers to wait longer before reconnecting.  They 
 could wait until nimbus thinks the worker is dead before trying to reconnect.
 
 
  supervisor: 2014-05-23 20:17:30 INFO supervisor:0 - Shutting down and 
  clearing state for id 94349373-74ec-484b-a9f8-a5076e17d474. Current 
  supervisor time: 1400876250. State: :disallowed, Heartbeat: 
  #backtype.storm.daemon.common.WorkerHeartbeat{{:time-secs 1400876249, 
  :storm-id test-46-1400863199, :executors #{[-1 -1]}, :port 6700}
 
 Here if the State is :disallowed, then that means it is Nimbus that 
 de-scheduled the worker on that node--very probably in this case because it 
 thought it was dead.  When the supervisor sees this, it will kill the worker. 
  (A state of :timed-out means instead that the worker did not heartbeat to 
 its supervisor in time.)
 
 If the CPU load on the worker was 

Re: Workers constantly restarted due to session timeout

2014-06-03 Thread Derek Dagit

1) Is it appropriate to run Zookeeper in parallel on the same node with the 
storm services?


I recommend keeping them separate, and even then pointing ZK storage to a path on 
its own disk device if possible.  ZK is a bottleneck for Storm, and when it is too 
slow lots of bad things can happen.

Some folks use shared hosts (with or without VMs) in which to run ZK.  In those 
situations, VMs or processes owned by other users doing unrelated things can 
put load on the disk, and that will dramatically slow down ZK.



2) We have zookeeper 3.4.5 installed. I see Storm uses zookeeper-3.3.3 as its 
client. Should we downgrade our installation?


I am not sure about that, since we've been running with ZK 3.4.5 in storm (and 
on the server).  It might work very well, but I have not tried it.  I do not 
remember if anyone on this list has identified any issues with this 3.3.3 + 
3.4.5 combo.


One setting we changed to dramatically improve performance with ZK was setting 
the system property '-Dzookeeper.forceSync=no' on the server.

Normally, ZK will sync to disk on every write, and that causes two seeks: one 
for the data and one for the data log.  It gets really expensive with all of 
the workers heartbeating in through ZK.  Be warned that with only one ZK server, 
an outage could leave you in an inconsistent state.

You might check to see if the ZK server is keeping up.  There are tools like 
iotop that can give information about disk load.

--
Derek

On 6/3/14, 13:14, Michael Dev wrote:

java.lang.IllegalArgumentException: timeout value is negative

2014-06-03 Thread P Ghosh
I'm getting this exception
2014-06-03 19:59:13 STDIO [ERROR][id:] Jun 03, 2014 7:59:13 PM
org.jboss.netty.channel.DefaultChannelPipeline
WARNING: An exception was thrown by a user handler while handling an
exception event ([id: 0xdcf3be42] EXCEPTION: java.net.ConnectException:
Connection refused)
java.lang.IllegalArgumentException: timeout value is negative
at java.lang.Thread.sleep(Native Method)
at backtype.storm.messaging.netty.Client.reconnect(Client.java:94)
at
backtype.storm.messaging.netty.StormClientHandler.exceptionCaught(StormClientHandler.java:118)
at
org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:377)
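
Not Storm's actual code, just a hypothetical illustration of this failure mode: if
a reconnect backoff is computed with int arithmetic, a large retry count can
overflow to a negative value, and Thread.sleep() then throws exactly this
IllegalArgumentException:

public class BackoffOverflowDemo {
    public static void main(String[] args) throws InterruptedException {
        int minWaitMs = 500;      // cf. storm.messaging.netty.min_wait_ms
        int maxRetries = 100;     // cf. storm.messaging.netty.max_retries

        for (int retry = 1; retry <= maxRetries; retry++) {
            int sleepMs = minWaitMs * (1 << retry);   // doubles each retry; goes negative around retry 23
            if (sleepMs < 0) {
                System.out.println("retry " + retry + ": computed sleep is negative (" + sleepMs + ")");
                Thread.sleep(sleepMs);                // throws IllegalArgumentException: timeout value is negative
            }
        }
    }
}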


Here's my storm.yaml (from the nimbus/zookeeper-only server, hence the supervisor
ports are commented out):
storm.zookeeper.servers:
 - ma-app05.corp.mydomain.com
storm.zookeeper.port: 2181
storm.zookeeper.root: /storm
storm.zookeeper.session.timeout: 2
storm.zookeeper.connection.timeout: 15000
storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 3


nimbus.host: ma-app05.corp.mydomain.com
nimbus.thrift.port: 6627
nimbus.thrift.max_buffer_size: 1048576
nimbus.childopts: -Xmx1024m
nimbus.task.timeout.secs: 30
nimbus.supervisor.timeout.secs: 60
nimbus.monitor.freq.secs: 10
nimbus.cleanup.inbox.freq.secs: 600
nimbus.inbox.jar.expiration.secs: 3600
nimbus.task.launch.secs: 120
nimbus.reassign: true
nimbus.file.copy.expiration.secs: 600
nimbus.topology.validator: backtype.storm.nimbus.DefaultTopologyValidator

#No supervisor in Nimbus. Pls. note that storm might still be loading the
default conf, if it requires.
#supervisor.slots.ports:
#  - 9951
#  - 9952
#  - 9953
#  - 9954
#  - 9955
#supervisor.worker.start.timeout.secs: 120
#supervisor.worker.timeout.secs: 30
#supervisor.monitor.frequency.secs: 3
#supervisor.heartbeat.frequency.secs: 5
#supervisor.enable: true


ui.port: 8181
ui.childopts: -Xmx256m


logviewer.port: 8000
logviewer.childopts: -Xmx128m
logviewer.appender.name: A1

worker.childopts: -Xmx512m -XX:MaxPermSize=256m -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps -verbose:gc
-Xloggc:/app/local/var/logs/jvm-stat/gc-storm-worker-%ID%.log
worker.heartbeat.frequency.secs: 1

task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10

#zmq.threads: 1
#zmq.linger.millis: 5000
#zmq.hwm: 0

storm.local.dir: /ngs/app/isdbd/local/var/run/storm/data

storm.local.mode.zmq: false


storm.cluster.mode: distributed

storm.thrift.transport: backtype.storm.security.auth.SimpleTransportPlugin

storm.messaging.transport: backtype.storm.messaging.netty.Context
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 500

java.library.path: /usr/local/lib:/opt/local/lib:/usr/lib

topology.enable.message.timeouts: true
topology.debug: false
topology.optimize: true
topology.workers: 5
topology.acker.executors: null
topology.tasks: null
topology.message.timeout.secs: 30
topology.skip.missing.kryo.registrations: false
topology.max.task.parallelism: null
topology.max.spout.pending: 20
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: true
topology.worker.childopts: -Xmx512m
topology.executor.receive.buffer.size: 1024

topology.executor.send.buffer.size: 1024

topology.receiver.buffer.size: 8

topology.transfer.buffer.size: 1024

topology.tick.tuple.freq.secs: null
topology.worker.shared.thread.pool.size: 4
topology.disruptor.wait.strategy: com.lmax.disruptor.BlockingWaitStrategy
topology.spout.wait.strategy: backtype.storm.spout.SleepSpoutWaitStrategy
topology.sleep.spout.wait.strategy.time.ms: 1
topology.error.throttle.interval.secs: 10
topology.max.error.report.per.interval: 5
topology.kryo.factory: backtype.storm.serialization.DefaultKryoFactory
topology.tuple.serializer:
backtype.storm.serialization.types.ListDelegateSerializer
topology.trident.batch.emit.interval.millis: 500

#dev.zookeeper.path: /tmp/dev-storm-zookeeper


how does PersistentAggregate distribute the DB Calls ?

2014-06-03 Thread Raphael Hsieh
How does PersistentAggregate distribute the database calls across all the
worker nodes?
Does it do the global aggregation and then choose a single host to do a
multiget/multiput to the external DB?

Thanks
-- 
Raphael Hsieh


Re: how does PersistentAggregate distribute the DB Calls ?

2014-06-03 Thread Raphael Hsieh
Thanks for your quick reply, Nathan.
I'm doing some debugging of my topology: I've removed all the logic from my
multiPut function, replacing it with a single System.out.println(), and then
I monitor my logs to check when this gets printed out.
It looks like every single one of my hosts (workers) hits it. Does this
indicate that I am processing many, many partitions that each hit this
multiPut and print?
Thanks.
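
For readers following along, the multiGet/multiPut in question is the
storm.trident.state.map.IBackingMap contract. A stripped-down backing map with only
a println in multiPut, as described above, could look like this (the Long value
type is an assumption):

import java.util.ArrayList;
import java.util.List;

import storm.trident.state.map.IBackingMap;

// A do-nothing backing map that only logs how often multiPut is invoked and with
// how many keys. For grouped aggregations, each state partition (one per
// persistentAggregate task) issues its own multiGet/multiPut for the keys it owns,
// so seeing the print on several workers does not by itself mean the aggregation
// is wrong.
public class LoggingBackingMap implements IBackingMap<Long> {

    @Override
    public List<Long> multiGet(List<List<Object>> keys) {
        // Pretend nothing is stored yet.
        List<Long> ret = new ArrayList<Long>();
        for (int i = 0; i < keys.size(); i++) {
            ret.add(null);
        }
        return ret;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<Long> vals) {
        System.out.println("multiPut called with " + keys.size() + " keys");
    }
}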


On Tue, Jun 3, 2014 at 3:29 PM, Nathan Marz nat...@nathanmarz.com wrote:

 When possible it will do as much aggregation Storm-side so as to minimize
 amount it needs to interact with database. So if you do a persistent global
 count, for example, it will compute the count for the batch (in parallel),
 and then the task that finishes the global count will do a single
 get/update/put to the database.


 On Tue, Jun 3, 2014 at 3:08 PM, Raphael Hsieh raffihs...@gmail.com
 wrote:

 How does PersistentAggregate distribute the database calls across all the
 worker nodes?
 Does it do the global aggregation and then choose a single host to do a
 multiget/multiput to the external DB?

 Thanks
 --
 Raphael Hsieh







 --
 Twitter: @nathanmarz
 http://nathanmarz.com




-- 
Raphael Hsieh


If I register a metrics object with a bolt/spout, will it run in the same thread as the bolt/spout?

2014-06-03 Thread Xueming Li
Or do I need to worry about synchronization issues? Thanks in advance!
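
For context, this is the kind of registration being asked about; a minimal sketch
using the built-in CountMetric (the 60-second bucket size is just an example):

import java.util.Map;

import backtype.storm.metric.api.CountMetric;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class MeteredBolt extends BaseBasicBolt {
    private transient CountMetric executeCount;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Register the metric with the task; getValueAndReset() is polled
        // every 60 seconds by the metrics machinery.
        executeCount = new CountMetric();
        context.registerMetric("execute_count", executeCount, 60);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        executeCount.incr();
        // ... actual work ...
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output fields in this sketch
    }
}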


If I register a metrics object with a bolt/spout task, will it run in the same thread as the task?

2014-06-03 Thread Xueming Li
Sorry, I changed the title to make it more accurate.


On Tue, Jun 3, 2014 at 11:12 PM, Xueming Li james.xueming...@gmail.com
wrote:

 Or do I need to worry about synchronization issues? Thanks in advance!