Hi Lasantha,

This is the class I'm calling and instantiate the CEP object.

package org.wso2.edgeanalyticsservice1;

import android.app.Service;
import android.content.Intent;
import android.os.IBinder;
import android.util.Log;

/**
 * This class implements IEdgeAnalyticService.aidl interface
 */
public class EdgeAnalyticsService extends Service {

    private CEP mCep;
    private TaskManager taskManager=null;

    public EdgeAnalyticsService() {
        mCep = new CEP();
    }

    @Override
    public void onCreate() {
    }

    /** Returns the IBinder object for the connection */
    @Override
    public IBinder onBind(Intent intent) {
        taskManager=null;
        return mBinder;
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        return Service.START_STICKY;
    }

    @Override
    public boolean onUnbind (Intent intent){
        return false;
    }

    @Override
    public void onDestroy() {
        stopSelf();
    }

    /** Implementation of methods in the IEdgeAnalyticsService interface */
    public final IEdgeAnalyticsService.Stub mBinder = new
IEdgeAnalyticsService.Stub() {

        IEdgeAnalyticServiceCallback mIEdgeAnalyticServiceCallback=null;

        @Override
        public void getServiceNormal(String type, String
streamDefinition, String stream, String query, String
callbackFunctionName,IEdgeAnalyticServiceCallback cb) {

            mIEdgeAnalyticServiceCallback = cb;


            /** Add the details to CEP if the Client is Type1 */
            if (type.equalsIgnoreCase("TYPE1")) {
                mCep.cepAddDetails(getApplicationContext(),
streamDefinition, stream, query, callbackFunctionName, cb);
            }

            /** Add the details to CEP if the Client is
Type2-LOCATION_SERVICE */
            else if (type.equalsIgnoreCase("LOCATION_SERVICE")) {
                mCep.cepAddDetails(getApplicationContext(), "define
stream LocationStream (latitude double,longitude double); ",
"LocationStream", query, callbackFunctionName,cb);
                taskManager = new
TaskManager(getApplicationContext(),"LocationStream",mIEdgeAnalyticServiceCallback,mCep);
                taskManager.initializeServiceType(type);
            }

            /** Add the details to CEP if the Client is
Type2-HUMIDITY_SERVICE */
            else if (type.equalsIgnoreCase("HUMIDITY_SERVICE")) {
                mCep.cepAddDetails(getApplicationContext(), "define
stream HumidityStream (humidity float); ", "HumidityStream", query,
callbackFunctionName, cb);
                taskManager = new
TaskManager(getApplicationContext(),"HumidityStream",mIEdgeAnalyticServiceCallback,mCep);
                taskManager.initializeServiceType(type);
            }

            /** Add the details to CEP if the Client is
Type2-TEMPERATURE_SERVICE */
            else if (type.equalsIgnoreCase("TEMPERATURE_SERVICE")) {
                mCep.cepAddDetails(getApplicationContext(), "define
stream temperatureStream (tempValue float); ", "temperatureStream",
query, callbackFunctionName,cb);
                taskManager = new
TaskManager(getApplicationContext(),"temperatureStream",mIEdgeAnalyticServiceCallback,mCep);
                taskManager.initializeServiceType(type);
            }

            /** Add the details to CEP if the Client is
Type2-INTENSITY_SERVICE*/
            else if (type.equalsIgnoreCase("INTENSITY_SERVICE")) {
                Log.d("client","INTENSITY SERVICE");
                mCep.cepAddDetails(getApplicationContext(), "define
stream lightIntensityStream (lightValue double); ",
"lightIntensityStream", query, callbackFunctionName, cb);
                taskManager = new
TaskManager(getApplicationContext(),"lightIntensityStream",mIEdgeAnalyticServiceCallback,mCep);
                taskManager.initializeServiceType(type);

            }
        }

        //get servicePatterns
        @Override
        public void getServicePattern(String type, String
streamDefinition, String stream, String query1,String query2 ,String
callbackFunctionName1,String
callbackFunctionName2,IEdgeAnalyticServiceCallback cb) {

            mIEdgeAnalyticServiceCallback = cb;

            /** Add the details to CEP if the Client is Type1 */
            if (type.equalsIgnoreCase("TYPE1")) {
                mCep.cepAddDetails(getApplicationContext(),
streamDefinition, stream, query1, query2,
callbackFunctionName1,callbackFunctionName2,cb);
            }

            else if (type.equalsIgnoreCase("LOCATION_SERVICE")) {
                mCep.cepAddDetails(getApplicationContext(), "define
stream LocationStream (latitude double,longitude double); ",
"LocationStream",
query1,query2,callbackFunctionName1,callbackFunctionName2,cb);
                taskManager = new
TaskManager(getApplicationContext(),"LocationStream",mIEdgeAnalyticServiceCallback,mCep);
                taskManager.initializeServiceType(type);
            }

            /** Add the details to CEP if the Client is
Type2-HUMIDITY_SERVICE */
            else if (type.equalsIgnoreCase("HUMIDITY_SERVICE")) {
                mCep.cepAddDetails(getApplicationContext(), "define
stream HumidityStream (humidity float); ",
"HumidityStream",query1,query2,callbackFunctionName1,callbackFunctionName2,cb);
                taskManager = new
TaskManager(getApplicationContext(),"HumidityStream",mIEdgeAnalyticServiceCallback,mCep);
                taskManager.initializeServiceType(type);
            }

            /** Add the details to CEP if the Client is
Type2-TEMPERATURE_SERVICE */
            else if (type.equalsIgnoreCase("TEMPERATURE_SERVICE")) {
                mCep.cepAddDetails(getApplicationContext(), "define
stream temperatureStream (tempValue float); ", "temperatureStream",
query1,query2,callbackFunctionName1,callbackFunctionName2,cb);
                taskManager = new
TaskManager(getApplicationContext(),"temperatureStream",mIEdgeAnalyticServiceCallback,mCep);
                taskManager.initializeServiceType(type);
            }

            /** Add the details to CEP if the Client is
Type2-INTENSITY_SERVICE*/
            else if (type.equalsIgnoreCase("INTENSITY_SERVICE")) {
                Log.d("client","INTENSITY SERVICE");
                mCep.cepAddDetails(getApplicationContext(), "define
stream lightIntensityStream (lightValue double); ",
"lightIntensityStream",
query1,query2,callbackFunctionName1,callbackFunctionName2,cb);
                taskManager = new
TaskManager(getApplicationContext(),"lightIntensityStream",mIEdgeAnalyticServiceCallback,mCep);
                taskManager.initializeServiceType(type);
            }
        }

        /** Passes values to the streams to do the analytics*/
        @Override
        public void sendData(String value, String stream){
            mCep.analyseTheData(value, stream,mIEdgeAnalyticServiceCallback);
        }

        @Override
        public void stopService()
        {
            taskManager.stopSensors();
        }
    };
}

I will try the Singleton pattern and I will update this mail thread.

Thank you.



On Tue, Nov 3, 2015 at 3:37 PM, Lasantha Fernando <[email protected]> wrote:

> Hi Lakini,
>
> It seems multiple Siddhi managers are getting instantiated and each siddhi
> manager is running without being shut down. Can you point to the code where
> the CEP class is instantiated?
>
> Also, you can try a singleton pattern for the CEP class and do some checks
> when adding the queries or adopt a similar approach to avoid the issue of
> multiple queries running.
>
> Thanks,
> Lasantha
>
> On 3 November 2015 at 15:31, Lakini Senanayaka <[email protected]> wrote:
>
>> adding Dev@wso2
>>
>> On Tue, Nov 3, 2015 at 3:25 PM, Lakini Senanayaka <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> I have implemented a service using Siddhi 2.1.0.
>>> This is my Service class which deals with siddhimanager.
>>>
>>> package org.wso2.edgeanalyticsservice1;
>>>
>>> import android.content.Context;
>>>
>>> import org.wso2.siddhi.core.SiddhiManager;
>>> import org.wso2.siddhi.core.event.Event;
>>> import org.wso2.siddhi.core.stream.input.InputHandler;
>>> import org.wso2.siddhi.core.stream.output.StreamCallback;
>>> import org.wso2.siddhi.core.util.EventPrinter;
>>>
>>> /**
>>>  * This class create from the build in sensors and inject
>>>  * to the EdgeAnalytics Service for processing on a query passed.
>>>  */
>>> public class CEP {
>>>
>>>     private SiddhiManager mSiddhiManager;
>>>     private IEdgeAnalyticServiceCallback mCallb = null;
>>>
>>>     /** Initialize the SiddhiManager Instance */
>>>     public CEP() {
>>>         mSiddhiManager = new SiddhiManager();
>>>     }
>>>
>>>     /** Add extra details to the SiddhiManager object  */
>>>     public void cepAddDetails(final Context context, String 
>>> streamDefinition, String stream, final String query, final String 
>>> callbackFunction,IEdgeAnalyticServiceCallback cb) {
>>>         mCallb=cb;
>>>
>>>         /** Define the stream to the Siddhi Manager  */
>>>         mSiddhiManager.defineStream(streamDefinition);
>>>
>>>         /** Define the query to the Siddhi Manager  */
>>>         mSiddhiManager.addQuery(query);
>>>
>>>         /** Define the scallback details to the Siddhi Manager  */
>>>         mSiddhiManager.addCallback(callbackFunction, new StreamCallback() {
>>>             public void receive(Event[] events) {
>>>                 EventPrinter.print(events);
>>>                 try {
>>>                     mCallb.addCallBack("Passed!! " );
>>>                 } catch (Exception e) {
>>>                     e.printStackTrace();
>>>                 }
>>>             }
>>>         });
>>>     }
>>>
>>>     /** Add extra details to the SiddhiManager object  */
>>>     public void cepAddDetails(final Context context, String 
>>> streamDefinition, String stream, final String query1, final String 
>>> query2,final String callbackFunction1,final String 
>>> callbackFunction2,IEdgeAnalyticServiceCallback cb) {
>>>         //todo:Check the pattern queries
>>>
>>>         mCallb=cb;
>>>
>>>         /** Define the stream to the Siddhi Manager  */
>>>         mSiddhiManager.defineStream(streamDefinition);
>>>
>>>         /** Define the query to the Siddhi Manager  */
>>>         mSiddhiManager.addQuery(query1);
>>>         mSiddhiManager.addQuery(query2);
>>>
>>>         /** Define the scallback details to the Siddhi Manager  */
>>>         mSiddhiManager.addCallback(callbackFunction1, new StreamCallback() {
>>>             public void receive(Event[] events) {
>>>                 EventPrinter.print(events);
>>>                 try {
>>>                     mCallb.addCallBack("callback1");
>>>                 } catch (Exception e) {
>>>                     e.printStackTrace();
>>>                 }
>>>             }
>>>         });
>>>
>>>         mSiddhiManager.addCallback(callbackFunction2, new StreamCallback() {
>>>             public void receive(Event[] events) {
>>>                 EventPrinter.print(events);
>>>                 try {
>>>                     mCallb.addCallBack("callback2");
>>>                 } catch (Exception e) {
>>>                     e.printStackTrace();
>>>                 }
>>>             }
>>>         });
>>>     }
>>>
>>>     /** Analyse single data and send the notification back to client 
>>> through callbacks */
>>>     public void analyseTheData(String value, String stream, 
>>> IEdgeAnalyticServiceCallback cb) {
>>>
>>>         String[] dataCollection=value.split(",");
>>>         String[] value_type=null;
>>>         Object[] x =new Object[dataCollection.length];
>>>
>>>         /** Identify the data types which the client sends. */
>>>         for(int i=0;i<dataCollection.length ;i++)
>>>         {
>>>             value_type=dataCollection[i].split("-");
>>>             x[i]=0;
>>>             switch (value_type[1]) {
>>>                 case "double":
>>>                     x[i]=Double.parseDouble(value_type[0]);
>>>                     break;
>>>                 case "float":
>>>                     x[i]=Float.parseFloat(value_type[0]);
>>>                     break;
>>>                 case "int":
>>>                     x[i]=Integer.parseInt(value_type[0]);
>>>                     break;
>>>                 case "string":
>>>                     x[i]=value_type[0];
>>>                     break;
>>>                 default:
>>>                     break;
>>>                }
>>>            }
>>>
>>>         InputHandler inputHandler = mSiddhiManager.getInputHandler(stream);
>>>            try
>>>            {
>>>                inputHandler.send(x);
>>>            }
>>>            catch (InterruptedException e) {
>>>                e.printStackTrace();
>>>            }
>>>     }
>>> }
>>>
>>> I have set a timer and I'm passing data to the siddhi stream.
>>> When I run this app for the first time it gives only one callback and
>>> working fine.But After I run the same app again and again it gives more
>>> callbacks and it cause for my output in the client side.
>>>
>>> This is my Log file:-
>>> 1-03 15:21:16.276  25481-25777/? I/System.out﹕
>>> [Event{streamId='LightValueLowHandleCallback2', timeStamp=1446544276262,
>>> data=[151.0], type=new}]
>>> 11-03 15:21:16.276  26231-26293/? D/callbak﹕ callback2
>>> 11-03 15:21:16.276  26231-26293/? D/client4﹕ gonna off the light
>>> 11-03 15:21:16.286  25481-25777/? I/System.out﹕
>>> [Event{streamId='LightValueLowHandleCallback2', timeStamp=1446544276262,
>>> data=[151.0], type=new}]
>>> 11-03 15:21:16.286  26231-26243/? D/callbak﹕ callback2
>>> 11-03 15:21:16.291  26231-26243/? D/client4﹕ gonna off the light
>>>
>>>
>>> So I want to stop repeating callbacks.Could you please show me the place
>>> of the code to correct ,not to get more callbacks.
>>>
>>> Thank you.
>>>
>>> --
>>> *Intern-Engineering*
>>> Lakini S.Senanayaka
>>> Mobile: +94 712295444
>>> Email: [email protected]
>>>
>>
>>
>>
>> --
>> *Intern-Engineering*
>> Lakini S.Senanayaka
>> Mobile: +94 712295444
>> Email: [email protected]
>>
>
>
>
> --
> *Lasantha Fernando*
> Senior Software Engineer - Data Technologies Team
> WSO2 Inc. http://wso2.com
>
> email: [email protected]
> mobile: (+94) 71 5247551
>



-- 
*Intern-Engineering*
Lakini S.Senanayaka
Mobile: +94 712295444
Email: [email protected]
_______________________________________________
Dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/dev

Reply via email to