Fwd: Unable to connect to storm drpc server

2014-03-11 Thread Rohan Kapadia
I have a Storm cluster, with everything running on the same instance
though.

I am trying to make a call from Node to the DRPC server but get an
ECONNRESET error in the JS script.

On the server I get the message o.a.t.s.TNonblockingServer [ERROR] Read an
invalid frame size of -2147418111. Are you using TFramedTransport on the
client side?

*The complete error message on the client side*

making call
connection error
read ECONNRESET
Error: read ECONNRESET
at errnoException (net.js:904:11)
at TCP.onread (net.js:558:19)

*Client script*

var thrift = require('thrift');
var stormdrpc = require('./gen-nodejs/stormdrpc');
var ttypes = require('./gen-nodejs/test_types');
var assert = require('assert');

transport = thrift.TBufferedTransport();
protocol = thrift.TBinaryProtocol();

var connection = thrift.createConnection('10.0.1.11', 3772, {
  transport : transport,
  protocol : protocol
});

connection.on('error', function(err) {
  console.log('connection error');
  console.log(err.message);
  console.log(err.stack);
});

var client = thrift.createClient(stormdrpc, connection);

console.log('making call');
client.testdrpc('testing', function(err, response) {
  console.log('callback fired');
  console.log(err);
  console.log(response);
  console.log('callback completed');
});

*storm.yaml*

### base
storm.local.dir: /opt/storm
storm.local.mode.zmq: false
storm.cluster.mode: distributed

### zookeeper.*
storm.zookeeper.servers:
- 10.0.2.15
storm.zookeeper.port: 2181
storm.zookeeper.root: /storm
storm.zookeeper.session.timeout: 2
storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000

### supervisor.* configs are for node supervisors
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
supervisor.childopts: -Xmx1024m
supervisor.worker.start.timeout.secs: 120
supervisor.worker.timeout.secs: 30
supervisor.monitor.frequency.secs: 3
supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true

### worker.* configs are for task workers
worker.childopts: -Xmx1280m -XX:+UseConcMarkSweepGC
-Dcom.sun.management.jmxremote
worker.heartbeat.frequency.secs: 1
task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10
zmq.threads: 1
zmq.linger.millis: 5000

### nimbus.* configs are for the master
nimbus.host: 10.0.2.15
nimbus.thrift.port: 6627
nimbus.childopts: -Xmx1024m
nimbus.task.timeout.secs: 30
nimbus.supervisor.timeout.secs: 60
nimbus.monitor.freq.secs: 10
nimbus.cleanup.inbox.freq.secs: 600
nimbus.inbox.jar.expiration.secs: 3600
nimbus.task.launch.secs: 120
nimbus.reassign: true
nimbus.file.copy.expiration.secs: 600

### ui.* configs are for the master
ui.port: 8080
ui.childopts: -Xmx768m

### drpc.* configs
drpc.port: 3772
drpc.worker.threads: 64
drpc.queue.size: 128
drpc.invocations.port: 3773
drpc.request.timeout.secs: 600
drpc.childopts: -Xmx768m
drpc.servers:
- 10.0.2.15

### transactional.* configs
transactional.zookeeper.servers:
- 10.0.2.15
transactional.zookeeper.root: /storm-transactional
transactional.zookeeper.port: 2181

### topology.* configs are for specific executing storms
topology.debug: true
topology.optimize: true
topology.workers: 1
topology.acker.executors: 1
topology.acker.tasks: null
topology.tasks: null
topology.message.timeout.secs: 30
topology.skip.missing.kryo.registrations: false
topology.max.task.parallelism: null
topology.max.spout.pending: null
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.fall.back.on.java.serialization: true
topology.worker.childopts: null




-- 
Rohan Kapadia


Clock Spout

2014-03-11 Thread Klausen Schaefersinho
Hi,

Is there something like a clock spout that just periodically emits a tick
event to trigger other bolts?


Cheers,

klaus
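[A sketch in response: since Storm 0.8 the usual answer is tick tuples, enabled per bolt via the topology.tick.tuple.freq.secs setting (Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), so no dedicated spout is needed. If an explicit clock spout is still preferred, the multilang protocol makes one easy to sketch. The helper below is hypothetical and only shows the emit message format; storm.py's Spout class wraps the surrounding handshake and command loop for you.]

```python
import json
import sys
import time

def emit_tick(stream=sys.stdout, now=None):
    """Write one multilang 'emit' message carrying a timestamp tick.

    Hypothetical helper: a real spout also handles the initial handshake
    and the 'next'/'ack'/'fail' commands Storm sends on stdin, which
    storm.py's Spout class implements.
    """
    tick = now if now is not None else int(time.time())
    msg = {"command": "emit", "tuple": [tick]}
    # Multilang messages are one JSON object followed by a line "end".
    stream.write(json.dumps(msg) + "\nend\n")
    stream.flush()
    return msg
```

Calling emit_tick once per "next" request, sleeping for the desired period, gives a basic clock; downstream bolts treat the tick tuple like any other input.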


Topology execution with 1 GB file on windows

2014-03-11 Thread padma priya chitturi
Hi,

I am executing a topology on Windows which reads lines of input from a 1 GB
file (which has nearly 10 million records).

The topology has 1 spout and 1 bolt. After the spout has emitted some 6-7
million records, I see that the worker on which the spout runs is terminated
and the supervisor launches a new worker to run the spout. The spout then
starts reading the file from the beginning again.

This happens again and again. Is there any reason the worker is being
killed and the spout restarted? I don't see any exception in the
worker/supervisor logs.

Can someone help me with this?
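[A sketch in response: whatever is killing the worker, a spout that persists its read offset will at least resume where it left off instead of replaying the whole file after each restart. A minimal sketch with no Storm dependency; the function name and file layout are illustrative, not part of any Storm API.]

```python
import os

def resume_lines(data_path, offset_path):
    """Yield lines from data_path, checkpointing the byte offset so a
    restarted reader resumes where it left off instead of re-reading.

    The offset is recorded before the line is yielded, so a crash while
    processing a line skips it (at-most-once); good enough to stop
    full-file replays.
    """
    start = 0
    if os.path.exists(offset_path):
        with open(offset_path) as f:
            start = int(f.read().strip() or 0)
    with open(data_path) as f:
        f.seek(start)
        while True:
            line = f.readline()
            if not line:
                return
            with open(offset_path, "w") as chk:
                chk.write(str(f.tell()))
            yield line.rstrip("\n")
```

In a real spout, nextTuple would pull one line per call; readline is used instead of line iteration because Python disables tell() during iteration.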


Re: Error when trying to use multilang in a project built from scratch (not storm-starter)

2014-03-11 Thread Kang Xiao
Hi Chris

The error message "Caused by: java.io.IOException: Cannot run program python"
shows that python is not on the PATH, so it cannot be invoked. You may try
specifying the full path of the python interpreter.
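[A sketch in response: besides the PATH question, the "directory name is invalid" part of the trace suggests the supervisor never extracted a resources/ directory from the topology jar. ShellBolt/ShellSpout look for multilang scripts under resources/ inside the jar (which is why storm-starter keeps them in multilang/resources). A quick check of the built jar; the helper name is hypothetical:]

```python
import zipfile

def jar_has_multilang_resources(jar_path, script="storm.py"):
    """Return True if the topology jar contains resources/<script>.

    Storm extracts the jar's resources/ directory into a temp dir and
    launches the interpreter there; a jar built without it fails with
    exactly this 'directory name is invalid' / missing-directory error.
    """
    with zipfile.ZipFile(jar_path) as jar:
        return any(
            name.startswith("resources/") and name.endswith(script)
            for name in jar.namelist()
        )
```

If this returns False for your jar, the fix is in the build, not the topology code: package storm.py (and your bolt scripts) under resources/.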

--  
Best Regards!

肖康(Kang Xiao, kxiao.ti...@gmail.com)
Distributed Software Engineer


On Monday, March 10, 2014, at 10:47, Chris James wrote:

 Hey there.  I'm trying out a storm project built from scratch in Java, but 
 with a Python bolt.  I have everything running with all Java spouts/bolts 
 just fine, but when I try to incorporate a python bolt I am running into 
 issues.
  
 I have my project separated into a /storm/ for topologies, /storm/bolts/ for 
 bolts, /storm/spouts for spouts, and /storm/multilang/ for the multilang 
 wrappers. Right now the only thing in /storm/multilang/ is storm.py, copied 
 and pasted from the storm-starter project.  In my bolts folder, I have a 
 dummy bolt set up that just prints the tuple.  I've virtually mimicked the 
 storm-starter WordCountTopology example for using a python bolt, so I think 
 the code is OK and the configuration is the issue.  
  
 So my question is simple.  What configuration steps do I have to set up so 
 that my topology knows where to look to find storm.py when I run 
 super("python", "dummypythonbolt.py")?  I noticed an error in the stack trace 
 claiming that it could not run python (python is definitely on my path and I 
 use it everyday), and that is looking in a resources folder that does not 
 exist.  Here is the line in question:  
  
 Caused by: java.io.IOException: Cannot run program python (in directory 
 C:\Users\chris\AppData\Local\Temp\67daff0e-7348-46ee-9b62-83f8ee4e431c\supervisor\stormdist\dummy-topology-1-1394418571\resources):
  CreateProcess error=267, The directory name is invalid
  
 A more extensive stack trace is here: http://pastebin.com/6yx97m0M
  
 So once again: what is the configuration step that I am missing to allow my 
 topology to see storm.py and be able to run multilang spouts/bolts in my 
 topology?  
  
 Thanks!  



Re: Unable to connect to storm drpc server

2014-03-11 Thread Kang Xiao
Hi Rohan  

As the error message indicates, you should use Thrift's TFramedTransport
instead of TBufferedTransport in the client script, since the Storm DRPC
server uses TFramedTransport.

--  
Best Regards!

肖康(Kang Xiao, kxiao.ti...@gmail.com)
Distributed Software Engineer


On Tuesday, March 11, 2014, at 16:54, Rohan Kapadia wrote:

  


Re: Error when trying to use multilang in a project built from scratch (not storm-starter)

2014-03-11 Thread Derek Dagit

Would you check your supervisor.log for a message like:

Could not extract  dir  from  jarpath

--
Derek

On 3/10/14, 17:26, Chris James wrote:

Derek: No I cannot cd into that directory, but I can cd into the directory one 
up from it (dummy-topology-1-1394418571).  That directory contains 
stormcode.ser and stormconf.ser files.  The topology is running locally for 
testing, so I'm not launching any separate supervisor daemon.  It just seems 
like it never even attempted to create the resources directory (but it 
successfully created all the ancestor directories), as the folder isn't really 
locked down at all.

P. Taylor: I get what you're implying, but Eclipse is being run as an 
administrator already, and I am debugging the topology locally straight out of 
eclipse.  It seems bizarre that there would be permissions issues on a folder 
that the project itself created.


On Mon, Mar 10, 2014 at 6:16 PM, Derek Dagit der...@yahoo-inc.com wrote:

Two quick thoughts:

- Can you cd to
'C:\Users\chris\AppData\Local\Temp\67daff0e-7348-46ee-9b62-83f8ee4e431c\supervisor\stormdist\dummy-topology-1-1394418571\resources'
 from the shell as yourself?

- What are the permissions on that directory?  Is the supervisor daemon 
running as another user?

--
Derek


On 3/10/14, 17:05, P. Taylor Goetz wrote:

I don't have access to a windows machine at the moment, but does this 
help?

http://support.microsoft.com/kb/832434

On Mar 10, 2014, at 4:51 PM, Chris James chris.james.cont...@gmail.com wrote:
Reposting since I posted this before at a poor time and got no 
response.





Re: Unable to connect to storm drpc server

2014-03-11 Thread Rohan Kapadia
Hi Kang,

I tried using TFramedTransport, but that also resulted in the same error.

I'm not sure exactly which transport Storm uses, but from the error
message your suggestion makes sense.

Also, I am using TBinaryProtocol. Is that correct, or should I use
another protocol?

I have also tried making a connection without specifying a transport or
protocol, hoping the defaults would work. No luck there either.

Thanks for helping out.

Regards
Rohan



On Tuesday 11 March 2014 09:07:06 PM IST, Kang Xiao wrote:

 Hi Rohan

 As the error message indicated, you should use thrift TFramedTransport
 instead of TBufferedTransport in the client script since storm drpc
 server use TFramedTransport.

 -- 
 Best Regards!

 肖康(Kang Xiao, kxiao.ti...@gmail.com)
 Distributed Software Engineer




Run Storm Without Using Maven or Leiningen

2014-03-11 Thread Kreutzer, Edward
Perhaps a naïve question here, but how can I compile and submit a Storm or
Trident topology without using Maven or Leiningen?

Ted Kreutzer



Nimbus Fails After Uploading Topology - Reading Too Large of Frame Size

2014-03-11 Thread Robert Lee
After submitting my topology via the storm jar command:



562  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar
storm-kafka-cassandra-0.1.0-SNAPSHOT-jar-with-dependencies.jar to assigned
location:
storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
2307 [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded
topology jar to assigned location:
storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
2307 [main] INFO  backtype.storm.StormSubmitter - Submitting topology "test"
in distributed mode with conf
{"topology.max.task.parallelism":4,"topology.workers":2,"topology.debug":true,"topology.trident.batch.emit.interval.millis":5000}
Exception in thread "main" java.lang.RuntimeException:
org.apache.thrift7.transport.TTransportException: java.net.SocketException:
Connection reset
at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:95)
at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:41)
at
com.sentaware.sentalytics.storm.trident.TwitterTopology.main(TwitterTopology.java:180)
Caused by: org.apache.thrift7.transport.TTransportException:
java.net.SocketException: Connection reset
at
org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
at
org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)
at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)
at
backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:139)
at
backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:128)
at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:81)
... 2 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118)
at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
at
org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
... 7 more


storm@nimbus:~$ tail /var/log/storm/nimbus.log
2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to
storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to
storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from
client:
storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from
client:
storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of
2064605, which is bigger than the maximum allowable buffer size for ALL
connections.

Thoughts on how to proceed? I tried boosting memory from 256mb to 1024mb on
the nimbus and supervisor nodes with no luck. The jar file is roughly 18MB
in size and I can run another topology within the jar fine but the one I
want to run (more complex) fails.
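[The second log line is the real clue: Thrift's nonblocking server rejects any frame larger than its read-buffer cap, and a complex topology's serialized structure and conf can exceed it even though the 18 MB jar uploads fine (the jar goes over a separate chunked channel). Here the submitted frame is 2064605 bytes, just over a 1 MB cap. Later Storm releases expose the cap as nimbus.thrift.max_buffer_size in storm.yaml; whether 0.9.1-incubating honors this key should be verified against that version's defaults.yaml before relying on it. A sketch of the override:]

### storm.yaml on the nimbus host (restart nimbus afterwards).
### Assumption: this key exists in your Storm version; check defaults.yaml.
nimbus.thrift.max_buffer_size: 4194304   # 4 MB, up from a 1 MB default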


Re: Nimbus Fails After Uploading Topology - Reading Too Large of Frame Size

2014-03-11 Thread Robert Lee
Yes -- more details:

Storm version: 0.9.1-incubating installed using a variant of your
storm-vagrant deployment (https://github.com/ptgoetz/storm-vagrant).

Cluster setup: two supervisor nodes with 1024m, nimbus with 1024m,
zookeeper (3.3.5) 512mb node, and a kafka (0.8.0) 512mb node. Persisting to
a local cassandra cluster.

Here's an example topology I'm running. This topology works both in local
and distributed mode. A variant of this topology (more persisting and more
complicated functions on the kafka stream) works in local mode but gives
the thrift error reported above when submitting.

public class SentenceAggregationTopology {

    private final BrokerHosts brokerHosts;

    public SentenceAggregationTopology(String kafkaZookeeper) {
        brokerHosts = new ZkHosts(kafkaZookeeper);
    }

    public StormTopology buildTopology() {
        return buildTopology(null);
    }

    public StormTopology buildTopology(LocalDRPC drpc) {
        TridentKafkaConfig kafkaConfig = new
                TridentKafkaConfig(brokerHosts, "storm-sentence", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TransactionalTridentKafkaSpout kafkaSpout = new
                TransactionalTridentKafkaSpout(kafkaConfig);
        KafkaSentenceMapper mapper = new KafkaSentenceMapper("playlist",
                "testtable", "word", "count");
        TridentTopology topology = new TridentTopology();

        TridentState wordCounts = topology.newStream("kafka", kafkaSpout).shuffle().
                each(new Fields("str"), new WordSplit(), new Fields("word")).
                groupBy(new Fields("word")).
                persistentAggregate(
                        CassandraBackingMap.nonTransactional(mapper),
                        new Count(), new Fields("aggregates_words"))
                .parallelismHint(2);

        topology.newDRPCStream("words", drpc)
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(),
                        new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        final int TIME_INTERVAL_IN_MILLIS = 1000;

        String kafkaZk = args[0];
        SentenceAggregationTopology sentenceAggregationTopology = new
                SentenceAggregationTopology(kafkaZk);

        Config config = new Config();
        config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
                TIME_INTERVAL_IN_MILLIS);
        config.put(Configuration.CASSANDRA_CQL_HOSTS_KEY, args[1]);

        if (args != null && args.length > 2) {
            String name = args[2];
            config.setNumWorkers(4);
            config.setMaxTaskParallelism(4);
            StormSubmitter.submitTopology(name, config,
                    sentenceAggregationTopology.buildTopology());
        } else {
            LocalDRPC drpc = new LocalDRPC();
            config.setNumWorkers(2);
            config.setDebug(true);
            config.setMaxTaskParallelism(2);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka", config,
                    sentenceAggregationTopology.buildTopology(drpc));
            while (true) {
                System.out.println("Word count: " + drpc.execute("words", "the"));
                Utils.sleep(TIME_INTERVAL_IN_MILLIS);
            }
        }
    }
}


On Tue, Mar 11, 2014 at 7:33 PM, P. Taylor Goetz ptgo...@gmail.com wrote:

 Hi Robert,

 Can you provide additional details, like what storm version you are using,
 etc.?

 -Taylor

  On Mar 11, 2014, at 6:57 PM, Robert Lee lee.robert...@gmail.com wrote:
 
 
 

Re: Nimbus Fails After Uploading Topology - Reading Too Large of Frame Size

2014-03-11 Thread P. Taylor Goetz
Hi Robert,

Can you provide additional details, like what storm version you are using, etc.?

-Taylor

 On Mar 11, 2014, at 6:57 PM, Robert Lee lee.robert...@gmail.com wrote:
 
 
 