Thank you very much, that solved the problem.
 
I can now deploy my topology, but it does not seem to consume any messages from Kafka.
The topology is running, my producer is generating messages, and I can see these messages in the console consumer, but my spout does not emit any tuples.
 
Here are parts of my code:
Producer (JSON Generator using https://github.com/acesinc/json-data-generator):
{
    "workflows": [{
            "workflowName": "test",
            "workflowFilename": "exampleWorkflow.json"
        }],
    "producers": [{
            "type": "kafka",
            "broker.server": "localhost",
            "broker.port": 9092,
            "topic": "logevent",
            "flatten": false,
            "sync": false
        },{
            "type": "logger"
        }]
}
 
Workflow:
{
    "eventFrequency": 60000,
    "varyEventFrequency": true,
    "repeatWorkflow": true,
    "timeBetweenRepeat": 60000,
    "varyRepeatFrequency": true,
    "steps": [{
            "config": [{
                    "timestamp": "nowTimestamp()",
                    "ID1": "integer(1,1000)",
                    "ID2": "integer(1,100)",
                    "command": "random('ON','OFF')"
                }],
            "duration": 0
        }]
}
 
Topology:
public class Topology {

    public static void main(String[] args)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {

        ZkHosts zkHosts = new ZkHosts("localhost:2181");
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "logevent", "", "id7");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
        builder.setBolt("StoreToRedisBolt", new StoreToRedisBolt("127.0.0.1", 6379), 1).globalGrouping("KafkaSpout");
        LocalCluster cluster = new LocalCluster();
        Config conf = new Config();
        StormSubmitter.submitTopology("Topology", conf, builder.createTopology());
        try {
            System.out.println("Waiting to consume from kafka");
            Thread.sleep(10000);
        } catch (Exception exception) {
            System.out.println("Thread interrupted exception : " + exception);
        }
    }
}
Bolt:
public class StoreToRedisBolt implements IBasicBolt{
    
    private static final long serialVersionUID = 2L;
    private RedisOperations redisOperations = null;
    private String redisIP = null;
    private int port;
    
    public StoreToRedisBolt(String redisIP, int port) {
        this.redisIP = redisIP;
        this.port = port;
    }
    
    public void execute(Tuple input, BasicOutputCollector collector) {
        Map<String, Object> record = new HashMap<String, Object>();
        String tupleString = input.getString(0);
        JSONObject obj = new JSONObject(tupleString);
        record.put("timestamp", obj.getString("timestamp"));
        record.put("ID1", obj.getString("ID1"));
        record.put("ID2", obj.getString("ID2"));
        record.put("command", obj.getString("command"));
        String id = obj.getString("ID1") + obj.getString("ID2");
        redisOperations.insert(record, id);
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        
    }
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
    public void prepare(Map stormConf, TopologyContext context) {
        redisOperations = new RedisOperations(this.redisIP, this.port);
    }
    public void cleanup() {
        
    }
}
 
Thank you so much for your help in advance!
 
Regards
 
Sent: Wednesday, 25 May 2016 at 16:45
From: "Matthias J. Sax" <[email protected]>
To: [email protected]
Subject: Re: Error when deploying a topology
Is com/google/common/cache/Cache contained in your jar file?
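
A minimal sketch of how one might check this from Java, in case it helps. The class name ClasspathCheck and the method locate are hypothetical names chosen for illustration; the sketch only uses the standard ClassLoader.getSystemResource call and assumes it is run with the same classpath as the topology jar. A NoSuchMethodError usually means the class was found but in a different version than the caller was compiled against, so the printed location (which jar the class comes from) may be more informative than a plain yes/no.

```java
import java.net.URL;

public class ClasspathCheck {

    /**
     * Returns the URL the given class file would be loaded from by the
     * system class loader, or null if it is not on the classpath.
     */
    public static URL locate(String className) {
        // Turn a fully qualified class name into its resource path.
        String resource = className.replace('.', '/') + ".class";
        return ClassLoader.getSystemResource(resource);
    }

    public static void main(String[] args) {
        // Default to the class named in the stack trace; another name can be passed as an argument.
        String name = args.length > 0 ? args[0] : "com.google.common.cache.CacheBuilder";
        URL location = locate(name);
        System.out.println(name + " -> "
                + (location == null ? "not found on classpath" : location));
    }
}
```

If the printed location points at a jar other than the one expected, that may indicate two versions of the same library on the classpath.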

On 05/25/2016 10:35 PM, Daniela S wrote:
> Hi
>
> I am trying to deploy a simple topology, which receives data from Kafka
> and should write it to Redis. But unfortunately I receive the following
> error message:
>
> 13659 [Thread-9] INFO o.a.s.d.worker - Worker
> 741ffc04-f217-41e7-b942-490c1895e7d7 for storm Topology-1-1464208228 on
> 66fa5b8f-87ee-42ee-8436-4fed29a20580:1024 has finished loading
> 13660 [Thread-9] INFO o.a.s.config - SET worker-user
> 741ffc04-f217-41e7-b942-490c1895e7d7
> 14512 [Thread-21-__system-executor[-1 -1]] INFO o.a.s.d.executor -
> Preparing bolt __system:(-1)
> 14536 [Thread-21-__system-executor[-1 -1]] INFO o.a.s.d.executor -
> Prepared bolt __system:(-1)
> 14553 [Thread-15-StoreToRedisBolt-executor[2 2]] INFO o.a.s.d.executor
> - Preparing bolt StoreToRedisBolt:(2)
> 14588 [Thread-19-KafkaSpout-executor[1 1]] INFO o.a.s.d.executor -
> Opening spout KafkaSpout:(1)
> 14597 [Thread-17-__acker-executor[3 3]] INFO o.a.s.d.executor -
> Preparing bolt __acker:(3)
> 14608 [Thread-17-__acker-executor[3 3]] INFO o.a.s.d.executor -
> Prepared bolt __acker:(3)
> 14635 [Thread-15-StoreToRedisBolt-executor[2 2]] INFO o.a.s.d.executor
> - Prepared bolt StoreToRedisBolt:(2)
> 14709 [Thread-19-KafkaSpout-executor[1 1]] ERROR o.a.s.util - Async loop
> died!
> java.lang.NoSuchMethodError:
> com.google.common.cache.CacheBuilder.build()Lcom/google/common/cache/Cache;
> at
> org.apache.curator.framework.imps.NamespaceWatcherMap.<init>(NamespaceWatcherMap.java:33)
> ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:81)
> ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:145)
> ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:100)
> ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> at org.apache.storm.kafka.ZkState.newCurator(ZkState.java:45)
> ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> at org.apache.storm.kafka.ZkState.<init>(ZkState.java:61)
> ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75)
> ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.storm.daemon.executor$fn__8158$fn__8173.invoke(executor.clj:602)
> ~[storm-core-1.0.0.jar:1.0.0]
> at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:482)
> [storm-core-1.0.0.jar:1.0.0]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> 14712 [Thread-19-KafkaSpout-executor[1 1]] ERROR o.a.s.d.executor -
> java.lang.NoSuchMethodError:
> com.google.common.cache.CacheBuilder.build()Lcom/google/common/cache/Cache;
> at
> org.apache.curator.framework.imps.NamespaceWatcherMap.<init>(NamespaceWatcherMap.java:33)
> ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:81)
> ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:145)
> ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:100)
> ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> at org.apache.storm.kafka.ZkState.newCurator(ZkState.java:45)
> ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> at org.apache.storm.kafka.ZkState.<init>(ZkState.java:61)
> ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75)
> ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.storm.daemon.executor$fn__8158$fn__8173.invoke(executor.clj:602)
> ~[storm-core-1.0.0.jar:1.0.0]
> at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:482)
> [storm-core-1.0.0.jar:1.0.0]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> 14774 [Thread-19-KafkaSpout-executor[1 1]] ERROR o.a.s.util - Halting
> process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
> at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341)
> [storm-core-1.0.0.jar:1.0.0]
> at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
> at
> org.apache.storm.daemon.worker$fn__8827$fn__8828.invoke(worker.clj:758)
> [storm-core-1.0.0.jar:1.0.0]
> at
> org.apache.storm.daemon.executor$mk_executor_data$fn__8046$fn__8047.invoke(executor.clj:271)
> [storm-core-1.0.0.jar:1.0.0]
> at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:494)
> [storm-core-1.0.0.jar:1.0.0]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
>
> Does anyone know what may be the problem?
>
> Thank you for your help!
>
> Regards,
> Daniela
>
>
 
