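It looks like you have two versions of the Kafka spout on your classpath. The
InvalidClassException in your log means the storm.kafka.KafkaConfig class that
serialized the topology does not match the one the worker loaded. What output
do you get when you run the following command from your project directory?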

mvn dependency:tree| grep kafka
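
If the dependency tree alone does not settle it, a small check along these
lines might help. It is only a sketch: the class name ClassOrigin and its two
helper methods are made up for illustration, and it uses nothing beyond the
standard JDK. It prints which jar a class was loaded from and the
serialVersionUID that Java serialization computes for it, so you can compare
the result with the two values in the exception.

```java
import java.io.ObjectStreamClass;
import java.security.CodeSource;

// Hypothetical helper, not part of Storm or Kafka.
final class ClassOrigin {

    /** Returns the jar or directory a class was loaded from, or "(bootstrap)" for JDK classes. */
    static String locationOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(bootstrap)";
        }
        return source.getLocation().toString();
    }

    /** Returns the serialVersionUID used for the class, or null if it is not serializable. */
    static Long serialVersionUidOf(Class<?> cls) {
        ObjectStreamClass desc = ObjectStreamClass.lookup(cls);
        return desc == null ? null : Long.valueOf(desc.getSerialVersionUID());
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Defaults to a JDK class so the sketch runs without extra jars;
        // pass storm.kafka.KafkaConfig as the argument to inspect the spout config class.
        String name = args.length > 0 ? args[0] : "java.util.ArrayList";
        Class<?> cls = Class.forName(name);
        System.out.println(name + " loaded from " + locationOf(cls));
        System.out.println("serialVersionUID = " + serialVersionUidOf(cls));
    }
}
```

Run it once with the classpath you build the topology jar from and once with
the classpath the worker uses, passing storm.kafka.KafkaConfig as the
argument. In the exception, the "stream classdesc" value belongs to the class
that serialized the spout and the "local class" value belongs to the class the
worker loaded, so whichever classpath reports 862253719916491316 is the one the
worker is picking up.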

On Thu, Jan 22, 2015 at 2:55 PM, Margus Roo <mar...@roo.ee> wrote:

> I simplified my code and still have the same problem.
>
> code
> public class Topology {
>
>     public static void main(String[] args) throws Exception {
>         Config conf = new Config();
>         conf.setDebug(true);
>
>         TridentTopology topology = new TridentTopology();
>         BrokerHosts zk = new ZkHosts("bigdata14:2181");
>
>         TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "demo");
>         spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>         OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
>
>         Stream spoutStream = topology.newStream("kafka-stream", spout);
>         spoutStream.broadcast();
>
>         // if you wish to run your job on a remote cluster
>         conf.setNumWorkers(4);
>         conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
>         conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
>         conf.put(Config.WORKER_CHILDOPTS, "-Xmx4096m");
>         conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m");
>         conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 512);
>
>         StormSubmitter.submitTopology(args[0], conf, topology.build());
>     }
> }
>
> and log:
>
> 2015-01-22T23:52:20.670+0200 o.a.s.z.ClientCnxn [INFO] Session
> establishment complete on server localhost/127.0.0.1:2181, sessionid =
> 0x14b12e178cd034c, negotiated timeout = 20000
> 2015-01-22T23:52:20.671+0200 o.a.s.c.f.s.ConnectionStateManager [INFO]
> State change: CONNECTED
> 2015-01-22T23:52:20.672+0200 b.s.zookeeper [INFO] Zookeeper state update:
> :connected:none
> 2015-01-22T23:52:21.685+0200 o.a.s.z.ZooKeeper [INFO] Session:
> 0x14b12e178cd034c closed
> 2015-01-22T23:52:21.685+0200 o.a.s.z.ClientCnxn [INFO] EventThread shut
> down
> 2015-01-22T23:52:21.687+0200 b.s.u.StormBoundedExponentialBackoffRetry
> [INFO] The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries
> [5]
> 2015-01-22T23:52:21.687+0200 o.a.s.c.f.i.CuratorFrameworkImpl [INFO]
> Starting
> 2015-01-22T23:52:21.687+0200 o.a.s.z.ZooKeeper [INFO] Initiating client
> connection, connectString=localhost:2181/storm sessionTimeout=20000
> watcher=org.apache.storm.curator.ConnectionState@9ca62e2
> 2015-01-22T23:52:21.688+0200 o.a.s.z.ClientCnxn [INFO] Opening socket
> connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to
> authenticate using SASL (unknown error)
> 2015-01-22T23:52:21.688+0200 o.a.s.z.ClientCnxn [INFO] Socket connection
> established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
> 2015-01-22T23:52:21.694+0200 o.a.s.z.ClientCnxn [INFO] Session
> establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid
> = 0x14b12e178cd034e, negotiated timeout = 20000
> 2015-01-22T23:52:21.694+0200 o.a.s.c.f.s.ConnectionStateManager [INFO]
> State change: CONNECTED
> 2015-01-22T23:52:21.718+0200 b.s.d.worker [INFO] Reading Assignments.
> 2015-01-22T23:52:21.746+0200 b.s.m.TransportFactory [INFO] Storm peer
> transport plugin:backtype.storm.messaging.netty.Context
> 2015-01-22T23:52:21.872+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN]
> maxRetries too large (300). Pinning to 29
> 2015-01-22T23:52:21.873+0200 b.s.u.StormBoundedExponentialBackoffRetry
> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries
> [300]
> 2015-01-22T23:52:21.873+0200 b.s.m.n.Client [INFO] New Netty Client,
> connect to bigdata17, 6702, config: , buffer_size: 5242880
> 2015-01-22T23:52:21.879+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN]
> maxRetries too large (300). Pinning to 29
> 2015-01-22T23:52:21.879+0200 b.s.u.StormBoundedExponentialBackoffRetry
> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries
> [300]
> 2015-01-22T23:52:21.879+0200 b.s.m.n.Client [INFO] New Netty Client,
> connect to bigdata17, 6701, config: , buffer_size: 5242880
> 2015-01-22T23:52:21.880+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN]
> maxRetries too large (300). Pinning to 29
> 2015-01-22T23:52:21.880+0200 b.s.u.StormBoundedExponentialBackoffRetry
> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries
> [300]
> 2015-01-22T23:52:21.880+0200 b.s.m.n.Client [INFO] New Netty Client,
> connect to bigdata17, 6700, config: , buffer_size: 5242880
> 2015-01-22T23:52:21.882+0200 b.s.m.n.Client [INFO] Reconnect started for
> Netty-Client-bigdata17/192.168.80.214:6702... [0]
> 2015-01-22T23:52:21.888+0200 b.s.m.n.Client [INFO] Reconnect started for
> Netty-Client-bigdata17/192.168.80.214:6701... [0]
> 2015-01-22T23:52:21.888+0200 b.s.m.n.Client [INFO] Reconnect started for
> Netty-Client-bigdata17/192.168.80.214:6700... [0]
> 2015-01-22T23:52:22.001+0200 b.s.m.n.Client [INFO] Reconnect started for
> Netty-Client-bigdata17/192.168.80.214:6701... [1]
> 2015-01-22T23:52:22.002+0200 b.s.m.n.Client [INFO] Reconnect started for
> Netty-Client-bigdata17/192.168.80.214:6700... [1]
> 2015-01-22T23:52:22.002+0200 b.s.m.n.Client [INFO] Reconnect started for
> Netty-Client-bigdata17/192.168.80.214:6702... [1]
> 2015-01-22T23:52:22.068+0200 b.s.d.executor [INFO] Loading executor
> __acker:[5 5]
> 2015-01-22T23:52:22.073+0200 b.s.d.task [INFO] Emitting: __acker __system
> ["startup"]
> 2015-01-22T23:52:22.074+0200 b.s.d.executor [INFO] Loaded executor tasks
> __acker:[5 5]
> 2015-01-22T23:52:22.080+0200 b.s.d.executor [INFO] Timeouts disabled for
> executor __acker:[5 5]
> 2015-01-22T23:52:22.081+0200 b.s.d.executor [INFO] Finished loading
> executor __acker:[5 5]
> 2015-01-22T23:52:22.087+0200 b.s.d.executor [INFO] Preparing bolt
> __acker:(5)
> 2015-01-22T23:52:22.088+0200 b.s.d.executor [INFO] Loading executor
> spout0:[9 9]
> 2015-01-22T23:52:22.091+0200 b.s.d.executor [INFO] Prepared bolt
> __acker:(5)
> 2015-01-22T23:52:22.104+0200 b.s.m.n.Client [INFO] Reconnect started for
> Netty-Client-bigdata17/192.168.80.214:6700... [2]
> 2015-01-22T23:52:22.104+0200 b.s.d.worker [ERROR] Error on initialization
> of server mk-worker
> *java.lang.RuntimeException: java.io.InvalidClassException:
> storm.kafka.KafkaConfig; local class incompatible: stream classdesc
> serialVersionUID = 1806199026298360819, local class serialVersionUID =
> 862253719916491316*
>         at
> backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:56)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.utils.Utils.deserialize(Utils.java:89)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.utils.Utils.getSetComponentObject(Utils.java:228)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.daemon.task$get_task_object.invoke(task.clj:73)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.daemon.task$mk_task_data$fn__3131.invoke(task.clj:180)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.util$assoc_apply_self.invoke(util.clj:850)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.daemon.task$mk_task.invoke(task.clj:184)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.daemon.executor$mk_executor$fn__3310.invoke(executor.clj:323)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at clojure.core$map$fn__4207.invoke(core.clj:2485)
> ~[clojure-1.5.1.jar:na]
>         at clojure.lang.LazySeq.sval(LazySeq.java:42)
> ~[clojure-1.5.1.jar:na]
>         at clojure.lang.LazySeq.seq(LazySeq.java:60)
> ~[clojure-1.5.1.jar:na]
>         at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
>         at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
>         at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
> ~[clojure-1.5.1.jar:na]
>         at clojure.core.protocols$fn__6026.invoke(protocols.clj:54)
> ~[clojure-1.5.1.jar:na]
>         at
> clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13)
> ~[clojure-1.5.1.jar:na]
>         at clojure.core$reduce.invoke(core.clj:6177)
> ~[clojure-1.5.1.jar:na]
>         at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na]
>         at
> backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:323)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744$iter__3749__3753$fn__3754.invoke(worker.clj:382)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.LazySeq.sval(LazySeq.java:42)
> ~[clojure-1.5.1.jar:na]
>         at clojure.lang.LazySeq.seq(LazySeq.java:60)
> ~[clojure-1.5.1.jar:na]
>         at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.5.1.jar:na]
>         at clojure.lang.LazySeq.next(LazySeq.java:92)
> ~[clojure-1.5.1.jar:na]
>         at clojure.lang.RT.next(RT.java:598) ~[clojure-1.5.1.jar:na]
>         at clojure.core$next.invoke(core.clj:64) ~[clojure-1.5.1.jar:na]
>         at clojure.core$dorun.invoke(core.clj:2781) ~[clojure-1.5.1.jar:na]
>         at clojure.core$doall.invoke(core.clj:2796) ~[clojure-1.5.1.jar:na]
>         at
> backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744.invoke(worker.clj:382)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.AFn.applyToHelper(AFn.java:185)
> [clojure-1.5.1.jar:na]
>         at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
>         at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]
>         at
> backtype.storm.daemon.worker$fn__3743$mk_worker__3799.doInvoke(worker.clj:354)
> [storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.RestFn.invoke(RestFn.java:512)
> [clojure-1.5.1.jar:na]
>         at backtype.storm.daemon.worker$_main.invoke(worker.clj:461)
> [storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.AFn.applyToHelper(AFn.java:172)
> [clojure-1.5.1.jar:na]
>         at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
>         at backtype.storm.daemon.worker.main(Unknown Source)
> [storm-core-0.9.3.jar:0.9.3]
>
> Margus (margusja) Roo http://margus.roo.ee
> skype: margusja
> +372 51 480
>
> On 22/01/15 20:10, Margus Roo wrote:
>
> Hi
>
> 2015-01-22T20:06:06.175+0200 b.s.u.StormBoundedExponentialBackoffRetry
> [INFO] The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries
> [5]
> 2015-01-22T20:06:06.175+0200 o.a.s.c.f.i.CuratorFrameworkImpl [INFO]
> Starting
> 2015-01-22T20:06:06.175+0200 o.a.s.z.ZooKeeper [INFO] Initiating client
> connection, connectString=localhost:2181/storm sessionTimeout=20000
> watcher=org.apache.storm.curator.ConnectionState@5c23f9fd
> 2015-01-22T20:06:06.176+0200 o.a.s.z.ClientCnxn [INFO] Opening socket
> connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to
> authenticate using SASL (unknown error)
> 2015-01-22T20:06:06.176+0200 o.a.s.z.ClientCnxn [INFO] Socket connection
> established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
> 2015-01-22T20:06:06.182+0200 o.a.s.z.ClientCnxn [INFO] Session
> establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid
> = 0x14b016362340b5f, negotiated timeout = 20000
> 2015-01-22T20:06:06.182+0200 o.a.s.c.f.s.ConnectionStateManager [INFO]
> State change: CONNECTED
> 2015-01-22T20:06:06.201+0200 b.s.d.worker [INFO] Reading Assignments.
> 2015-01-22T20:06:06.226+0200 b.s.m.TransportFactory [INFO] Storm peer
> transport plugin:backtype.storm.messaging.netty.Context
> 2015-01-22T20:06:06.318+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN]
> maxRetries too large (300). Pinning to 29
> 2015-01-22T20:06:06.318+0200 b.s.u.StormBoundedExponentialBackoffRetry
> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries
> [300]
> 2015-01-22T20:06:06.318+0200 b.s.m.n.Client [INFO] New Netty Client,
> connect to bigdata17, 6703, config: , buffer_size: 5242880
> 2015-01-22T20:06:06.323+0200 b.s.m.n.Client [INFO] Reconnect started for
> Netty-Client-bigdata17/192.168.80.214:6703... [0]
> 2015-01-22T20:06:06.323+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN]
> maxRetries too large (300). Pinning to 29
> 2015-01-22T20:06:06.323+0200 b.s.u.StormBoundedExponentialBackoffRetry
> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries
> [300]
> 2015-01-22T20:06:06.323+0200 b.s.m.n.Client [INFO] New Netty Client,
> connect to bigdata17, 6702, config: , buffer_size: 5242880
> 2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] Reconnect started for
> Netty-Client-bigdata17/192.168.80.214:6702... [0]
> 2015-01-22T20:06:06.324+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN]
> maxRetries too large (300). Pinning to 29
> 2015-01-22T20:06:06.324+0200 b.s.u.StormBoundedExponentialBackoffRetry
> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries
> [300]
> 2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] New Netty Client,
> connect to bigdata17, 6701, config: , buffer_size: 5242880
> 2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] Reconnect started for
> Netty-Client-bigdata17/192.168.80.214:6701... [0]
> 2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection established
> to a remote host Netty-Client-bigdata17/192.168.80.214:6702, [id:
> 0xffc67e3c, /192.168.80.214:55264 => bigdata17/192.168.80.214:6702]
> 2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection established
> to a remote host Netty-Client-bigdata17/192.168.80.214:6701, [id:
> 0x54e2a4e9, /192.168.80.214:55748 => bigdata17/192.168.80.214:6701]
> 2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection established
> to a remote host Netty-Client-bigdata17/192.168.80.214:6703, [id:
> 0xc78c04d0, /192.168.80.214:42319 => bigdata17/192.168.80.214:6703]
> 2015-01-22T20:06:06.503+0200 b.s.d.executor [INFO] Loading executor
> spout:[5 5]
> 2015-01-22T20:06:06.517+0200 b.s.d.worker [ERROR] Error on initialization
> of server mk-worker
> java.lang.RuntimeException: java.io.InvalidClassException:
> storm.kafka.KafkaConfig; local class incompatible: stream classdesc
> serialVersionUID = 1806199026298360819, local class serialVersionUID =
> 862253719916491316
>         at
> backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:56)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.utils.Utils.deserialize(Utils.java:89)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.utils.Utils.getSetComponentObject(Utils.java:228)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.daemon.task$get_task_object.invoke(task.clj:73)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.daemon.task$mk_task_data$fn__3131.invoke(task.clj:180)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.util$assoc_apply_self.invoke(util.clj:850)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.daemon.task$mk_task.invoke(task.clj:184)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.daemon.executor$mk_executor$fn__3310.invoke(executor.clj:323)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at clojure.core$map$fn__4207.invoke(core.clj:2485)
> ~[clojure-1.5.1.jar:na]
>         at clojure.lang.LazySeq.sval(LazySeq.java:42)
> ~[clojure-1.5.1.jar:na]
>         at clojure.lang.LazySeq.seq(LazySeq.java:60)
> ~[clojure-1.5.1.jar:na]
>         at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
>         at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
>         at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
> ~[clojure-1.5.1.jar:na]
>         at clojure.core.protocols$fn__6026.invoke(protocols.clj:54)
> ~[clojure-1.5.1.jar:na]
>         at
> clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13)
> ~[clojure-1.5.1.jar:na]
>         at clojure.core$reduce.invoke(core.clj:6177)
> ~[clojure-1.5.1.jar:na]
>         at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na]
>         at
> backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:323)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744$iter__3749__3753$fn__3754.invoke(worker.clj:382)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.LazySeq.sval(LazySeq.java:42)
> ~[clojure-1.5.1.jar:na]
>         at clojure.lang.LazySeq.seq(LazySeq.java:60)
> ~[clojure-1.5.1.jar:na]
>         at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
>         at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
>         at clojure.core$dorun.invoke(core.clj:2780) ~[clojure-1.5.1.jar:na]
>         at clojure.core$doall.invoke(core.clj:2796) ~[clojure-1.5.1.jar:na]
>         at
> backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744.invoke(worker.clj:382)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.AFn.applyToHelper(AFn.java:185)
> [clojure-1.5.1.jar:na]
>         at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
>         at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]
>         at
> backtype.storm.daemon.worker$fn__3743$mk_worker__3799.doInvoke(worker.clj:354)
> [storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.RestFn.invoke(RestFn.java:512)
> [clojure-1.5.1.jar:na]
>         at backtype.storm.daemon.worker$_main.invoke(worker.clj:461)
> [storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.AFn.applyToHelper(AFn.java:172)
> [clojure-1.5.1.jar:na]
>         at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
>         at backtype.storm.daemon.worker.main(Unknown Source)
> [storm-core-0.9.3.jar:0.9.3]
> *Caused by: java.io.InvalidClassException: storm.kafka.KafkaConfig; local
> class incompatible: stream classdesc serialVersionUID =
> 1806199026298360819, local class serialVersionUID = 862253719916491316*
>
>
> I am using Storm nimbus and supervisor 0.9.3.
>
> I build the topology using:
>     <dependency>
>         <groupId>org.apache.storm</groupId>
>         <artifactId>storm-kafka</artifactId>
>         <version>0.9.3</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.storm</groupId>
>         <artifactId>storm-core</artifactId>
>         <version>0.9.3</version>
>         <scope>provided</scope>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>kafka_2.10</artifactId>
>         <version>0.8.2-beta</version>
>     </dependency>
>
> There is no bolt, only one spout:
>
> public class Topology {
>     public static void main(String[] args) throws Exception {
>         Config conf = new Config();
>         conf.setDebug(false);
>
>         TopologyBuilder builder = new TopologyBuilder();
>
>         Broker brokerForPartition0 = new Broker("bigdata14");
>         GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
>         partitionInfo.addPartition(0, brokerForPartition0);
>         StaticHosts hosts = new StaticHosts(partitionInfo);
>
>         SpoutConfig spoutConfig = new SpoutConfig(hosts,
>                 "demo",
>                 "/",  // zookeeper root path for offset storing
>                 "stormdemo");
>         KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
>         builder.setSpout("spout", kafkaSpout);
>
>         if (args.length > 0) {
>             // if you wish to run your job on a remote cluster
>             conf.setNumWorkers(4);
>             conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
>             conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
>             conf.put(Config.WORKER_CHILDOPTS, "-Xmx4096m");
>             conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m");
>             conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
>
>             StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
>         } else {
>             // if you wish to run and test your job locally
>             conf.setMaxTaskParallelism(1);
>
>             LocalCluster cluster = new LocalCluster();
>             cluster.submitTopology("kafkatopology", conf, builder.createTopology());
>             Thread.sleep(10000);
>             cluster.shutdown();
>         }
>     }
> }
>
>
> Any ideas?
>
> --
> Margus (margusja) Roo http://margus.roo.ee
> skype: margusja
> +372 51 480
>
>
>
