Looks like you have 2 versions of the kafka spout in your classpath. What output do you get when you run the following command from your project directory?
mvn dependency:tree| grep kafka On Thu, Jan 22, 2015 at 2:55 PM, Margus Roo <mar...@roo.ee> wrote: > I made my code more simple and still the same problem. > > code > public class Topology { > > > > public static void main(String[] args) throws Exception { > Config conf = new Config(); > conf.setDebug(true); > > > TridentTopology topology = new TridentTopology(); > BrokerHosts zk = new ZkHosts("bigdata14:2181"); > > TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, > "demo"); > spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); > OpaqueTridentKafkaSpout spout = new > OpaqueTridentKafkaSpout(spoutConf); > > Stream spoutStream = topology.newStream("kafka-stream", > spout); > spoutStream.broadcast(); > > > // if you wish to run your job on a remote cluster > conf.setNumWorkers(4); > conf.put(Config.NIMBUS_THRIFT_PORT, 6627); > conf.put(Config.STORM_ZOOKEEPER_PORT, 2181); > conf.put(Config.WORKER_CHILDOPTS , "-Xmx4096m"); > conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS , "-Xmx4096m"); > conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING , 512); > > StormSubmitter.submitTopology(args[0], conf, > topology.build()); > > } > } > > and log: > > 2015-01-22T23:52:20.670+0200 o.a.s.z.ClientCnxn [INFO] Session > establishment complete on server localhost/127.0.0.1:2181, sessionid = > 0x14b12e178cd034c, negotiated timeout = 20000 > 2015-01-22T23:52:20.671+0200 o.a.s.c.f.s.ConnectionStateManager [INFO] > State change: CONNECTED > 2015-01-22T23:52:20.672+0200 b.s.zookeeper [INFO] Zookeeper state update: > :connected:none > 2015-01-22T23:52:21.685+0200 o.a.s.z.ZooKeeper [INFO] Session: > 0x14b12e178cd034c closed > 2015-01-22T23:52:21.685+0200 o.a.s.z.ClientCnxn [INFO] EventThread shut > down > 2015-01-22T23:52:21.687+0200 b.s.u.StormBoundedExponentialBackoffRetry > [INFO] The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries > [5] > 2015-01-22T23:52:21.687+0200 o.a.s.c.f.i.CuratorFrameworkImpl [INFO] > Starting > 2015-01-22T23:52:21.687+0200 o.a.s.z.ZooKeeper [INFO] Initiating client > connection, connectString=localhost:2181/storm sessionTimeout=20000 > watcher=org.apache.storm.curator.ConnectionState@9ca62e2 > 2015-01-22T23:52:21.688+0200 o.a.s.z.ClientCnxn [INFO] Opening socket > connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to > authenticate using SASL (unknown error) > 2015-01-22T23:52:21.688+0200 o.a.s.z.ClientCnxn [INFO] Socket connection > established to localhost/0:0:0:0:0:0:0:1:2181, initiating session > 2015-01-22T23:52:21.694+0200 o.a.s.z.ClientCnxn [INFO] Session > establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid > = 0x14b12e178cd034e, negotiated timeout = 20000 > 2015-01-22T23:52:21.694+0200 o.a.s.c.f.s.ConnectionStateManager [INFO] > State change: CONNECTED > 2015-01-22T23:52:21.718+0200 b.s.d.worker [INFO] Reading Assignments. > 2015-01-22T23:52:21.746+0200 b.s.m.TransportFactory [INFO] Storm peer > transport plugin:backtype.storm.messaging.netty.Context > 2015-01-22T23:52:21.872+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] > maxRetries too large (300). Pinning to 29 > 2015-01-22T23:52:21.873+0200 b.s.u.StormBoundedExponentialBackoffRetry > [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries > [300] > 2015-01-22T23:52:21.873+0200 b.s.m.n.Client [INFO] New Netty Client, > connect to bigdata17, 6702, config: , buffer_size: 5242880 > 2015-01-22T23:52:21.879+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] > maxRetries too large (300). Pinning to 29 > 2015-01-22T23:52:21.879+0200 b.s.u.StormBoundedExponentialBackoffRetry > [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries > [300] > 2015-01-22T23:52:21.879+0200 b.s.m.n.Client [INFO] New Netty Client, > connect to bigdata17, 6701, config: , buffer_size: 5242880 > 2015-01-22T23:52:21.880+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] > maxRetries too large (300). Pinning to 29 > 2015-01-22T23:52:21.880+0200 b.s.u.StormBoundedExponentialBackoffRetry > [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries > [300] > 2015-01-22T23:52:21.880+0200 b.s.m.n.Client [INFO] New Netty Client, > connect to bigdata17, 6700, config: , buffer_size: 5242880 > 2015-01-22T23:52:21.882+0200 b.s.m.n.Client [INFO] Reconnect started for > Netty-Client-bigdata17/192.168.80.214:6702... [0] > 2015-01-22T23:52:21.888+0200 b.s.m.n.Client [INFO] Reconnect started for > Netty-Client-bigdata17/192.168.80.214:6701... [0] > 2015-01-22T23:52:21.888+0200 b.s.m.n.Client [INFO] Reconnect started for > Netty-Client-bigdata17/192.168.80.214:6700... [0] > 2015-01-22T23:52:22.001+0200 b.s.m.n.Client [INFO] Reconnect started for > Netty-Client-bigdata17/192.168.80.214:6701... [1] > 2015-01-22T23:52:22.002+0200 b.s.m.n.Client [INFO] Reconnect started for > Netty-Client-bigdata17/192.168.80.214:6700... [1] > 2015-01-22T23:52:22.002+0200 b.s.m.n.Client [INFO] Reconnect started for > Netty-Client-bigdata17/192.168.80.214:6702... [1] > 2015-01-22T23:52:22.068+0200 b.s.d.executor [INFO] Loading executor > __acker:[5 5] > 2015-01-22T23:52:22.073+0200 b.s.d.task [INFO] Emitting: __acker __system > ["startup"] > 2015-01-22T23:52:22.074+0200 b.s.d.executor [INFO] Loaded executor tasks > __acker:[5 5] > 2015-01-22T23:52:22.080+0200 b.s.d.executor [INFO] Timeouts disabled for > executor __acker:[5 5] > 2015-01-22T23:52:22.081+0200 b.s.d.executor [INFO] Finished loading > executor __acker:[5 5] > 2015-01-22T23:52:22.087+0200 b.s.d.executor [INFO] Preparing bolt > __acker:(5) > 2015-01-22T23:52:22.088+0200 b.s.d.executor [INFO] Loading executor > spout0:[9 9] > 2015-01-22T23:52:22.091+0200 b.s.d.executor [INFO] Prepared bolt > __acker:(5) > 2015-01-22T23:52:22.104+0200 b.s.m.n.Client [INFO] Reconnect started for > Netty-Client-bigdata17/192.168.80.214:6700... [2] > 2015-01-22T23:52:22.104+0200 b.s.d.worker [ERROR] Error on initialization > of server mk-worker > *java.lang.RuntimeException: java.io.InvalidClassException: > storm.kafka.KafkaConfig; local class incompatible: stream classdesc > serialVersionUID = 1806199026298360819, local class serialVersionUID = > 862253719916491316* > at > backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:56) > ~[storm-core-0.9.3.jar:0.9.3] > at backtype.storm.utils.Utils.deserialize(Utils.java:89) > ~[storm-core-0.9.3.jar:0.9.3] > at > backtype.storm.utils.Utils.getSetComponentObject(Utils.java:228) > ~[storm-core-0.9.3.jar:0.9.3] > at backtype.storm.daemon.task$get_task_object.invoke(task.clj:73) > ~[storm-core-0.9.3.jar:0.9.3] > at > backtype.storm.daemon.task$mk_task_data$fn__3131.invoke(task.clj:180) > ~[storm-core-0.9.3.jar:0.9.3] > at backtype.storm.util$assoc_apply_self.invoke(util.clj:850) > ~[storm-core-0.9.3.jar:0.9.3] > at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173) > ~[storm-core-0.9.3.jar:0.9.3] > at backtype.storm.daemon.task$mk_task.invoke(task.clj:184) > ~[storm-core-0.9.3.jar:0.9.3] > at > backtype.storm.daemon.executor$mk_executor$fn__3310.invoke(executor.clj:323) > ~[storm-core-0.9.3.jar:0.9.3] > at clojure.core$map$fn__4207.invoke(core.clj:2485) > ~[clojure-1.5.1.jar:na] > at clojure.lang.LazySeq.sval(LazySeq.java:42) > ~[clojure-1.5.1.jar:na] > at clojure.lang.LazySeq.seq(LazySeq.java:60) > ~[clojure-1.5.1.jar:na] > at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na] > at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na] > at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) > ~[clojure-1.5.1.jar:na] > at clojure.core.protocols$fn__6026.invoke(protocols.clj:54) > ~[clojure-1.5.1.jar:na] > at > clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13) > ~[clojure-1.5.1.jar:na] > at clojure.core$reduce.invoke(core.clj:6177) > ~[clojure-1.5.1.jar:na] > at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na] > at > backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:323) > ~[storm-core-0.9.3.jar:0.9.3] > at > backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744$iter__3749__3753$fn__3754.invoke(worker.clj:382) > ~[storm-core-0.9.3.jar:0.9.3] > at clojure.lang.LazySeq.sval(LazySeq.java:42) > ~[clojure-1.5.1.jar:na] > at clojure.lang.LazySeq.seq(LazySeq.java:60) > ~[clojure-1.5.1.jar:na] > at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.5.1.jar:na] > at clojure.lang.LazySeq.next(LazySeq.java:92) > ~[clojure-1.5.1.jar:na] > at clojure.lang.RT.next(RT.java:598) ~[clojure-1.5.1.jar:na] > at clojure.core$next.invoke(core.clj:64) ~[clojure-1.5.1.jar:na] > at clojure.core$dorun.invoke(core.clj:2781) ~[clojure-1.5.1.jar:na] > at clojure.core$doall.invoke(core.clj:2796) ~[clojure-1.5.1.jar:na] > at > backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744.invoke(worker.clj:382) > ~[storm-core-0.9.3.jar:0.9.3] > at clojure.lang.AFn.applyToHelper(AFn.java:185) > [clojure-1.5.1.jar:na] > at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] > at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na] > at > backtype.storm.daemon.worker$fn__3743$mk_worker__3799.doInvoke(worker.clj:354) > [storm-core-0.9.3.jar:0.9.3] > at clojure.lang.RestFn.invoke(RestFn.java:512) > [clojure-1.5.1.jar:na] > at backtype.storm.daemon.worker$_main.invoke(worker.clj:461) > [storm-core-0.9.3.jar:0.9.3] > at clojure.lang.AFn.applyToHelper(AFn.java:172) > [clojure-1.5.1.jar:na] > at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] > at backtype.storm.daemon.worker.main(Unknown Source) > [storm-core-0.9.3.jar:0.9.3] > > Margus (margusja) Roohttp://margus.roo.ee > skype: margusja > +372 51 480 > > On 22/01/15 20:10, Margus Roo wrote: > > Hi > > 2015-01-22T20:06:06.175+0200 b.s.u.StormBoundedExponentialBackoffRetry > [INFO] The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries > [5] > 2015-01-22T20:06:06.175+0200 o.a.s.c.f.i.CuratorFrameworkImpl [INFO] > Starting > 2015-01-22T20:06:06.175+0200 o.a.s.z.ZooKeeper [INFO] Initiating client > connection, connectString=localhost:2181/storm sessionTimeout=20000 > watcher=org.apache.storm.curator.ConnectionState@5c23f9fd > 2015-01-22T20:06:06.176+0200 o.a.s.z.ClientCnxn [INFO] Opening socket > connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to > authenticate using SASL (unknown error) > 2015-01-22T20:06:06.176+0200 o.a.s.z.ClientCnxn [INFO] Socket connection > established to localhost/0:0:0:0:0:0:0:1:2181, initiating session > 2015-01-22T20:06:06.182+0200 o.a.s.z.ClientCnxn [INFO] Session > establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid > = 0x14b016362340b5f, negotiated timeout = 20000 > 2015-01-22T20:06:06.182+0200 o.a.s.c.f.s.ConnectionStateManager [INFO] > State change: CONNECTED > 2015-01-22T20:06:06.201+0200 b.s.d.worker [INFO] Reading Assignments. > 2015-01-22T20:06:06.226+0200 b.s.m.TransportFactory [INFO] Storm peer > transport plugin:backtype.storm.messaging.netty.Context > 2015-01-22T20:06:06.318+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] > maxRetries too large (300). Pinning to 29 > 2015-01-22T20:06:06.318+0200 b.s.u.StormBoundedExponentialBackoffRetry > [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries > [300] > 2015-01-22T20:06:06.318+0200 b.s.m.n.Client [INFO] New Netty Client, > connect to bigdata17, 6703, config: , buffer_size: 5242880 > 2015-01-22T20:06:06.323+0200 b.s.m.n.Client [INFO] Reconnect started for > Netty-Client-bigdata17/192.168.80.214:6703... [0] > 2015-01-22T20:06:06.323+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] > maxRetries too large (300). Pinning to 29 > 2015-01-22T20:06:06.323+0200 b.s.u.StormBoundedExponentialBackoffRetry > [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries > [300] > 2015-01-22T20:06:06.323+0200 b.s.m.n.Client [INFO] New Netty Client, > connect to bigdata17, 6702, config: , buffer_size: 5242880 > 2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] Reconnect started for > Netty-Client-bigdata17/192.168.80.214:6702... [0] > 2015-01-22T20:06:06.324+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] > maxRetries too large (300). Pinning to 29 > 2015-01-22T20:06:06.324+0200 b.s.u.StormBoundedExponentialBackoffRetry > [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries > [300] > 2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] New Netty Client, > connect to bigdata17, 6701, config: , buffer_size: 5242880 > 2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] Reconnect started for > Netty-Client-bigdata17/192.168.80.214:6701... [0] > 2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection established > to a remote host Netty-Client-bigdata17/192.168.80.214:6702, [id: > 0xffc67e3c, /192.168.80.214:55264 => bigdata17/192.168.80.214:6702] > 2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection established > to a remote host Netty-Client-bigdata17/192.168.80.214:6701, [id: > 0x54e2a4e9, /192.168.80.214:55748 => bigdata17/192.168.80.214:6701] > 2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection established > to a remote host Netty-Client-bigdata17/192.168.80.214:6703, [id: > 0xc78c04d0, /192.168.80.214:42319 => bigdata17/192.168.80.214:6703] > 2015-01-22T20:06:06.503+0200 b.s.d.executor [INFO] Loading executor > spout:[5 5] > 2015-01-22T20:06:06.517+0200 b.s.d.worker [ERROR] Error on initialization > of server mk-worker > java.lang.RuntimeException: java.io.InvalidClassException: > storm.kafka.KafkaConfig; local class incompatible: stream classdesc > serialVersionUID = 1806199026298360819, local class serialVersionUID = > 862253719916491316 > at > backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:56) > ~[storm-core-0.9.3.jar:0.9.3] > at backtype.storm.utils.Utils.deserialize(Utils.java:89) > ~[storm-core-0.9.3.jar:0.9.3] > at > backtype.storm.utils.Utils.getSetComponentObject(Utils.java:228) > ~[storm-core-0.9.3.jar:0.9.3] > at backtype.storm.daemon.task$get_task_object.invoke(task.clj:73) > ~[storm-core-0.9.3.jar:0.9.3] > at > backtype.storm.daemon.task$mk_task_data$fn__3131.invoke(task.clj:180) > ~[storm-core-0.9.3.jar:0.9.3] > at backtype.storm.util$assoc_apply_self.invoke(util.clj:850) > ~[storm-core-0.9.3.jar:0.9.3] > at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173) > ~[storm-core-0.9.3.jar:0.9.3] > > 604,1 91% > at backtype.storm.util$assoc_apply_self.invoke(util.clj:850) > ~[storm-core-0.9.3.jar:0.9.3] > at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173) > ~[storm-core-0.9.3.jar:0.9.3] > at backtype.storm.daemon.task$mk_task.invoke(task.clj:184) > ~[storm-core-0.9.3.jar:0.9.3] > at > backtype.storm.daemon.executor$mk_executor$fn__3310.invoke(executor.clj:323) > ~[storm-core-0.9.3.jar:0.9.3] > at clojure.core$map$fn__4207.invoke(core.clj:2485) > ~[clojure-1.5.1.jar:na] > at clojure.lang.LazySeq.sval(LazySeq.java:42) > ~[clojure-1.5.1.jar:na] > at clojure.lang.LazySeq.seq(LazySeq.java:60) > ~[clojure-1.5.1.jar:na] > at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na] > at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na] > at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) > ~[clojure-1.5.1.jar:na] > at clojure.core.protocols$fn__6026.invoke(protocols.clj:54) > ~[clojure-1.5.1.jar:na] > at > clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13) > ~[clojure-1.5.1.jar:na] > at clojure.core$reduce.invoke(core.clj:6177) > ~[clojure-1.5.1.jar:na] > at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na] > at > backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:323) > ~[storm-core-0.9.3.jar:0.9.3] > at > backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744$iter__3749__3753$fn__3754.invoke(worker.clj:382) > ~[storm-core-0.9.3.jar:0.9.3] > at clojure.lang.LazySeq.sval(LazySeq.java:42) > ~[clojure-1.5.1.jar:na] > at clojure.lang.LazySeq.seq(LazySeq.java:60) > ~[clojure-1.5.1.jar:na] > at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na] > at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na] > at clojure.core$dorun.invoke(core.clj:2780) ~[clojure-1.5.1.jar:na] > at clojure.core$doall.invoke(core.clj:2796) ~[clojure-1.5.1.jar:na] > at > backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744.invoke(worker.clj:382) > ~[storm-core-0.9.3.jar:0.9.3] > at clojure.lang.AFn.applyToHelper(AFn.java:185) > [clojure-1.5.1.jar:na] > at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] > at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na] > at > backtype.storm.daemon.worker$fn__3743$mk_worker__3799.doInvoke(worker.clj:354) > [storm-core-0.9.3.jar:0.9.3] > at clojure.lang.RestFn.invoke(RestFn.java:512) > [clojure-1.5.1.jar:na] > at backtype.storm.daemon.worker$_main.invoke(worker.clj:461) > [storm-core-0.9.3.jar:0.9.3] > at clojure.lang.AFn.applyToHelper(AFn.java:172) > [clojure-1.5.1.jar:na] > at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] > at backtype.storm.daemon.worker.main(Unknown Source) > [storm-core-0.9.3.jar:0.9.3] > *Caused by: java.io.InvalidClassException: storm.kafka.KafkaConfig; local > class incompatible: stream classdesc serialVersionUID = > 1806199026298360819, local class serialVersionUID = 862253719916491316* > > > I am using storm numbus and supervisor 0.9.3 > > build topology using: > <dependency> > <groupId>org.apache.storm</groupId> > <artifactId>storm-kafka</artifactId> > <version>0.9.3</version> > </dependency> > <dependency> > <groupId>org.apache.storm</groupId> > <artifactId>storm-core</artifactId> > <version>0.9.3</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>org.apache.kafka</groupId> > <artifactId>kafka_2.10</artifactId> > <version>0.8.2-beta</version> > </dependency> > > there is no bolt only one spout: > > public class Topology { > public static void main(String[] args) throws Exception { > Config conf = new Config(); > conf.setDebug(false); > > TopologyBuilder builder = new TopologyBuilder(); > > Broker brokerForPartition0 = new Broker("bigdata14"); > GlobalPartitionInformation partitionInfo = new > GlobalPartitionInformation(); > partitionInfo.addPartition(0, brokerForPartition0); > StaticHosts hosts = new StaticHosts(partitionInfo); > > SpoutConfig spoutConfig = new SpoutConfig(hosts, > "demo", > "/", // zookeeper root path for offset storing > "stormdemo"); > KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); > > builder.setSpout("spout", kafkaSpout); > > if (args.length > 0) { > // if you wish to run your job on a remote cluster > conf.setNumWorkers(4); > conf.put(Config.NIMBUS_THRIFT_PORT, 6627); > conf.put(Config.STORM_ZOOKEEPER_PORT, 2181); > conf.put(Config.WORKER_CHILDOPTS , "-Xmx4096m"); > conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS , "-Xmx4096m"); > conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING , 2); > > StormSubmitter.submitTopology(args[0], conf, > builder.createTopology()); > } else { > // if you wish to run and test your job locally > conf.setMaxTaskParallelism(1); > > LocalCluster cluster = new LocalCluster(); > cluster.submitTopology("kafkatopology", conf, > builder.createTopology()); > Thread.sleep(10000); > cluster.shutdown(); > } > } > } > > > Any ideas? > > -- > Margus (margusja) Roohttp://margus.roo.ee > skype: margusja > +372 51 480 > > >