Hi all, I have already implemented all the code needed to kafka broker. I made the connection between WSO2 and kafka queue.
I send the events to WSO2 as follows: *----------------------------------------------------------------------------------------------------* public class ConsumerTest implements Runnable { private KafkaStream m_stream; private int m_threadNumber; private BrokerListener m_brokerListener; private ObjectMapper mapper = new ObjectMapper(); private Map<String, Object> evento; public ConsumerTest(KafkaStream a_stream, int a_threadNumber, BrokerListener a_brokerListener) { m_threadNumber = a_threadNumber; m_stream = a_stream; m_brokerListener = a_brokerListener; } public void run() { ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); while (it.hasNext()) { try { evento = mapper.readValue(new String(it.next().message()), new TypeReference<Map<String, Object>>() { }); m_brokerListener.onEvent(evento); } catch (IOException ex) { Logger.getLogger(ConsumerTest.class.getName()).log(Level.SEVERE, null, ex); } catch (BrokerEventProcessingException ex) { Logger.getLogger(ConsumerTest.class.getName()).log(Level.SEVERE, null, ex); } } } } *----------------------------------------------------------------------------------------------------* When I write a message on kafka like this: {"Name": "Andrew", "Age": 20}, but WSO2 does nothing, I have configured the broker and bucket like this: *----------------------------------------------------------------------------------------------------* <?xml version="1.0" encoding="UTF-8"?> <cep:bucket name="kafkaBucket" xmlns:cep="http://wso2.org/carbon/cep"> <cep:description> Some Description. </cep:description> <cep:engineProviderConfiguration engineProvider="SiddhiCEPRuntime"> <cep:property name="siddhi.persistence.snapshot.time.interval.minutes">0</cep:property> <cep:property name="siddhi.enable.distributed.processing">false</cep:property> </cep:engineProviderConfiguration> <cep:input brokerName="kafkaBroker" topic="org.wso2.kafka/1.0.0"> <cep:mapMapping queryEventType="Map" stream="stream1"> <cep:property inputName="Name" name="Name" type="java.lang.String"/> <cep:property inputName="Age" name="Age" type="java.lang.Integer"/> </cep:mapMapping> </cep:input> <cep:query name="KPIQuery"> <cep:expression></cep:expression> <cep:output brokerName="emailBroker" topic="andresgome...@gmail.com/ WSO2"> <cep:textMapping>Hi this is a test!! My name is {Name} and I have {Age}.</cep:textMapping> </cep:output> </cep:query> </cep:bucket> *----------------------------------------------------------------------------------------------------* Obviously I don't receive any email. I have also configured the mail as explained: http://docs.wso2.org/display/CEP210/Configuring+E-mail+Broker Can anyone see any fault??? thank you very much!!!! Andres Gomez. Pd: When I start WSO2, this exception happened. I do not know if It will have something to do: *----------------------------------------------------------------------------------------------------* INFO {org.wso2.carbon.registry.core.internal.RegistryCoreServiceComponent} - Registry Mode : READ-WRITE java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:317) at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:219) at org.xerial.snappy.Snappy.<clinit>(Snappy.java:44) at org.apache.cassandra.io.compress.SnappyCompressor.create(SnappyCompressor.java:46) at org.apache.cassandra.io.compress.SnappyCompressor.isAvailable(SnappyCompressor.java:56) at org.apache.cassandra.io.compress.SnappyCompressor.<clinit>(SnappyCompressor.java:38) at org.apache.cassandra.config.CFMetaData.<clinit>(CFMetaData.java:76) at org.apache.cassandra.config.KSMetaData.systemKeyspace(KSMetaData.java:79) at org.apache.cassandra.config.DatabaseDescriptor.loadYaml(DatabaseDescriptor.java:441) at org.apache.cassandra.config.DatabaseDescriptor.<clinit>(DatabaseDescriptor.java:117) at org.apache.cassandra.service.AbstractCassandraDaemon.setup(AbstractCassandraDaemon.java:126) at org.apache.cassandra.service.AbstractCassandraDaemon.activate(AbstractCassandraDaemon.java:353) at org.wso2.carbon.cassandra.server.CassandraServerController$1.run(CassandraServerController.java:48) at java.lang.Thread.run(Thread.java:724) Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1878) at java.lang.Runtime.loadLibrary0(Runtime.java:849) at java.lang.System.loadLibrary(System.java:1087) at org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52) ... 18 more [2013-10-16 17:02:10,194] WARN {org.apache.cassandra.io.compress.SnappyCompressor} - Cannot initialize native Snappy library. Compression on new tables will be disabled. *----------------------------------------------------------------------------------------------------* -- View this message in context: http://wso2-oxygen-tank.10903.n7.nabble.com/Dev-Custom-broker-kafka-all-implemented-Error-when-testing-tp87435.html Sent from the WSO2 Development mailing list archive at Nabble.com. _______________________________________________ Dev mailing list Dev@wso2.org http://wso2.org/cgi-bin/mailman/listinfo/dev