No errors in the log? Which Kafka version do you use?
On 05/26/2016 11:33 AM, Daniela S wrote:
> Thank you very much, that solved the problem.
>
> Now I could deploy my topology, but it seems that it does not consume
> any messages from Kafka.
> My topology is running and my producer is generating messages and I can
> see these messages in the console consumer but my Spout does not emit
> any tuples.
>
> Here are parts of my code:
> *Producer *(JSON Generator using
> https://github.com/acesinc/json-data-generator):
> {
>     "workflows": [{
>         "workflowName": "test",
>         "workflowFilename": "exampleWorkflow.json"
>     }],
>     "producers": [{
>         "type": "kafka",
>         "broker.server": "localhost",
>         "broker.port": 9092,
>         "topic": "logevent",
>         "flatten": false,
>         "sync": false
>     },{
>         "type":"logger"
>     }]
>
> *Workflow: *
> {
>     "eventFrequency": 60000,
>     "varyEventFrequency": true,
>     "repeatWorkflow": true,
>     "timeBetweenRepeat": 60000,
>     "varyRepeatFrequency": true,
>     "steps": [{
>         "config": [{
>             "timestamp": "nowTimestamp()",
>             "ID1": "integer(1,1000)",
>             "ID2": "integer(1,100)",
>             "command": "random('ON','OFF')"
>         }],
>         "duration": 0
>     }]
> }
>
> *Topology:*
> public class Topology {
>
>     public static void main(String[] args) throws
>             AlreadyAliveException, InvalidTopologyException, AuthorizationException {
>
>         ZkHosts zkHosts = new ZkHosts("localhost:2181");
>         SpoutConfig kafkaConfig = new SpoutConfig(zkHosts,"logevent", "", "id7");
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
>         builder.setBolt("StoreToRedisBolt", new StoreToRedisBolt("127.0.0.1",
>                 6379), 1).globalGrouping("KafkaSpout");
>         LocalCluster cluster = new LocalCluster();
>         Config conf = new Config();
>         StormSubmitter.submitTopology("Topology", conf,
>                 builder.createTopology());
>         try {
>             System.out.println("Waiting to consume from kafka");
>             Thread.sleep(10000);
>         } catch (Exception exception) {
>             System.out.println("Thread interrupted exception : " + exception);
>         }
>     }
> }
>
> *Bolt*
> public class StoreToRedisBolt implements IBasicBolt{
>
>     private static final long serialVersionUID = 2L;
>     private RedisOperations redisOperations = null;
>     private String redisIP = null;
>     private int port;
>
>     public StoreToRedisBolt(String redisIP, int port) {
>         this.redisIP = redisIP;
>         this.port = port;
>     }
>
>     public void execute(Tuple input, BasicOutputCollector collector) {
>         Map<String, Object> record = new HashMap<String, Object>();
>         String tupleString = input.getString(0);
>         JSONObject obj = new JSONObject(tupleString);
>         record.put("timestamp", obj.getString("timestamp"));
>         record.put("ID1", obj.getString("ID1"));
>         record.put("ID2", obj.getString("ID2"));
>         record.put("command", obj.getString("command"));
>         String id = obj.getString("ID1") + obj.getString("ID2");
>         redisOperations.insert(record, id);
>     }
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>
>     }
>     public Map<String, Object> getComponentConfiguration() {
>         return null;
>     }
>     public void prepare(Map stormConf, TopologyContext context) {
>         redisOperations = new RedisOperations(this.redisIP, this.port);
>     }
>     public void cleanup() {
>
>     }
> }
>
> Thank you so much for your help in advance!
>
> Regards
>
> *Sent:* Wednesday, May 25, 2016 at 4:45 PM
> *From:* "Matthias J. Sax" <[email protected]>
> *To:* [email protected]
> *Subject:* Re: Error when deploying a topology
> Is com/google/common/cache/Cache contained in your jar file?
>
> On 05/25/2016 10:35 PM, Daniela S wrote:
>> Hi
>>
>> I am trying to deploy a simple topology, which receives data from Kafka
>> and should write it to Redis. But unfortunately I receive the following
>> error message:
>>
>> 13659 [Thread-9] INFO o.a.s.d.worker - Worker 741ffc04-f217-41e7-b942-490c1895e7d7 for storm Topology-1-1464208228 on 66fa5b8f-87ee-42ee-8436-4fed29a20580:1024 has finished loading
>> 13660 [Thread-9] INFO o.a.s.config - SET worker-user 741ffc04-f217-41e7-b942-490c1895e7d7
>> 14512 [Thread-21-__system-executor[-1 -1]] INFO o.a.s.d.executor - Preparing bolt __system:(-1)
>> 14536 [Thread-21-__system-executor[-1 -1]] INFO o.a.s.d.executor - Prepared bolt __system:(-1)
>> 14553 [Thread-15-StoreToRedisBolt-executor[2 2]] INFO o.a.s.d.executor - Preparing bolt StoreToRedisBolt:(2)
>> 14588 [Thread-19-KafkaSpout-executor[1 1]] INFO o.a.s.d.executor - Opening spout KafkaSpout:(1)
>> 14597 [Thread-17-__acker-executor[3 3]] INFO o.a.s.d.executor - Preparing bolt __acker:(3)
>> 14608 [Thread-17-__acker-executor[3 3]] INFO o.a.s.d.executor - Prepared bolt __acker:(3)
>> 14635 [Thread-15-StoreToRedisBolt-executor[2 2]] INFO o.a.s.d.executor - Prepared bolt StoreToRedisBolt:(2)
>> 14709 [Thread-19-KafkaSpout-executor[1 1]] ERROR o.a.s.util - Async loop died!
>> java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.build()Lcom/google/common/cache/Cache;
>>     at org.apache.curator.framework.imps.NamespaceWatcherMap.<init>(NamespaceWatcherMap.java:33) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:81) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:145) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:100) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.kafka.ZkState.newCurator(ZkState.java:45) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.kafka.ZkState.<init>(ZkState.java:61) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.daemon.executor$fn__8158$fn__8173.invoke(executor.clj:602) ~[storm-core-1.0.0.jar:1.0.0]
>>     at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:482) [storm-core-1.0.0.jar:1.0.0]
>>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
>> 14712 [Thread-19-KafkaSpout-executor[1 1]] ERROR o.a.s.d.executor -
>> java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.build()Lcom/google/common/cache/Cache;
>>     at org.apache.curator.framework.imps.NamespaceWatcherMap.<init>(NamespaceWatcherMap.java:33) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:81) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:145) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:100) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.kafka.ZkState.newCurator(ZkState.java:45) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.kafka.ZkState.<init>(ZkState.java:61) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.daemon.executor$fn__8158$fn__8173.invoke(executor.clj:602) ~[storm-core-1.0.0.jar:1.0.0]
>>     at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:482) [storm-core-1.0.0.jar:1.0.0]
>>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
>> 14774 [Thread-19-KafkaSpout-executor[1 1]] ERROR o.a.s.util - Halting process: ("Worker died")
>> java.lang.RuntimeException: ("Worker died")
>>     at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.0.jar:1.0.0]
>>     at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>>     at org.apache.storm.daemon.worker$fn__8827$fn__8828.invoke(worker.clj:758) [storm-core-1.0.0.jar:1.0.0]
>>     at org.apache.storm.daemon.executor$mk_executor_data$fn__8046$fn__8047.invoke(executor.clj:271) [storm-core-1.0.0.jar:1.0.0]
>>     at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:494) [storm-core-1.0.0.jar:1.0.0]
>>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
>>
>> Does anyone know what may be the problem?
>>
>> Thank you for your help!
>>
>> Regards,
>> Daniela
>>
>>
>
