No errors in the log?

What Kafka version do you use?

On 05/26/2016 11:33 AM, Daniela S wrote:
> Thank you very much, that solved the problem.
>  
> Now I could deploy my topology, but it seems that it does not consume
> any messages from Kafka.
> My topology is running and my producer is generating messages and I can
> see these messages in the console consumer but my Spout does not emit
> any tuples.
>  
> Here are parts of my code:
> *Producer *(JSON Generator using
> https://github.com/acesinc/json-data-generator):
> {
>     "workflows": [{
>             "workflowName": "test",
>             "workflowFilename": "exampleWorkflow.json"
>         }],
> "producers": [{
>             "type": "kafka",
>             "broker.server": "localhost",
>             "broker.port": 9092,
>             "topic": "logevent",
>             "flatten": false,
>             "sync": false
>       },{
>       "type":"logger"
>    }]
> }
>  
> *Workflow: *
> {
>     "eventFrequency": 60000,
>     "varyEventFrequency": true,
>     "repeatWorkflow": true,
>     "timeBetweenRepeat": 60000,
>     "varyRepeatFrequency": true,
>     "steps": [{
>             "config": [{
>                     "timestamp": "nowTimestamp()",
>                     "ID1": "integer(1,1000)",
>                     "ID2": "integer(1,100)",
>                     "command": "random('ON','OFF')"
>                 }],
>             "duration": 0
>         }]
> }
>  
> *Topology:*
> public class Topology {
> 
>   public static void main(String[] args) throws
>   AlreadyAliveException, InvalidTopologyException, AuthorizationException {
>  
>    ZkHosts zkHosts = new ZkHosts("localhost:2181");
>    SpoutConfig kafkaConfig = new SpoutConfig(zkHosts,"logevent", "", "id7");
>    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>    TopologyBuilder builder = new TopologyBuilder();
>    builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
>    builder.setBolt("StoreToRedisBolt", new StoreToRedisBolt("127.0.0.1",
> 6379), 1).globalGrouping("KafkaSpout");
>    LocalCluster cluster = new LocalCluster();
>    Config conf = new Config();
>    StormSubmitter.submitTopology("Topology", conf,
> builder.createTopology());
>     try {
>         System.out.println("Waiting to consume from kafka");
>         Thread.sleep(10000);
>     } catch (Exception exception) {
>         System.out.println("Thread interrupted exception : " + exception);
>     }
> }
>     }
> *Bolt*
> public class StoreToRedisBolt implements IBasicBolt{
>     
>     private static final long serialVersionUID = 2L;
>     private RedisOperations redisOperations = null;
>     private String redisIP = null;
>     private int port;
>     
>     public StoreToRedisBolt(String redisIP, int port) {
>         this.redisIP = redisIP;
>         this.port = port;
>     }
>     
>     public void execute(Tuple input, BasicOutputCollector collector) {
>         Map<String, Object> record = new HashMap<String, Object>();
>         String tupleString = input.getString(0);
>         JSONObject obj = new JSONObject(tupleString);
>         record.put("timestamp", obj.getString("timestamp"));
>         record.put("ID1", obj.getString("ID1"));
>         record.put("ID2", obj.getString("ID2"));
>         record.put("command", obj.getString("command"));
>         String id = obj.getString("ID1") + obj.getString("ID2");
>         redisOperations.insert(record, id);
>     }
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         
>     }
>     public Map<String, Object> getComponentConfiguration() {
>         return null;
>     }
>     public void prepare(Map stormConf, TopologyContext context) {
>         redisOperations = new RedisOperations(this.redisIP, this.port);
>     }
>     public void cleanup() {
>         
>     }
> }
>  
> Thank you so much for your help in advance!
>  
> Regards
>  
> *Sent:* Wednesday, May 25, 2016 at 4:45 PM
> *From:* "Matthias J. Sax" <[email protected]>
> *To:* [email protected]
> *Subject:* Re: Error when deploying a topology
> Is com/google/common/cache/Cache contained in your jar file?
> 
> On 05/25/2016 10:35 PM, Daniela S wrote:
>> Hi
>>
>> I am trying to deploy a simple topology, which receives data from Kafka
>> and should write it to Redis. But unfortunately I receive the following
>> error message:
>>
>> 13659 [Thread-9] INFO o.a.s.d.worker - Worker 741ffc04-f217-41e7-b942-490c1895e7d7 for storm Topology-1-1464208228 on 66fa5b8f-87ee-42ee-8436-4fed29a20580:1024 has finished loading
>> 13660 [Thread-9] INFO o.a.s.config - SET worker-user 741ffc04-f217-41e7-b942-490c1895e7d7
>> 14512 [Thread-21-__system-executor[-1 -1]] INFO o.a.s.d.executor - Preparing bolt __system:(-1)
>> 14536 [Thread-21-__system-executor[-1 -1]] INFO o.a.s.d.executor - Prepared bolt __system:(-1)
>> 14553 [Thread-15-StoreToRedisBolt-executor[2 2]] INFO o.a.s.d.executor - Preparing bolt StoreToRedisBolt:(2)
>> 14588 [Thread-19-KafkaSpout-executor[1 1]] INFO o.a.s.d.executor - Opening spout KafkaSpout:(1)
>> 14597 [Thread-17-__acker-executor[3 3]] INFO o.a.s.d.executor - Preparing bolt __acker:(3)
>> 14608 [Thread-17-__acker-executor[3 3]] INFO o.a.s.d.executor - Prepared bolt __acker:(3)
>> 14635 [Thread-15-StoreToRedisBolt-executor[2 2]] INFO o.a.s.d.executor - Prepared bolt StoreToRedisBolt:(2)
>> 14709 [Thread-19-KafkaSpout-executor[1 1]] ERROR o.a.s.util - Async loop died!
>> java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.build()Lcom/google/common/cache/Cache;
>> at org.apache.curator.framework.imps.NamespaceWatcherMap.<init>(NamespaceWatcherMap.java:33) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:81) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:145) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:100) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> at org.apache.storm.kafka.ZkState.newCurator(ZkState.java:45) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> at org.apache.storm.kafka.ZkState.<init>(ZkState.java:61) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> at org.apache.storm.daemon.executor$fn__8158$fn__8173.invoke(executor.clj:602) ~[storm-core-1.0.0.jar:1.0.0]
>> at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:482) [storm-core-1.0.0.jar:1.0.0]
>> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
>> 14712 [Thread-19-KafkaSpout-executor[1 1]] ERROR o.a.s.d.executor -
>> java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.build()Lcom/google/common/cache/Cache;
>> at org.apache.curator.framework.imps.NamespaceWatcherMap.<init>(NamespaceWatcherMap.java:33) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:81) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:145) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:100) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> at org.apache.storm.kafka.ZkState.newCurator(ZkState.java:45) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> at org.apache.storm.kafka.ZkState.<init>(ZkState.java:61) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> at org.apache.storm.daemon.executor$fn__8158$fn__8173.invoke(executor.clj:602) ~[storm-core-1.0.0.jar:1.0.0]
>> at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:482) [storm-core-1.0.0.jar:1.0.0]
>> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
>> 14774 [Thread-19-KafkaSpout-executor[1 1]] ERROR o.a.s.util - Halting process: ("Worker died")
>> java.lang.RuntimeException: ("Worker died")
>> at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.0.jar:1.0.0]
>> at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>> at org.apache.storm.daemon.worker$fn__8827$fn__8828.invoke(worker.clj:758) [storm-core-1.0.0.jar:1.0.0]
>> at org.apache.storm.daemon.executor$mk_executor_data$fn__8046$fn__8047.invoke(executor.clj:271) [storm-core-1.0.0.jar:1.0.0]
>> at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:494) [storm-core-1.0.0.jar:1.0.0]
>> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
>>
>> Does anyone know what may be the problem?
>>
>> Thank you for your help!
>>
>> Regards,
>> Daniela
>>
>>
>  
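
A note on the NoSuchMethodError quoted above: the trace shows the Curator classes being loaded from the jar-with-dependencies, and an error on CacheBuilder.build() usually points to a mismatched Guava version somewhere on the classpath, though that is a guess from the trace alone. Below is a minimal sketch for checking which jar a class is actually loaded from. ClassLocator and locate are made-up names for illustration, and the code uses only JDK calls; it would need to run with the same classpath as the topology to be meaningful.

```java
import java.security.CodeSource;

// Hypothetical helper (not part of Storm, Curator, or Guava):
// reports the jar or directory a class was loaded from.
final class ClassLocator {

    static String locate(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class<?> clazz = Class.forName(className, false, ClassLocator.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // Classes loaded by the bootstrap loader (e.g. java.lang.String) have no code source
            return source == null ? "bootstrap (no code source)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "not found: " + className;
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace in this thread
        System.out.println(locate("com.google.common.cache.CacheBuilder"));
        System.out.println(locate("org.apache.curator.framework.CuratorFrameworkFactory"));
    }
}
```

If the two classes report different jars, or CacheBuilder comes from an unexpected one, that would be the place to look for the conflicting Guava copy.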
