Hi all, I have already implemented all the code needed to kafka broker. I
made ​​the connection between WSO2 and kafka queue.

I send the events to WSO2 as follows:

*----------------------------------------------------------------------------------------------------*

public class ConsumerTest implements Runnable {

    private KafkaStream m_stream;
    private int m_threadNumber;
    private BrokerListener m_brokerListener;
    private ObjectMapper mapper = new ObjectMapper();
    private Map<String, Object> evento;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber,
BrokerListener a_brokerListener) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
        m_brokerListener = a_brokerListener;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            try {
                evento = mapper.readValue(new String(it.next().message()),
                        new TypeReference<Map&lt;String, Object>>() {
                        });
                m_brokerListener.onEvent(evento);
            } catch (IOException ex) {
               
Logger.getLogger(ConsumerTest.class.getName()).log(Level.SEVERE, null, ex);
            } catch (BrokerEventProcessingException ex) {
               
Logger.getLogger(ConsumerTest.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

*----------------------------------------------------------------------------------------------------*

When I write a message on kafka like this: {"Name": "Andrew", "Age": 20},
but WSO2 does nothing, I have configured the broker and bucket like this:

*----------------------------------------------------------------------------------------------------*

<?xml version="1.0" encoding="UTF-8"?>
<cep:bucket name="kafkaBucket" xmlns:cep="http://wso2.org/carbon/cep";>
  <cep:description>
Some Description.
    </cep:description>
  <cep:engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
    <cep:property
name="siddhi.persistence.snapshot.time.interval.minutes">0</cep:property>
    <cep:property
name="siddhi.enable.distributed.processing">false</cep:property>
  </cep:engineProviderConfiguration>
  <cep:input brokerName="kafkaBroker" topic="org.wso2.kafka/1.0.0">
    <cep:mapMapping queryEventType="Map" stream="stream1">
      <cep:property inputName="Name" name="Name" type="java.lang.String"/>
      <cep:property inputName="Age" name="Age" type="java.lang.Integer"/>
    </cep:mapMapping>
  </cep:input>
  <cep:query name="KPIQuery">
    <cep:expression></cep:expression>
    <cep:output brokerName="emailBroker" topic="andresgome...@gmail.com/
WSO2">
      <cep:textMapping>Hi this is a test!!

My name is {Name} and I have {Age}.</cep:textMapping>
    </cep:output>
  </cep:query>
</cep:bucket>

*----------------------------------------------------------------------------------------------------*
Obviously I don't receive any email. I have also configured the mail as
explained:

http://docs.wso2.org/display/CEP210/Configuring+E-mail+Broker

Can anyone see any fault??? thank you very much!!!!

Andres Gomez.

Pd: When I start WSO2, this exception happened. I do not know if It will
have something to do:

*----------------------------------------------------------------------------------------------------*

INFO {org.wso2.carbon.registry.core.internal.RegistryCoreServiceComponent} - 
Registry Mode    : READ-WRITE
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:317)
        at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:219)
        at org.xerial.snappy.Snappy.<clinit>(Snappy.java:44)
        at
org.apache.cassandra.io.compress.SnappyCompressor.create(SnappyCompressor.java:46)
        at
org.apache.cassandra.io.compress.SnappyCompressor.isAvailable(SnappyCompressor.java:56)
        at
org.apache.cassandra.io.compress.SnappyCompressor.<clinit>(SnappyCompressor.java:38)
        at org.apache.cassandra.config.CFMetaData.<clinit>(CFMetaData.java:76)
        at
org.apache.cassandra.config.KSMetaData.systemKeyspace(KSMetaData.java:79)
        at
org.apache.cassandra.config.DatabaseDescriptor.loadYaml(DatabaseDescriptor.java:441)
        at
org.apache.cassandra.config.DatabaseDescriptor.<clinit>(DatabaseDescriptor.java:117)
        at
org.apache.cassandra.service.AbstractCassandraDaemon.setup(AbstractCassandraDaemon.java:126)
        at
org.apache.cassandra.service.AbstractCassandraDaemon.activate(AbstractCassandraDaemon.java:353)
        at
org.wso2.carbon.cassandra.server.CassandraServerController$1.run(CassandraServerController.java:48)
        at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in
java.library.path
        at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1878)
        at java.lang.Runtime.loadLibrary0(Runtime.java:849)
        at java.lang.System.loadLibrary(System.java:1087)
        at
org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52)
        ... 18 more
[2013-10-16 17:02:10,194]  WARN
{org.apache.cassandra.io.compress.SnappyCompressor} -  Cannot initialize
native Snappy library. Compression on new tables will be disabled.

*----------------------------------------------------------------------------------------------------*



--
View this message in context: 
http://wso2-oxygen-tank.10903.n7.nabble.com/Dev-Custom-broker-kafka-all-implemented-Error-when-testing-tp87435.html
Sent from the WSO2 Development mailing list archive at Nabble.com.
_______________________________________________
Dev mailing list
Dev@wso2.org
http://wso2.org/cgi-bin/mailman/listinfo/dev

Reply via email to