Hi Lasantha,

Thank you for the clarifications. I had been adding the same stream again and again to the same Siddhi manager instance. I fixed that, and now the app is working fine.
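For reference, one way to avoid defining the same stream twice is a small guard that remembers what has already been registered. This is only a sketch, not the code from the app: `OnceRegistry` and `registerOnce` are hypothetical names, and the action passed in stands for the `defineStream`, `addQuery` and `addCallback` calls from the `CEP` class quoted below.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: runs a registration action at most once per key. */
public class OnceRegistry {
    private final Set<String> registered = new HashSet<>();

    /**
     * Runs the action only if the key has not been seen before.
     * Returns true when the action ran, false when it was skipped.
     */
    public synchronized boolean registerOnce(String key, Runnable action) {
        if (!registered.add(key)) {
            return false; // already registered: skip to avoid duplicate callbacks
        }
        action.run();
        return true;
    }

    public static void main(String[] args) {
        OnceRegistry registry = new OnceRegistry();
        int[] calls = {0};
        // Assumption: in the service, this action would wrap the
        // defineStream/addQuery/addCallback calls for one stream.
        Runnable define = () -> calls[0]++;
        registry.registerOnce("lightIntensityStream", define);
        registry.registerOnce("lightIntensityStream", define); // skipped
        System.out.println("registrations performed: " + calls[0]);
    }
}
```

Keying on the stream name assumes one query per stream; if several queries can share a stream, the key would probably need to include the query or callback name as well.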
Thanks

On Tue, Nov 3, 2015 at 5:28 PM, Lasantha Fernando <[email protected]> wrote:

> On 3 November 2015 at 16:09, Lakini Senanayaka <[email protected]> wrote:
>
>> Hi Lasantha,
>>
>> This is the class I'm calling and instantiate the CEP object.
>>
>> package org.wso2.edgeanalyticsservice1;
>>
>> import android.app.Service;
>> import android.content.Intent;
>> import android.os.IBinder;
>> import android.util.Log;
>>
>> /**
>>  * This class implements IEdgeAnalyticService.aidl interface
>>  */
>> public class EdgeAnalyticsService extends Service {
>>
>>     private CEP mCep;
>>     private TaskManager taskManager=null;
>>
>>     public EdgeAnalyticsService() {
>>         mCep = new CEP();
>>     }
>>
>>     @Override
>>     public void onCreate() {
>>     }
>>
>>     /** Returns the IBinder object for the connection */
>>     @Override
>>     public IBinder onBind(Intent intent) {
>>         taskManager=null;
>>         return mBinder;
>>     }
>>
>>     @Override
>>     public int onStartCommand(Intent intent, int flags, int startId) {
>>         return Service.START_STICKY;
>>     }
>>
>>     @Override
>>     public boolean onUnbind (Intent intent){
>>         return false;
>>     }
>>
>>     @Override
>>     public void onDestroy() {
>>         stopSelf();
>>     }
>>
>>     /** Implementation of methods in the IEdgeAnalyticsService interface */
>>     public final IEdgeAnalyticsService.Stub mBinder = new IEdgeAnalyticsService.Stub() {
>>
>>         IEdgeAnalyticServiceCallback mIEdgeAnalyticServiceCallback=null;
>>
>>         @Override
>>         public void getServiceNormal(String type, String streamDefinition,
>>                 String stream, String query, String callbackFunctionName,
>>                 IEdgeAnalyticServiceCallback cb) {
>>
>>             mIEdgeAnalyticServiceCallback = cb;
>>
>>
>>             /** Add the details to CEP if the Client is Type1 */
>>             if (type.equalsIgnoreCase("TYPE1")) {
>>                 mCep.cepAddDetails(getApplicationContext(),
>>                         streamDefinition, stream, query, callbackFunctionName, cb);
>>             }
>>
>>             /** Add the details to CEP if the Client is Type2-LOCATION_SERVICE */
>>             else if (type.equalsIgnoreCase("LOCATION_SERVICE")) {
>>                 mCep.cepAddDetails(getApplicationContext(),
>>                         "define stream LocationStream (latitude double,longitude double); ",
>>                         "LocationStream", query, callbackFunctionName,cb);
>>                 taskManager = new TaskManager(getApplicationContext(),
>>                         "LocationStream",mIEdgeAnalyticServiceCallback,mCep);
>>                 taskManager.initializeServiceType(type);
>>             }
>>
>>             /** Add the details to CEP if the Client is Type2-HUMIDITY_SERVICE */
>>             else if (type.equalsIgnoreCase("HUMIDITY_SERVICE")) {
>>                 mCep.cepAddDetails(getApplicationContext(),
>>                         "define stream HumidityStream (humidity float); ",
>>                         "HumidityStream", query, callbackFunctionName, cb);
>>                 taskManager = new TaskManager(getApplicationContext(),
>>                         "HumidityStream",mIEdgeAnalyticServiceCallback,mCep);
>>                 taskManager.initializeServiceType(type);
>>             }
>>
>>             /** Add the details to CEP if the Client is Type2-TEMPERATURE_SERVICE */
>>             else if (type.equalsIgnoreCase("TEMPERATURE_SERVICE")) {
>>                 mCep.cepAddDetails(getApplicationContext(),
>>                         "define stream temperatureStream (tempValue float); ",
>>                         "temperatureStream", query, callbackFunctionName,cb);
>>                 taskManager = new TaskManager(getApplicationContext(),
>>                         "temperatureStream",mIEdgeAnalyticServiceCallback,mCep);
>>                 taskManager.initializeServiceType(type);
>>             }
>>
>>             /** Add the details to CEP if the Client is Type2-INTENSITY_SERVICE*/
>>             else if (type.equalsIgnoreCase("INTENSITY_SERVICE")) {
>>                 Log.d("client","INTENSITY SERVICE");
>>                 mCep.cepAddDetails(getApplicationContext(),
>>                         "define stream lightIntensityStream (lightValue double); ",
>>                         "lightIntensityStream", query, callbackFunctionName, cb);
>>                 taskManager = new TaskManager(getApplicationContext(),
>>                         "lightIntensityStream",mIEdgeAnalyticServiceCallback,mCep);
>>                 taskManager.initializeServiceType(type);
>>
>>             }
>>         }
>>
>>         //get servicePatterns
>>         @Override
>>         public void getServicePattern(String type, String streamDefinition,
>>                 String stream, String query1,String query2 ,
>>                 String callbackFunctionName1,String callbackFunctionName2,
>>                 IEdgeAnalyticServiceCallback cb) {
>>
>>             mIEdgeAnalyticServiceCallback = cb;
>>
>>             /** Add the details to CEP if the Client is Type1 */
>>             if (type.equalsIgnoreCase("TYPE1")) {
>>                 mCep.cepAddDetails(getApplicationContext(),
>>                         streamDefinition, stream, query1, query2,
>>                         callbackFunctionName1,callbackFunctionName2,cb);
>>             }
>>
>>             else if (type.equalsIgnoreCase("LOCATION_SERVICE")) {
>>                 mCep.cepAddDetails(getApplicationContext(),
>>                         "define stream LocationStream (latitude double,longitude double); ",
>>                         "LocationStream",
>>                         query1,query2,callbackFunctionName1,callbackFunctionName2,cb);
>>                 taskManager = new TaskManager(getApplicationContext(),
>>                         "LocationStream",mIEdgeAnalyticServiceCallback,mCep);
>>                 taskManager.initializeServiceType(type);
>>             }
>>
>>             /** Add the details to CEP if the Client is Type2-HUMIDITY_SERVICE */
>>             else if (type.equalsIgnoreCase("HUMIDITY_SERVICE")) {
>>                 mCep.cepAddDetails(getApplicationContext(),
>>                         "define stream HumidityStream (humidity float); ",
>>                         "HumidityStream",
>>                         query1,query2,callbackFunctionName1,callbackFunctionName2,cb);
>>                 taskManager = new TaskManager(getApplicationContext(),
>>                         "HumidityStream",mIEdgeAnalyticServiceCallback,mCep);
>>                 taskManager.initializeServiceType(type);
>>             }
>>
>>             /** Add the details to CEP if the Client is Type2-TEMPERATURE_SERVICE */
>>             else if (type.equalsIgnoreCase("TEMPERATURE_SERVICE")) {
>>                 mCep.cepAddDetails(getApplicationContext(),
>>                         "define stream temperatureStream (tempValue float); ",
>>                         "temperatureStream",
>>                         query1,query2,callbackFunctionName1,callbackFunctionName2,cb);
>>                 taskManager = new TaskManager(getApplicationContext(),
>>                         "temperatureStream",mIEdgeAnalyticServiceCallback,mCep);
>>                 taskManager.initializeServiceType(type);
>>             }
>>
>>             /** Add the details to CEP if the Client is Type2-INTENSITY_SERVICE*/
>>             else if (type.equalsIgnoreCase("INTENSITY_SERVICE")) {
>>                 Log.d("client","INTENSITY SERVICE");
>>                 mCep.cepAddDetails(getApplicationContext(),
>>                         "define stream lightIntensityStream (lightValue double); ",
>>                         "lightIntensityStream",
>>                         query1,query2,callbackFunctionName1,callbackFunctionName2,cb);
>>                 taskManager = new TaskManager(getApplicationContext(),
>>                         "lightIntensityStream",mIEdgeAnalyticServiceCallback,mCep);
>>                 taskManager.initializeServiceType(type);
>>             }
>>         }
>>
>>         /** Passes values to the streams to do the analytics*/
>>         @Override
>>         public void sendData(String value, String stream){
>>             mCep.analyseTheData(value, stream,mIEdgeAnalyticServiceCallback);
>>         }
>>
>>         @Override
>>         public void stopService()
>>         {
>>             taskManager.stopSensors();
>>         }
>>     };
>> }
>>
>> I will try the Singleton pattern and I will update this mail thread.
>>
>
> +1... If you make sure that the Siddhi manager doesn't get instantiated
> multiple times and the same query isn't added again, it should be fine.
>
>>
>> Thank you.
>>
>> On Tue, Nov 3, 2015 at 3:37 PM, Lasantha Fernando <[email protected]>
>> wrote:
>>
>>> Hi Lakini,
>>>
>>> It seems multiple Siddhi managers are getting instantiated and each
>>> siddhi manager is running without being shut down. Can you point to the
>>> code where the CEP class is instantiated?
>>>
>>> Also, you can try a singleton pattern for the CEP class and do some
>>> checks when adding the queries or adopt a similar approach to avoid the
>>> issue of multiple queries running.
>>>
>>> Thanks,
>>> Lasantha
>>>
>>> On 3 November 2015 at 15:31, Lakini Senanayaka <[email protected]> wrote:
>>>
>>>> adding Dev@wso2
>>>>
>>>> On Tue, Nov 3, 2015 at 3:25 PM, Lakini Senanayaka <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have implemented a service using Siddhi 2.1.0.
>>>>> This is my Service class which deals with siddhimanager.
>>>>>
>>>>> package org.wso2.edgeanalyticsservice1;
>>>>>
>>>>> import android.content.Context;
>>>>>
>>>>> import org.wso2.siddhi.core.SiddhiManager;
>>>>> import org.wso2.siddhi.core.event.Event;
>>>>> import org.wso2.siddhi.core.stream.input.InputHandler;
>>>>> import org.wso2.siddhi.core.stream.output.StreamCallback;
>>>>> import org.wso2.siddhi.core.util.EventPrinter;
>>>>>
>>>>> /**
>>>>>  * This class create from the build in sensors and inject
>>>>>  * to the EdgeAnalytics Service for processing on a query passed.
>>>>>  */
>>>>> public class CEP {
>>>>>
>>>>>     private SiddhiManager mSiddhiManager;
>>>>>     private IEdgeAnalyticServiceCallback mCallb = null;
>>>>>
>>>>>     /** Initialize the SiddhiManager Instance */
>>>>>     public CEP() {
>>>>>         mSiddhiManager = new SiddhiManager();
>>>>>     }
>>>>>
>>>>>     /** Add extra details to the SiddhiManager object */
>>>>>     public void cepAddDetails(final Context context, String streamDefinition,
>>>>>             String stream, final String query, final String callbackFunction,
>>>>>             IEdgeAnalyticServiceCallback cb) {
>>>>>         mCallb=cb;
>>>>>
>>>>>         /** Define the stream to the Siddhi Manager */
>>>>>         mSiddhiManager.defineStream(streamDefinition);
>>>>>
>>>>>         /** Define the query to the Siddhi Manager */
>>>>>         mSiddhiManager.addQuery(query);
>>>>>
>>>>>         /** Define the scallback details to the Siddhi Manager */
>>>>>         mSiddhiManager.addCallback(callbackFunction, new StreamCallback() {
>>>>>             public void receive(Event[] events) {
>>>>>                 EventPrinter.print(events);
>>>>>                 try {
>>>>>                     mCallb.addCallBack("Passed!! " );
>>>>>                 } catch (Exception e) {
>>>>>                     e.printStackTrace();
>>>>>                 }
>>>>>             }
>>>>>         });
>>>>>     }
>>>>>
>>>>>     /** Add extra details to the SiddhiManager object */
>>>>>     public void cepAddDetails(final Context context, String streamDefinition,
>>>>>             String stream, final String query1, final String query2,
>>>>>             final String callbackFunction1,final String callbackFunction2,
>>>>>             IEdgeAnalyticServiceCallback cb) {
>>>>>         //todo:Check the pattern queries
>>>>>
>>>>>         mCallb=cb;
>>>>>
>>>>>         /** Define the stream to the Siddhi Manager */
>>>>>         mSiddhiManager.defineStream(streamDefinition);
>>>>>
>>>>>         /** Define the query to the Siddhi Manager */
>>>>>         mSiddhiManager.addQuery(query1);
>>>>>         mSiddhiManager.addQuery(query2);
>>>>>
>>>>>         /** Define the scallback details to the Siddhi Manager */
>>>>>         mSiddhiManager.addCallback(callbackFunction1, new StreamCallback() {
>>>>>             public void receive(Event[] events) {
>>>>>                 EventPrinter.print(events);
>>>>>                 try {
>>>>>                     mCallb.addCallBack("callback1");
>>>>>                 } catch (Exception e) {
>>>>>                     e.printStackTrace();
>>>>>                 }
>>>>>             }
>>>>>         });
>>>>>
>>>>>         mSiddhiManager.addCallback(callbackFunction2, new StreamCallback() {
>>>>>             public void receive(Event[] events) {
>>>>>                 EventPrinter.print(events);
>>>>>                 try {
>>>>>                     mCallb.addCallBack("callback2");
>>>>>                 } catch (Exception e) {
>>>>>                     e.printStackTrace();
>>>>>                 }
>>>>>             }
>>>>>         });
>>>>>     }
>>>>>
>>>>>     /** Analyse single data and send the notification back to client
>>>>>         through callbacks */
>>>>>     public void analyseTheData(String value, String stream,
>>>>>             IEdgeAnalyticServiceCallback cb) {
>>>>>
>>>>>         String[] dataCollection=value.split(",");
>>>>>         String[] value_type=null;
>>>>>         Object[] x =new Object[dataCollection.length];
>>>>>
>>>>>         /** Identify the data types which the client sends. */
>>>>>         for(int i=0;i<dataCollection.length ;i++)
>>>>>         {
>>>>>             value_type=dataCollection[i].split("-");
>>>>>             x[i]=0;
>>>>>             switch (value_type[1]) {
>>>>>                 case "double":
>>>>>                     x[i]=Double.parseDouble(value_type[0]);
>>>>>                     break;
>>>>>                 case "float":
>>>>>                     x[i]=Float.parseFloat(value_type[0]);
>>>>>                     break;
>>>>>                 case "int":
>>>>>                     x[i]=Integer.parseInt(value_type[0]);
>>>>>                     break;
>>>>>                 case "string":
>>>>>                     x[i]=value_type[0];
>>>>>                     break;
>>>>>                 default:
>>>>>                     break;
>>>>>             }
>>>>>         }
>>>>>
>>>>>         InputHandler inputHandler = mSiddhiManager.getInputHandler(stream);
>>>>>         try
>>>>>         {
>>>>>             inputHandler.send(x);
>>>>>         }
>>>>>         catch (InterruptedException e) {
>>>>>             e.printStackTrace();
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> I have set a timer and I'm passing data to the siddhi stream.
>>>>> When I run this app for the first time it gives only one callback and
>>>>> working fine.But After I run the same app again and again it gives more
>>>>> callbacks and it cause for my output in the client side.
>>>>>
>>>>> This is my Log file:-
>>>>> 1-03 15:21:16.276 25481-25777/? I/System.out﹕ [Event{streamId='LightValueLowHandleCallback2', timeStamp=1446544276262, data=[151.0], type=new}]
>>>>> 11-03 15:21:16.276 26231-26293/? D/callbak﹕ callback2
>>>>> 11-03 15:21:16.276 26231-26293/? D/client4﹕ gonna off the light
>>>>> 11-03 15:21:16.286 25481-25777/? I/System.out﹕ [Event{streamId='LightValueLowHandleCallback2', timeStamp=1446544276262, data=[151.0], type=new}]
>>>>> 11-03 15:21:16.286 26231-26243/? D/callbak﹕ callback2
>>>>> 11-03 15:21:16.291 26231-26243/? D/client4﹕ gonna off the light
>>>>>
>>>>>
>>>>> So I want to stop repeating callbacks.Could you please show me the
>>>>> place of the code to correct ,not to get more callbacks.
>>>>>
>>>>> Thank you.
>>>>>
>>>>> --
>>>>> *Intern-Engineering*
>>>>> Lakini S.Senanayaka
>>>>> Mobile: +94 712295444
>>>>> Email: [email protected]
>>>>>
>>>>
>>>> --
>>>> *Intern-Engineering*
>>>> Lakini S.Senanayaka
>>>> Mobile: +94 712295444
>>>> Email: [email protected]
>>>>
>>>
>>> --
>>> *Lasantha Fernando*
>>> Senior Software Engineer - Data Technologies Team
>>> WSO2 Inc. http://wso2.com
>>>
>>> email: [email protected]
>>> mobile: (+94) 71 5247551
>>>
>>
>> --
>> *Intern-Engineering*
>> Lakini S.Senanayaka
>> Mobile: +94 712295444
>> Email: [email protected]
>>
>
> --
> *Lasantha Fernando*
> Senior Software Engineer - Data Technologies Team
> WSO2 Inc. http://wso2.com
>
> email: [email protected]
> mobile: (+94) 71 5247551
>

--
*Intern-Engineering*
Lakini S.Senanayaka
Mobile: +94 712295444
Email: [email protected]
_______________________________________________ Dev mailing list [email protected] http://wso2.org/cgi-bin/mailman/listinfo/dev
