On 3 November 2015 at 16:09, Lakini Senanayaka <lak...@wso2.com> wrote:
> Hi Lasantha, > > This is the class I'm calling and instantiate the CEP object. > > package org.wso2.edgeanalyticsservice1; > > import android.app.Service; > import android.content.Intent; > import android.os.IBinder; > import android.util.Log; > > /** > * This class implements IEdgeAnalyticService.aidl interface > */ > public class EdgeAnalyticsService extends Service { > > private CEP mCep; > private TaskManager taskManager=null; > > public EdgeAnalyticsService() { > mCep = new CEP(); > } > > @Override > public void onCreate() { > } > > /** Returns the IBinder object for the connection */ > @Override > public IBinder onBind(Intent intent) { > taskManager=null; > return mBinder; > } > > @Override > public int onStartCommand(Intent intent, int flags, int startId) { > return Service.START_STICKY; > } > > @Override > public boolean onUnbind (Intent intent){ > return false; > } > > @Override > public void onDestroy() { > stopSelf(); > } > > /** Implementation of methods in the IEdgeAnalyticsService interface */ > public final IEdgeAnalyticsService.Stub mBinder = new > IEdgeAnalyticsService.Stub() { > > IEdgeAnalyticServiceCallback mIEdgeAnalyticServiceCallback=null; > > @Override > public void getServiceNormal(String type, String streamDefinition, > String stream, String query, String > callbackFunctionName,IEdgeAnalyticServiceCallback cb) { > > mIEdgeAnalyticServiceCallback = cb; > > > /** Add the details to CEP if the Client is Type1 */ > if (type.equalsIgnoreCase("TYPE1")) { > mCep.cepAddDetails(getApplicationContext(), streamDefinition, > stream, query, callbackFunctionName, cb); > } > > /** Add the details to CEP if the Client is > Type2-LOCATION_SERVICE */ > else if (type.equalsIgnoreCase("LOCATION_SERVICE")) { > mCep.cepAddDetails(getApplicationContext(), "define stream > LocationStream (latitude double,longitude double); ", "LocationStream", > query, callbackFunctionName,cb); > taskManager = new > TaskManager(getApplicationContext(),"LocationStream",mIEdgeAnalyticServiceCallback,mCep); > taskManager.initializeServiceType(type); > } > > /** Add the details to CEP if the Client is > Type2-HUMIDITY_SERVICE */ > else if (type.equalsIgnoreCase("HUMIDITY_SERVICE")) { > mCep.cepAddDetails(getApplicationContext(), "define stream > HumidityStream (humidity float); ", "HumidityStream", query, > callbackFunctionName, cb); > taskManager = new > TaskManager(getApplicationContext(),"HumidityStream",mIEdgeAnalyticServiceCallback,mCep); > taskManager.initializeServiceType(type); > } > > /** Add the details to CEP if the Client is > Type2-TEMPERATURE_SERVICE */ > else if (type.equalsIgnoreCase("TEMPERATURE_SERVICE")) { > mCep.cepAddDetails(getApplicationContext(), "define stream > temperatureStream (tempValue float); ", "temperatureStream", query, > callbackFunctionName,cb); > taskManager = new > TaskManager(getApplicationContext(),"temperatureStream",mIEdgeAnalyticServiceCallback,mCep); > taskManager.initializeServiceType(type); > } > > /** Add the details to CEP if the Client is > Type2-INTENSITY_SERVICE*/ > else if (type.equalsIgnoreCase("INTENSITY_SERVICE")) { > Log.d("client","INTENSITY SERVICE"); > mCep.cepAddDetails(getApplicationContext(), "define stream > lightIntensityStream (lightValue double); ", "lightIntensityStream", query, > callbackFunctionName, cb); > taskManager = new > TaskManager(getApplicationContext(),"lightIntensityStream",mIEdgeAnalyticServiceCallback,mCep); > taskManager.initializeServiceType(type); > > } > } > > //get servicePatterns > @Override > public void getServicePattern(String type, String streamDefinition, > String stream, String query1,String query2 ,String > callbackFunctionName1,String > callbackFunctionName2,IEdgeAnalyticServiceCallback cb) { > > mIEdgeAnalyticServiceCallback = cb; > > /** Add the details to CEP if the Client is Type1 */ > if (type.equalsIgnoreCase("TYPE1")) { > mCep.cepAddDetails(getApplicationContext(), streamDefinition, > stream, query1, query2, callbackFunctionName1,callbackFunctionName2,cb); > } > > else if (type.equalsIgnoreCase("LOCATION_SERVICE")) { > mCep.cepAddDetails(getApplicationContext(), "define stream > LocationStream (latitude double,longitude double); ", "LocationStream", > query1,query2,callbackFunctionName1,callbackFunctionName2,cb); > taskManager = new > TaskManager(getApplicationContext(),"LocationStream",mIEdgeAnalyticServiceCallback,mCep); > taskManager.initializeServiceType(type); > } > > /** Add the details to CEP if the Client is > Type2-HUMIDITY_SERVICE */ > else if (type.equalsIgnoreCase("HUMIDITY_SERVICE")) { > mCep.cepAddDetails(getApplicationContext(), "define stream > HumidityStream (humidity float); ", > "HumidityStream",query1,query2,callbackFunctionName1,callbackFunctionName2,cb); > taskManager = new > TaskManager(getApplicationContext(),"HumidityStream",mIEdgeAnalyticServiceCallback,mCep); > taskManager.initializeServiceType(type); > } > > /** Add the details to CEP if the Client is > Type2-TEMPERATURE_SERVICE */ > else if (type.equalsIgnoreCase("TEMPERATURE_SERVICE")) { > mCep.cepAddDetails(getApplicationContext(), "define stream > temperatureStream (tempValue float); ", "temperatureStream", > query1,query2,callbackFunctionName1,callbackFunctionName2,cb); > taskManager = new > TaskManager(getApplicationContext(),"temperatureStream",mIEdgeAnalyticServiceCallback,mCep); > taskManager.initializeServiceType(type); > } > > /** Add the details to CEP if the Client is > Type2-INTENSITY_SERVICE*/ > else if (type.equalsIgnoreCase("INTENSITY_SERVICE")) { > Log.d("client","INTENSITY SERVICE"); > mCep.cepAddDetails(getApplicationContext(), "define stream > lightIntensityStream (lightValue double); ", "lightIntensityStream", > query1,query2,callbackFunctionName1,callbackFunctionName2,cb); > taskManager = new > TaskManager(getApplicationContext(),"lightIntensityStream",mIEdgeAnalyticServiceCallback,mCep); > taskManager.initializeServiceType(type); > } > } > > /** Passes values to the streams to do the analytics*/ > @Override > public void sendData(String value, String stream){ > mCep.analyseTheData(value, stream,mIEdgeAnalyticServiceCallback); > } > > @Override > public void stopService() > { > taskManager.stopSensors(); > } > }; > } > > I will try the Singleton pattern and I will update this mail thread. > +1... If you make sure that the Siddhi manager doesn't get instantiated multiple times and the same query isn't added again, it should be fine. > > Thank you. > > > > On Tue, Nov 3, 2015 at 3:37 PM, Lasantha Fernando <lasan...@wso2.com> > wrote: > >> Hi Lakini, >> >> It seems multiple Siddhi managers are getting instantiated and each >> siddhi manager is running without being shut down. Can you point to the >> code where the CEP class is instantiated? >> >> Also, you can try a singleton pattern for the CEP class and do some >> checks when adding the queries or adopt a similar approach to avoid the >> issue of multiple queries running. >> >> Thanks, >> Lasantha >> >> On 3 November 2015 at 15:31, Lakini Senanayaka <lak...@wso2.com> wrote: >> >>> adding Dev@wso2 >>> >>> On Tue, Nov 3, 2015 at 3:25 PM, Lakini Senanayaka <lak...@wso2.com> >>> wrote: >>> >>>> Hi, >>>> >>>> I have implemented a service using Siddhi 2.1.0. >>>> This is my Service class which deals with siddhimanager. >>>> >>>> package org.wso2.edgeanalyticsservice1; >>>> >>>> import android.content.Context; >>>> >>>> import org.wso2.siddhi.core.SiddhiManager; >>>> import org.wso2.siddhi.core.event.Event; >>>> import org.wso2.siddhi.core.stream.input.InputHandler; >>>> import org.wso2.siddhi.core.stream.output.StreamCallback; >>>> import org.wso2.siddhi.core.util.EventPrinter; >>>> >>>> /** >>>> * This class create from the build in sensors and inject >>>> * to the EdgeAnalytics Service for processing on a query passed. >>>> */ >>>> public class CEP { >>>> >>>> private SiddhiManager mSiddhiManager; >>>> private IEdgeAnalyticServiceCallback mCallb = null; >>>> >>>> /** Initialize the SiddhiManager Instance */ >>>> public CEP() { >>>> mSiddhiManager = new SiddhiManager(); >>>> } >>>> >>>> /** Add extra details to the SiddhiManager object */ >>>> public void cepAddDetails(final Context context, String >>>> streamDefinition, String stream, final String query, final String >>>> callbackFunction,IEdgeAnalyticServiceCallback cb) { >>>> mCallb=cb; >>>> >>>> /** Define the stream to the Siddhi Manager */ >>>> mSiddhiManager.defineStream(streamDefinition); >>>> >>>> /** Define the query to the Siddhi Manager */ >>>> mSiddhiManager.addQuery(query); >>>> >>>> /** Define the scallback details to the Siddhi Manager */ >>>> mSiddhiManager.addCallback(callbackFunction, new StreamCallback() { >>>> public void receive(Event[] events) { >>>> EventPrinter.print(events); >>>> try { >>>> mCallb.addCallBack("Passed!! " ); >>>> } catch (Exception e) { >>>> e.printStackTrace(); >>>> } >>>> } >>>> }); >>>> } >>>> >>>> /** Add extra details to the SiddhiManager object */ >>>> public void cepAddDetails(final Context context, String >>>> streamDefinition, String stream, final String query1, final String >>>> query2,final String callbackFunction1,final String >>>> callbackFunction2,IEdgeAnalyticServiceCallback cb) { >>>> //todo:Check the pattern queries >>>> >>>> mCallb=cb; >>>> >>>> /** Define the stream to the Siddhi Manager */ >>>> mSiddhiManager.defineStream(streamDefinition); >>>> >>>> /** Define the query to the Siddhi Manager */ >>>> mSiddhiManager.addQuery(query1); >>>> mSiddhiManager.addQuery(query2); >>>> >>>> /** Define the scallback details to the Siddhi Manager */ >>>> mSiddhiManager.addCallback(callbackFunction1, new StreamCallback() >>>> { >>>> public void receive(Event[] events) { >>>> EventPrinter.print(events); >>>> try { >>>> mCallb.addCallBack("callback1"); >>>> } catch (Exception e) { >>>> e.printStackTrace(); >>>> } >>>> } >>>> }); >>>> >>>> mSiddhiManager.addCallback(callbackFunction2, new StreamCallback() >>>> { >>>> public void receive(Event[] events) { >>>> EventPrinter.print(events); >>>> try { >>>> mCallb.addCallBack("callback2"); >>>> } catch (Exception e) { >>>> e.printStackTrace(); >>>> } >>>> } >>>> }); >>>> } >>>> >>>> /** Analyse single data and send the notification back to client >>>> through callbacks */ >>>> public void analyseTheData(String value, String stream, >>>> IEdgeAnalyticServiceCallback cb) { >>>> >>>> String[] dataCollection=value.split(","); >>>> String[] value_type=null; >>>> Object[] x =new Object[dataCollection.length]; >>>> >>>> /** Identify the data types which the client sends. */ >>>> for(int i=0;i<dataCollection.length ;i++) >>>> { >>>> value_type=dataCollection[i].split("-"); >>>> x[i]=0; >>>> switch (value_type[1]) { >>>> case "double": >>>> x[i]=Double.parseDouble(value_type[0]); >>>> break; >>>> case "float": >>>> x[i]=Float.parseFloat(value_type[0]); >>>> break; >>>> case "int": >>>> x[i]=Integer.parseInt(value_type[0]); >>>> break; >>>> case "string": >>>> x[i]=value_type[0]; >>>> break; >>>> default: >>>> break; >>>> } >>>> } >>>> >>>> InputHandler inputHandler = mSiddhiManager.getInputHandler(stream); >>>> try >>>> { >>>> inputHandler.send(x); >>>> } >>>> catch (InterruptedException e) { >>>> e.printStackTrace(); >>>> } >>>> } >>>> } >>>> >>>> I have set a timer and I'm passing data to the siddhi stream. >>>> When I run this app for the first time it gives only one callback and >>>> working fine.But After I run the same app again and again it gives more >>>> callbacks and it cause for my output in the client side. >>>> >>>> This is my Log file:- >>>> 1-03 15:21:16.276 25481-25777/? I/System.out﹕ >>>> [Event{streamId='LightValueLowHandleCallback2', timeStamp=1446544276262, >>>> data=[151.0], type=new}] >>>> 11-03 15:21:16.276 26231-26293/? D/callbak﹕ callback2 >>>> 11-03 15:21:16.276 26231-26293/? D/client4﹕ gonna off the light >>>> 11-03 15:21:16.286 25481-25777/? I/System.out﹕ >>>> [Event{streamId='LightValueLowHandleCallback2', timeStamp=1446544276262, >>>> data=[151.0], type=new}] >>>> 11-03 15:21:16.286 26231-26243/? D/callbak﹕ callback2 >>>> 11-03 15:21:16.291 26231-26243/? D/client4﹕ gonna off the light >>>> >>>> >>>> So I want to stop repeating callbacks.Could you please show me the >>>> place of the code to correct ,not to get more callbacks. >>>> >>>> Thank you. >>>> >>>> -- >>>> *Intern-Engineering* >>>> Lakini S.Senanayaka >>>> Mobile: +94 712295444 >>>> Email: lak...@wso2.com >>>> >>> >>> >>> >>> -- >>> *Intern-Engineering* >>> Lakini S.Senanayaka >>> Mobile: +94 712295444 >>> Email: lak...@wso2.com >>> >> >> >> >> -- >> *Lasantha Fernando* >> Senior Software Engineer - Data Technologies Team >> WSO2 Inc. http://wso2.com >> >> email: lasan...@wso2.com >> mobile: (+94) 71 5247551 >> > > > > -- > *Intern-Engineering* > Lakini S.Senanayaka > Mobile: +94 712295444 > Email: lak...@wso2.com > -- *Lasantha Fernando* Senior Software Engineer - Data Technologies Team WSO2 Inc. http://wso2.com email: lasan...@wso2.com mobile: (+94) 71 5247551
_______________________________________________ Dev mailing list Dev@wso2.org http://wso2.org/cgi-bin/mailman/listinfo/dev