Yes -- more details below. Storm version: 0.9.1-incubating, installed using a variant of your storm-vagrant deployment (https://github.com/ptgoetz/storm-vagrant).
Cluster setup: two supervisor nodes with 1024 MB each, a nimbus node with 1024 MB, a ZooKeeper (3.3.5) node with 512 MB, and a Kafka (0.8.0) node with 512 MB, persisting to a local Cassandra cluster.

Here's an example topology I'm running. It works in both local and distributed mode. A variant of it (more persisting and more complicated functions on the Kafka stream) works in local mode but gives the thrift error quoted below when submitting.

public class SentenceAggregationTopology {

    private final BrokerHosts brokerHosts;

    public SentenceAggregationTopology(String kafkaZookeeper) {
        brokerHosts = new ZkHosts(kafkaZookeeper);
    }

    public StormTopology buildTopology() {
        return buildTopology(null);
    }

    public StormTopology buildTopology(LocalDRPC drpc) {
        TridentKafkaConfig kafkaConfig =
                new TridentKafkaConfig(brokerHosts, "storm-sentence", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TransactionalTridentKafkaSpout kafkaSpout =
                new TransactionalTridentKafkaSpout(kafkaConfig);
        KafkaSentenceMapper mapper =
                new KafkaSentenceMapper("playlist", "testtable", "word", "count");
        TridentTopology topology = new TridentTopology();

        // Count words from the Kafka stream and persist the counts to Cassandra.
        TridentState wordCounts = topology.newStream("kafka", kafkaSpout).shuffle()
                .each(new Fields("str"), new WordSplit(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(CassandraBackingMap.nonTransactional(mapper),
                        new Count(), new Fields("aggregates_words"))
                .parallelismHint(2);

        // DRPC stream for querying the persisted word counts.
        topology.newDRPCStream("words", drpc)
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        final int TIME_INTERVAL_IN_MILLIS = 1000;

        String kafkaZk = args[0];
        SentenceAggregationTopology sentenceAggregationTopology =
                new SentenceAggregationTopology(kafkaZk);
        Config config = new Config();
        config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, TIME_INTERVAL_IN_MILLIS);
        config.put(Configuration.CASSANDRA_CQL_HOSTS_KEY, args[1]);

        // args is already dereferenced above, so a null check here was redundant.
        if (args.length > 2) {
            String name = args[2];
            config.setNumWorkers(4);
            config.setMaxTaskParallelism(4);
            StormSubmitter.submitTopology(name, config,
                    sentenceAggregationTopology.buildTopology());
        } else {
            LocalDRPC drpc = new LocalDRPC();
            config.setNumWorkers(2);
            config.setDebug(true);
            config.setMaxTaskParallelism(2);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka", config,
                    sentenceAggregationTopology.buildTopology(drpc));
            while (true) {
                System.out.println("Word count: " + drpc.execute("words", "the"));
                Utils.sleep(TIME_INTERVAL_IN_MILLIS);
            }
        }
    }
}
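To check whether the serialized topology itself is what trips the frame limit, I've been thinking of measuring its Thrift-serialized size before submitting. A rough sketch (the class name TopologySizeCheck is just for illustration; it assumes the shaded org.apache.thrift7.TSerializer that ships inside the Storm jar, and the real frame on the wire will be somewhat larger since it also carries the serialized conf and other submit arguments):

import org.apache.thrift7.TSerializer;
import backtype.storm.generated.StormTopology;

public class TopologySizeCheck {
    public static void main(String[] args) throws Exception {
        // Build the topology exactly as it would be submitted (no LocalDRPC).
        StormTopology topology =
                new SentenceAggregationTopology(args[0]).buildTopology();
        // Serialize with the shaded Thrift binary protocol, as the client does.
        byte[] bytes = new TSerializer().serialize(topology);
        // Compare this against the frame size nimbus reports in its log.
        System.out.println("Serialized topology size: " + bytes.length + " bytes");
    }
}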
On Tue, Mar 11, 2014 at 7:33 PM, P. Taylor Goetz <ptgo...@gmail.com> wrote:
> Hi Robert,
>
> Can you provide additional details, like what storm version you are using, etc.?
>
> -Taylor
>
> On Mar 11, 2014, at 6:57 PM, Robert Lee <lee.robert...@gmail.com> wrote:
>
> > After submitting my topology via the storm jar command:
> > ....
> > ....
> > ....
> > 562 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar storm-kafka-cassandra-0.1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
> > 2307 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
> > 2307 [main] INFO backtype.storm.StormSubmitter - Submitting topology test in distributed mode with conf {"topology.max.task.parallelism":4,"topology.workers":2,"topology.debug":true,"topology.trident.batch.emit.interval.millis":5000,}
> > Exception in thread "main" java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:95)
> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:41)
> >     at com.sentaware.sentalytics.storm.trident.TwitterTopology.main(TwitterTopology.java:180)
> > Caused by: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
> >     at org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)
> >     at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)
> >     at backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:139)
> >     at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:128)
> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:81)
> >     ... 2 more
> > Caused by: java.net.SocketException: Connection reset
> >     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118)
> >     at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
> >     ... 7 more
> >
> > storm@nimbus:~$ tail /var/log/storm/nimbus.log
> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
> > 2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 2064605, which is bigger than the maximum allowable buffer size for ALL connections.
> >
> > Thoughts on how to proceed? I tried boosting memory from 256 MB to 1024 MB on the nimbus and supervisor nodes with no luck. The jar file is roughly 18 MB in size, and I can run another topology within the jar fine, but the one I want to run (more complex) fails.
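For what it's worth, the nimbus error reads like the Thrift request itself (the serialized topology plus conf) exceeding nimbus's read buffer rather than a heap problem, which would explain why boosting memory didn't help. If there's a supported knob for this -- I believe nimbus.thrift.max_buffer_size only landed after 0.9.1-incubating, so treat this as an assumption about the build -- raising the limit in storm.yaml on the nimbus host would look roughly like:

# storm.yaml on the nimbus host (assumes this build exposes the setting;
# the value is in bytes, and the default is reportedly around 1 MB)
nimbus.thrift.max_buffer_size: 4194304

Nimbus would need a restart for a new limit to take effect.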