Yes -- more details:

Storm version: 0.9.1-incubating, installed using a variant of your
storm-vagrant deployment (https://github.com/ptgoetz/storm-vagrant).

Cluster setup: two supervisor nodes with 1024 MB each, a nimbus node with
1024 MB, a ZooKeeper (3.3.5) node with 512 MB, and a Kafka (0.8.0) node with
512 MB. The topology persists to a local Cassandra cluster.

Here's an example topology I'm running. This topology works in both local
and distributed mode. A variant of it (more persisting and more complicated
functions on the Kafka stream) works in local mode but produces the Thrift
error reported above when submitted to the cluster.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;
// Project-specific imports omitted: KafkaSentenceMapper, CassandraBackingMap,
// WordSplit, Split, and the Cassandra Configuration class.

public class SentenceAggregationTopology {

    private final BrokerHosts brokerHosts;

    public SentenceAggregationTopology(String kafkaZookeeper) {
        brokerHosts = new ZkHosts(kafkaZookeeper);
    }

    public StormTopology buildTopology() {
        return buildTopology(null);
    }

    public StormTopology buildTopology(LocalDRPC drpc) {
        // Read sentences from the "storm-sentence" topic as plain strings.
        TridentKafkaConfig kafkaConfig =
                new TridentKafkaConfig(brokerHosts, "storm-sentence", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TransactionalTridentKafkaSpout kafkaSpout =
                new TransactionalTridentKafkaSpout(kafkaConfig);
        KafkaSentenceMapper mapper =
                new KafkaSentenceMapper("playlist", "testtable", "word", "count");
        TridentTopology topology = new TridentTopology();

        // Split each sentence into words, group by word, and persist the
        // running counts to Cassandra.
        TridentState wordCounts = topology.newStream("kafka", kafkaSpout)
                .shuffle()
                .each(new Fields("str"), new WordSplit(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(CassandraBackingMap.nonTransactional(mapper),
                        new Count(), new Fields("aggregates_words"))
                .parallelismHint(2);

        // DRPC stream: look up the persisted count for each word in the
        // request and sum the results.
        topology.newDRPCStream("words", drpc)
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(),
                        new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        final int TIME_INTERVAL_IN_MILLIS = 1000;

        String kafkaZk = args[0];
        SentenceAggregationTopology sentenceAggregationTopology =
                new SentenceAggregationTopology(kafkaZk);

        Config config = new Config();
        config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
                TIME_INTERVAL_IN_MILLIS);
        config.put(Configuration.CASSANDRA_CQL_HOSTS_KEY, args[1]);

        // args is already dereferenced above, so no null check is needed here.
        if (args.length > 2) {
            String name = args[2];
            config.setNumWorkers(4);
            config.setMaxTaskParallelism(4);
            StormSubmitter.submitTopology(name, config,
                    sentenceAggregationTopology.buildTopology());
        } else {
            LocalDRPC drpc = new LocalDRPC();
            config.setNumWorkers(2);
            config.setDebug(true);
            config.setMaxTaskParallelism(2);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka", config,
                    sentenceAggregationTopology.buildTopology(drpc));
            while (true) {
                System.out.println("Word count: " + drpc.execute("words", "the"));
                Utils.sleep(TIME_INTERVAL_IN_MILLIS);
            }
        }
    }
}
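
For reference, WordSplit (and the Split used on the DRPC stream) is just a
sentence tokenizer. A minimal sketch, assuming plain whitespace splitting
(the real one may handle punctuation differently):

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class WordSplit extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // Emit one tuple per whitespace-separated word in the sentence.
        for (String word : tuple.getString(0).split("\\s+")) {
            if (!word.isEmpty()) {
                collector.emit(new Values(word));
            }
        }
    }
}

I submit it with the usual storm jar command, e.g. (main-class package
prefix omitted, arguments are placeholders):

storm jar storm-kafka-cassandra-0.1.0-SNAPSHOT-jar-with-dependencies.jar \
    SentenceAggregationTopology <kafka-zookeeper> <cassandra-host> <topology-name>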

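In distributed mode the DRPC stream can be queried the same way as the local
loop above, just with a remote client instead of LocalDRPC. A sketch,
assuming a DRPC server is running on the cluster; "drpc-host" and 3772 (the
default DRPC port) are placeholders for whatever drpc.servers points at in
storm.yaml:

import backtype.storm.utils.DRPCClient;

public class WordCountQuery {
    public static void main(String[] args) throws Exception {
        // Connect to the cluster's DRPC server and query the "words" function.
        DRPCClient client = new DRPCClient("drpc-host", 3772);
        System.out.println("Word count: " + client.execute("words", "the"));
    }
}
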

On Tue, Mar 11, 2014 at 7:33 PM, P. Taylor Goetz <ptgo...@gmail.com> wrote:

> Hi Robert,
>
> Can you provide additional details, like what storm version you are using,
> etc.?
>
> -Taylor
>
> > On Mar 11, 2014, at 6:57 PM, Robert Lee <lee.robert...@gmail.com> wrote:
> >
> > After submitting my topology via the storm jar command:
> > ....
> > 562  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar storm-kafka-cassandra-0.1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
> > 2307 [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
> > 2307 [main] INFO  backtype.storm.StormSubmitter - Submitting topology test in distributed mode with conf {"topology.max.task.parallelism":4,"topology.workers":2,"topology.debug":true,"topology.trident.batch.emit.interval.millis":5000,}
> > Exception in thread "main" java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:95)
> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:41)
> >     at com.sentaware.sentalytics.storm.trident.TwitterTopology.main(TwitterTopology.java:180)
> > Caused by: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
> >     at org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)
> >     at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)
> >     at backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:139)
> >     at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:128)
> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:81)
> >     ... 2 more
> > Caused by: java.net.SocketException: Connection reset
> >     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118)
> >     at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
> >     ... 7 more
> >
> > storm@nimbus:~$ tail /var/log/storm/nimbus.log
> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
> > 2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 2064605, which is bigger than the maximum allowable buffer size for ALL connections.
> >
> > Thoughts on how to proceed? I tried boosting memory from 256 MB to 1024 MB on the nimbus and supervisor nodes with no luck. The jar file is roughly 18 MB, and I can run another topology within the jar fine, but the one I want to run (more complex) fails.
>
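
P.S. Looking again at that last nimbus.log line: the rejected frame of
2064605 bytes (~2 MB) is essentially the serialized submitTopology call
(mostly the topology structure), which suggests the more complex topology is
hitting nimbus's Thrift read-buffer cap rather than a JVM memory limit, so
boosting heap wouldn't help. Newer Storm versions expose that cap as
nimbus.thrift.max_buffer_size in the nimbus storm.yaml; I'm not sure
0.9.1-incubating honors that key, but if it does, setting e.g.
nimbus.thrift.max_buffer_size: 4194304 would raise the limit to 4 MB.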
