http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ODFAPITest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ODFAPITest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ODFAPITest.java
new file mode 100755
index 0000000..900c214
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ODFAPITest.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.DataSet;
+import org.apache.atlas.odf.api.metadata.models.UnknownDataSet;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore;
+import org.apache.atlas.odf.core.metadata.DefaultMetadataStore;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisCancelResult;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+
+public class ODFAPITest extends ODFTestBase {
+
+       public static int WAIT_MS_BETWEEN_POLLING = 500;
+       public static int MAX_NUMBER_OF_POLLS = 500;
+       public static String DUMMY_SUCCESS_ID = "success";
+       public static String DUMMY_ERROR_ID = "error";
+
+       public static void runRequestAndCheckResult(String dataSetID, 
AnalysisRequestStatus.State expectedState, int 
expectedProcessedDiscoveryRequests) throws Exception{
+               runRequestAndCheckResult(Collections.singletonList(dataSetID), 
expectedState, expectedProcessedDiscoveryRequests);
+       }
+       
+       public static void runRequestAndCheckResult(List<String> dataSetIDs, 
AnalysisRequestStatus.State expectedState, int 
expectedProcessedDiscoveryRequests) throws Exception{
+               AnalysisManager analysisManager = new 
ODFFactory().create().getAnalysisManager();
+               String id = runRequest(dataSetIDs, analysisManager);
+               log.info("Running request "+id+" on data sets: " + dataSetIDs);
+               AnalysisRequestStatus status = null;
+
+               int maxPolls = MAX_NUMBER_OF_POLLS;
+               do {
+                       status = analysisManager.getAnalysisRequestStatus(id);
+                       log.log(Level.INFO, "{4}th poll request for request ID 
''{0}'' (expected state: ''{3}''): state: ''{1}'', details: ''{2}''", new 
Object[] { id, status.getState(), status.getDetails(),
+                                       expectedState, 
(MAX_NUMBER_OF_POLLS-maxPolls) });
+                       maxPolls--;
+                       Thread.sleep(WAIT_MS_BETWEEN_POLLING);
+               } while (maxPolls > 0 && (status.getState() == 
AnalysisRequestStatus.State.ACTIVE || status.getState() == 
AnalysisRequestStatus.State.QUEUED || status.getState() == 
AnalysisRequestStatus.State.NOT_FOUND));
+
+               log.log(Level.INFO, "Polling result after {0} polls for request 
id {1}: status: {2}", new Object[] {(MAX_NUMBER_OF_POLLS-maxPolls), id, 
status.getState()});
+               
+               Assert.assertTrue(maxPolls > 0);                
+               Assert.assertEquals(expectedState, status.getState());
+               AnalysisRequestTrackerStore store = new 
ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
+               AnalysisRequestTracker tracker = store.query(id);
+               Assert.assertNotNull(tracker);
+               checkTracker(tracker, expectedProcessedDiscoveryRequests);
+               log.info("Status details: " + status.getDetails());
+       }
+
+       static void checkTracker(AnalysisRequestTracker tracker, int 
expectedProcessedDiscoveryRequests) {
+               if (expectedProcessedDiscoveryRequests == -1) {
+                       expectedProcessedDiscoveryRequests = 
tracker.getDiscoveryServiceRequests().size(); 
+               }
+               Assert.assertEquals(expectedProcessedDiscoveryRequests, 
tracker.getDiscoveryServiceResponses().size());
+               
+       }
+
+       static String runRequest(String dataSetID, AnalysisManager 
analysisManager) throws Exception {
+               return runRequest(Collections.singletonList(dataSetID), 
analysisManager);
+       }
+
+       public static String runRequest(List<String> dataSetIDs, 
AnalysisManager analysisManager) throws Exception {
+               AnalysisRequest request = createAnalysisRequest(dataSetIDs);
+               log.info("Starting analyis");
+               AnalysisResponse response = 
analysisManager.runAnalysis(request);
+               Assert.assertNotNull(response);
+               Assert.assertFalse(response.isInvalidRequest());
+               String id = response.getId();
+               Assert.assertNotNull(id);
+               return id;
+       }
+
+       
+       @Test
+       public void testSimpleSuccess() throws Exception {
+               runRequestAndCheckResult("successID", 
AnalysisRequestStatus.State.FINISHED, -1);
+       }
+
+       public static void waitForRequest(String requestId, AnalysisManager 
analysisManager) {
+               waitForRequest(requestId, analysisManager, MAX_NUMBER_OF_POLLS);
+       }
+       
+       public static void waitForRequest(String requestId, AnalysisManager 
analysisManager, int maxPolls) {
+               AnalysisRequestStatus status = null;
+
+               log.log(Level.INFO, "Waiting for request ''{0}'' to finish", 
requestId);
+               do {
+                       status = 
analysisManager.getAnalysisRequestStatus(requestId);
+                       
+                       log.log(Level.INFO, "Poll request for request ID 
''{0}'', state: ''{1}'', details: ''{2}''", new Object[] { requestId, 
status.getState(), status.getDetails() });
+                       maxPolls--;
+                       try {
+                               Thread.sleep(WAIT_MS_BETWEEN_POLLING);
+                       } catch (InterruptedException e) {
+                               e.printStackTrace();
+                               throw new RuntimeException(e);
+                       }
+               } while (maxPolls > 0 && (status.getState() == 
AnalysisRequestStatus.State.ACTIVE || status.getState() == 
AnalysisRequestStatus.State.QUEUED || status.getState() == 
AnalysisRequestStatus.State.NOT_FOUND));
+               if (maxPolls == 0) {
+                       log.log(Level.INFO, "Request ''{0}'' is not finished 
yet, don't wait for it", requestId);
+               }
+               log.log(Level.INFO, "Request ''{0}'' is finished with state: 
''{1}''", new Object[] { requestId, status.getState() });
+       }
+
+       public static boolean waitForRequest(String requestId, AnalysisManager 
analysisManager, int maxPolls, AnalysisRequestStatus.State expectedState) {
+               AnalysisRequestStatus status = null;
+
+               log.log(Level.INFO, "Waiting for request ''{0}'' to finish", 
requestId);
+               do {
+                       status = 
analysisManager.getAnalysisRequestStatus(requestId);
+                       log.log(Level.INFO, "Poll request for request ID 
''{0}'', state: ''{1}'', details: ''{2}''", new Object[] { requestId, 
status.getState(), status.getDetails() });
+                       maxPolls--;
+                       try {
+                               Thread.sleep(WAIT_MS_BETWEEN_POLLING);
+                       } catch (InterruptedException e) {
+                               e.printStackTrace();
+                               throw new RuntimeException(e);
+                       }
+               } while (maxPolls > 0 && (status.getState() == 
AnalysisRequestStatus.State.ACTIVE || status.getState() == 
AnalysisRequestStatus.State.QUEUED || status.getState() == 
AnalysisRequestStatus.State.NOT_FOUND));
+               if (maxPolls == 0) {
+                       log.log(Level.INFO, "Request ''{0}'' is not finished 
yet, don't wait for it", requestId);
+               }
+               return expectedState.equals(status.getState());
+       }
+
+       
+       @Test
+       public void testSimpleSuccessDuplicate() throws Exception {
+               AnalysisManager analysisManager = new 
ODFFactory().create().getAnalysisManager();
+               String id = runRequest("successID", analysisManager);
+               String secondId = runRequest("successID", analysisManager);
+               Assert.assertNotEquals(id, secondId);
+               //Wait limit and try if new analysis is started
+               
Thread.sleep(DefaultStatusQueueStore.IGNORE_SIMILAR_REQUESTS_TIMESPAN_MS*2 + 
5000);
+               String thirdId = runRequest("successID", analysisManager);
+               Assert.assertNotEquals(secondId, thirdId);
+               waitForRequest(id, analysisManager);
+               waitForRequest(thirdId, analysisManager);
+       }
+
+       @Test
+       public void testSimpleSuccessNoDuplicate() throws Exception {
+               AnalysisManager analysisManager = new 
ODFFactory().create().getAnalysisManager();
+               String id = runRequest("successID", analysisManager);
+               String secondId = runRequest("successID2", analysisManager);
+               Assert.assertNotEquals(id, secondId);
+               waitForRequest(id, analysisManager);
+               waitForRequest(secondId, analysisManager);
+       }
+
+       @Test
+       public void testSimpleSuccessDuplicateSubset() throws Exception {
+               AnalysisManager analysisManager = new 
ODFFactory().create().getAnalysisManager();
+               String id = runRequest(Arrays.asList("successID", "successID2", 
"successID3"), analysisManager);
+               String secondId = runRequest("successID2", analysisManager);
+               Assert.assertNotEquals(id, secondId);
+               
Thread.sleep(DefaultStatusQueueStore.IGNORE_SIMILAR_REQUESTS_TIMESPAN_MS + 
5000);
+               String thirdId = runRequest("successID", analysisManager);
+               Assert.assertNotEquals(secondId, thirdId);
+               waitForRequest(id, analysisManager);
+               waitForRequest(thirdId, analysisManager);
+       }
+       
+       /**
+        * This test depends on the speed of execution.
+        * An analysis that is not in state INITIALIZED or IN_SERVICE_QUEUE 
cannot be cancelled. 
+        * Therefore if the analysis is started too quickly this test will fail!
+        * 
+        * Ignore for now as this can go wrong in the build.
+        */
+       @Test
+       @Ignore
+       public void testCancelRequest() throws Exception {
+               AnalysisManager analysisManager = new 
ODFFactory().create().getAnalysisManager();
+               String id = runRequest(Arrays.asList("successID", "successID2", 
"successID3"), analysisManager);
+               AnalysisCancelResult cancelAnalysisRequest = 
analysisManager.cancelAnalysisRequest(id);
+               Assert.assertEquals(cancelAnalysisRequest.getState(), 
AnalysisCancelResult.State.SUCCESS);
+               String secondId = runRequest("successID2", analysisManager);
+               Assert.assertNotEquals(id, secondId);
+       }
+
+       
+       @Test
+       public void testRequestsWithDataSetListSuccess() throws Exception {
+               runRequestAndCheckResult(Arrays.asList("success1", "success2", 
"success3"), AnalysisRequestStatus.State.FINISHED, 6);
+       }
+       
+       @Test
+       public void testRequestsWithDataSetListError() throws Exception {
+               runRequestAndCheckResult(Arrays.asList("success1", "error2", 
"success3"), AnalysisRequestStatus.State.ERROR, 3);
+       }
+
+               
+
+       @Test
+       public void testSimpleFailure() throws Exception {
+               runRequestAndCheckResult("errorID", 
AnalysisRequestStatus.State.ERROR, 1);
+       }
+       
+       @Test 
+       public void testManyRequests()  throws Exception {
+               List<String> dataSets = new ArrayList<String>();
+               List<AnalysisRequestStatus.State> expectedStates = new 
ArrayList<AnalysisRequestStatus.State>();
+               int dataSetNum = 5;
+               for (int i=0; i<dataSetNum; i++) {
+                       AnalysisRequestStatus.State expectedState = 
AnalysisRequestStatus.State.FINISHED;
+                       String dataSet = "successdataSet" + i;
+                       if (i % 3 == 0) {
+                               // every third data set should fail
+                               dataSet = "errorDataSet" + i;
+                               expectedState = 
AnalysisRequestStatus.State.ERROR;
+                       } 
+                       dataSets.add(dataSet);
+                       expectedStates.add(expectedState);
+               }
+               
+               runRequests(dataSets, expectedStates);
+       }
+
+       public void runRequests(List<String> dataSetIDs, 
List<AnalysisRequestStatus.State> expectedStates) throws Exception {
+               Assert.assertTrue(dataSetIDs.size() == expectedStates.size());
+               AnalysisManager analysisManager = new 
ODFFactory().create().getAnalysisManager();
+
+               Map<AnalysisRequest, AnalysisRequestStatus.State> 
request2ExpectedState = new HashMap<AnalysisRequest, 
AnalysisRequestStatus.State>();
+
+               for (int i = 0; i < dataSetIDs.size(); i++) {
+                       String dataSetID = dataSetIDs.get(i);
+                       AnalysisRequestStatus.State expectedState = 
expectedStates.get(i);
+
+                       AnalysisRequest request = 
createAnalysisRequest(Collections.singletonList(dataSetID));
+
+                       log.info("Starting analyis");
+                       AnalysisResponse response = 
analysisManager.runAnalysis(request);
+                       Assert.assertNotNull(response);
+                       String id = response.getId();
+                       Assert.assertFalse(response.isInvalidRequest());
+                       Assert.assertNotNull(id);
+                       request.setId(id);
+                       request2ExpectedState.put(request, expectedState);
+               }
+
+               //              Set<AnalysisRequest> finishedRequests = new 
HashSet<AnalysisRequest>();
+               Map<AnalysisRequest, AnalysisRequestStatus> 
actualFinalStatePerRequest = new HashMap<AnalysisRequest, 
AnalysisRequestStatus>();
+               int maxPollPasses = 10;
+               for (int i = 0; i < maxPollPasses; i++) {
+                       log.info("Polling all requests for the " + i + " th 
time");
+                       boolean allRequestsFinished = true;
+                       for (Map.Entry<AnalysisRequest, 
AnalysisRequestStatus.State> entry : request2ExpectedState.entrySet()) {
+
+                               AnalysisRequest request = entry.getKey();
+                               String id = request.getId();
+                               if 
(actualFinalStatePerRequest.containsKey(request)) {
+                                       log.log(Level.INFO, "Request with ID 
''{0}'' already finished, skipping it", id);
+                               } else {
+                                       allRequestsFinished = false;
+
+                                       AnalysisRequestStatus.State 
expectedState = entry.getValue();
+
+                                       AnalysisRequestStatus status = null;
+
+                                       int maxPollsPerRequest = 3;
+                                       do {
+                                               status = 
analysisManager.getAnalysisRequestStatus(id);
+                                               log.log(Level.INFO, "Poll 
request for request ID ''{0}'' (expected state: ''{3}''): state: ''{1}'', 
details: ''{2}''",
+                                                               new Object[] { 
id, status.getState(), status.getDetails(), expectedState });
+                                               maxPollsPerRequest--;
+                                               Thread.sleep(1000);
+                                       } while (maxPollsPerRequest > 0 && 
(status.getState() == AnalysisRequestStatus.State.ACTIVE || status.getState() 
== AnalysisRequestStatus.State.QUEUED || status.getState() == 
AnalysisRequestStatus.State.NOT_FOUND));
+
+                                       if (maxPollsPerRequest > 0) {
+                                               // final state found
+                                               
actualFinalStatePerRequest.put(request, status);
+                                               //                              
Assert.assertEquals(expectedState, status.getState());
+                                       }
+                               }
+                       }
+                       if (allRequestsFinished) {
+                               log.info("All requests finished");
+                               break;
+                       }
+               }
+               Assert.assertTrue(actualFinalStatePerRequest.size() == 
request2ExpectedState.size());
+               
Assert.assertTrue(actualFinalStatePerRequest.keySet().equals(request2ExpectedState.keySet()));
+               for (Map.Entry<AnalysisRequest, AnalysisRequestStatus> actual : 
actualFinalStatePerRequest.entrySet()) {
+                       AnalysisRequest req = actual.getKey();
+                       Assert.assertNotNull(req);
+                       AnalysisRequestStatus.State expectedState = 
request2ExpectedState.get(req);
+                       Assert.assertNotNull(expectedState);
+                       AnalysisRequestStatus.State actualState = 
actual.getValue().getState();
+                       Assert.assertNotNull(actualState);
+
+                       log.log(Level.INFO, "Checking request ID ''{0}'', 
actual state: ''{1}'', expected state: ''{2}''", new Object[] { req.getId(), 
actualState, expectedState });
+                       Assert.assertNotNull(expectedState);
+                       Assert.assertEquals(expectedState, actualState);
+               }
+       }
+
+       public static AnalysisRequest createAnalysisRequest(List<String> 
dataSetIDs) throws JSONException {
+               AnalysisRequest request = new AnalysisRequest();
+               List<MetaDataObjectReference> dataSetRefs = new ArrayList<>();
+               MetadataStore mds = new 
ODFFactory().create().getMetadataStore();
+               if (!(mds instanceof DefaultMetadataStore)) {
+                       throw new RuntimeException(MessageFormat.format("This 
tests does not work with metadata store implementation \"{0}\" but only with 
the DefaultMetadataStore.", mds.getClass().getName()));
+               }
+               DefaultMetadataStore defaultMds = (DefaultMetadataStore) mds;
+               defaultMds.resetAllData();
+               for (String id : dataSetIDs) {
+                       MetaDataObjectReference mdr = new 
MetaDataObjectReference();
+                       mdr.setId(id);
+                       dataSetRefs.add(mdr);
+                       if (id.startsWith(DUMMY_SUCCESS_ID) || 
id.startsWith(DUMMY_ERROR_ID)) {
+                               log.info("Creating dummy data set for reference 
: " + id.toString());
+                               DataSet ds = new UnknownDataSet();
+                               ds.setReference(mdr);
+                               defaultMds.createObject(ds);
+                       }
+               }
+               defaultMds.commit();
+               request.setDataSets(dataSetRefs);
+               List<String> serviceIds = Arrays.asList(new 
String[]{"asynctestservice", "synctestservice"});
+               /* use a fix list of services 
+               List<DiscoveryServiceRegistrationInfo> registeredServices = new 
ODFFactory().create(ControlCenter.class).getConfig().getRegisteredServices();   
        
+               for(DiscoveryServiceRegistrationInfo service : 
registeredServices){
+                       serviceIds.add(service.getId());
+               }
+               */
+               request.setDiscoveryServiceSequence(serviceIds);
+               Map<String, Object> additionalProps = new HashMap<String, 
Object>();
+               additionalProps.put("aaa", "bbb");
+               JSONObject jo = new JSONObject();
+               jo.put("p1", "v1");
+               jo.put("p2", "v2");
+               additionalProps.put("jo", jo);
+               request.setAdditionalProperties(additionalProps);
+               return request;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java
new file mode 100755
index 0000000..9aa3ba4
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.engine.EngineManager;
+import org.apache.atlas.odf.api.engine.SystemHealth;
+import org.apache.atlas.odf.api.engine.SystemHealth.HealthStatus;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.analysis.AnalysisManagerImpl;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+
+public class ParallelODFTest extends ODFTestcase {
+       Logger log = ODFTestLogger.get();
+       
+       @Test
+       public void runDataSetsInParallelSuccess() throws Exception {
+               runDataSetsInParallelAndCheckResult(Arrays.asList(new String[] 
{ "successID1", "successID2" }), State.FINISHED);
+       }
+
+       @Test 
+       public void runDataSetsInParallelError() throws Exception {
+               runDataSetsInParallelAndCheckResult(Arrays.asList(new String[] 
{ "successID1", "errorID2" }), State.ERROR);
+       }
+
+       private void runDataSetsInParallelAndCheckResult(List<String> 
dataSetIDs, State expectedState) throws Exception {
+               log.info("Running data sets in parallel: " + dataSetIDs);
+               log.info("Expected state: " + expectedState);
+               AnalysisRequest req = 
ODFAPITest.createAnalysisRequest(dataSetIDs);
+               // Enable parallel processing because this is a parallel test
+               req.setProcessDataSetsSequentially(false);
+               AnalysisManager analysisManager = new 
ODFFactory().create().getAnalysisManager();
+               EngineManager engineManager = new 
ODFFactory().create().getEngineManager();
+
+               SystemHealth healthCheckResult = 
engineManager.checkHealthStatus();
+               Assert.assertEquals(HealthStatus.OK, 
healthCheckResult.getStatus());
+               AnalysisResponse resp = analysisManager.runAnalysis(req);
+               log.info("Parallel requests started");
+
+               String id = resp.getId();
+               List<String> singleIds = Utils.splitString(id, 
AnalysisManagerImpl.COMPOUND_REQUEST_SEPARATOR);
+               List<String> singleDetails = 
Utils.splitString(resp.getDetails(), 
AnalysisManagerImpl.COMPOUND_REQUEST_SEPARATOR);
+               Assert.assertEquals(dataSetIDs.size(), singleIds.size());
+               Assert.assertEquals(dataSetIDs.size(), singleDetails.size());
+
+               AnalysisRequestStatus status = null;
+
+               // check that requests are processed in parallel: 
+               //   there must be a point in time where both requests are in 
status "active"
+               log.info("Polling for status of parallel request...");
+               boolean foundPointInTimeWhereBothRequestsAreActive = false;
+               int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS;
+               do {
+                       List<State> allSingleStates = new 
ArrayList<AnalysisRequestStatus.State>();
+                       for (String singleId : singleIds) {
+                               
allSingleStates.add(analysisManager.getAnalysisRequestStatus(singleId).getState());
+                       }
+                       if (Utils.containsOnly(allSingleStates, new State[] { 
State.ACTIVE })) {
+                               foundPointInTimeWhereBothRequestsAreActive = 
true;
+                       }
+
+                       status = analysisManager.getAnalysisRequestStatus(id);
+                       log.log(Level.INFO, "Poll request for parallel request 
ID ''{0}'' (expected state: ''{3}''): state: ''{1}'', details: ''{2}''", new 
Object[] { id, status.getState(), status.getDetails(),
+                                       expectedState });
+                       log.info("States of single requests: " + singleIds + ": 
" + allSingleStates);
+                       maxPolls--;
+                       Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING);
+               } while (maxPolls > 0 && (status.getState() == State.ACTIVE || 
status.getState() == State.QUEUED));
+
+               Assert.assertTrue(maxPolls > 0);
+               Assert.assertEquals(expectedState, status.getState());
+               Assert.assertTrue(foundPointInTimeWhereBothRequestsAreActive);
+               log.info("Parallel request status details: " + 
status.getDetails());
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java
new file mode 100755
index 0000000..9a43b78
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import java.util.logging.Level;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+
+public class SetTrackerStatusTest extends ODFTestBase {
+
+       @Test
+       public void testSetTrackerStatus() throws Exception {
+               AnalysisManager am = new 
ODFFactory().create().getAnalysisManager();
+               AnalysisRequestTrackerStore arts = new 
ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
+               String requestId = ODFAPITest.runRequest("successId", am);
+               Thread.sleep(1000);
+               long cutOffTimestamp = System.currentTimeMillis();              
+               String testMessage = "Message was set to error at " + 
cutOffTimestamp;
+               arts.setStatusOfOldRequest(cutOffTimestamp, STATUS.ERROR, 
testMessage);
+               AnalysisRequestTracker tracker = arts.query(requestId);
+               Assert.assertEquals(STATUS.ERROR, tracker.getStatus());
+               Assert.assertEquals(testMessage, tracker.getStatusDetails());
+               
+               // wait until request is finished and state is set back to 
finished
+               log.log(Level.INFO, "Waiting for request ''{0}'' to finish", 
requestId);
+               int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS;
+               AnalysisRequestStatus status = null;
+               do {
+                       status = am.getAnalysisRequestStatus(requestId);
+                       log.log(Level.INFO, "Poll request for request ID 
''{0}'', state: ''{1}'', details: ''{2}''", new Object[] { requestId, 
status.getState(), status.getDetails() });
+                       maxPolls--;
+                       try {
+                               
Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING);
+                       } catch (InterruptedException e) {
+                               // TODO Auto-generated catch block
+                               e.printStackTrace();
+                       }
+               } while (maxPolls > 0 && (status.getState() != 
AnalysisRequestStatus.State.FINISHED) );
+               
+               Assert.assertEquals(AnalysisRequestStatus.State.FINISHED, 
am.getAnalysisRequestStatus(requestId).getState());
+               tracker = arts.query(requestId);
+               Assert.assertEquals(STATUS.FINISHED, tracker.getStatus());
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java
new file mode 100755
index 0000000..0f1aa8f
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.discoveryservice;
+
+import java.io.InputStream;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceJavaEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import 
org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRuntimeStatistics;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceStatus;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+
+public class DiscoveryServiceManagerTest {
+       
+       final private static String ASYNCTESTWA_SERVICE_ID = 
"asynctestservice-with-annotations";
+
+       final private static String NEW_SERVICE_ID = "New_Service";
+       final private static String NEW_SERVICE_NAME = "Name of New Service";
+       final private static String NEW_SERVICE_DESCRIPTION = "Description of 
the New Service";
+       final private static String NEW_SERVICE_CLASSNAME = 
"TestAsyncDiscoveryService1";
+       
+       final private static String UPDATED_SERVICE_DESCRIPTION = "Updated 
description of the New Service";
+       final private static String UPDATED_SERVICE_CLASSNAME = 
"TestSyncDiscoveryService1";
+       
+       private void registerDiscoveryService(DiscoveryServiceProperties 
dsProperties) throws ValidationException {
+               DiscoveryServiceManager discoveryServicesManager = new 
ODFFactory().create().getDiscoveryServiceManager();
+               discoveryServicesManager.createDiscoveryService(dsProperties);
+       }
+       
+       private void replaceDiscoveryService(DiscoveryServiceProperties 
dsProperties) throws ValidationException {
+               DiscoveryServiceManager discoveryServicesManager = new 
ODFFactory().create().getDiscoveryServiceManager();
+               discoveryServicesManager.replaceDiscoveryService(dsProperties);
+       }
+       
+       private void unregisterDiscoveryService(String serviceId) throws 
ServiceNotFoundException, ValidationException {
+               DiscoveryServiceManager discoveryServicesManager = new 
ODFFactory().create().getDiscoveryServiceManager();
+               discoveryServicesManager.deleteDiscoveryService(serviceId);
+       }
+               
+       @Test
+       public void testGetDiscoveryServiceProperties() throws 
ServiceNotFoundException {
+               DiscoveryServiceManager discoveryServicesManager = new 
ODFFactory().create().getDiscoveryServiceManager();
+               DiscoveryServiceProperties dsProperties = 
discoveryServicesManager.getDiscoveryServiceProperties(ASYNCTESTWA_SERVICE_ID);
+               Assert.assertNotNull(dsProperties);
+       }
+       
+               
+       @Ignore @Test    // Ignoring testcase due to problem on Mac (issue #56)
+       public void testGetDiscoveryServiceStatus() throws 
ServiceNotFoundException {
+               DiscoveryServiceManager discoveryServicesManager = new 
ODFFactory().create().getDiscoveryServiceManager();
+               DiscoveryServiceStatus dsStatus = 
discoveryServicesManager.getDiscoveryServiceStatus(ASYNCTESTWA_SERVICE_ID);
+               Assert.assertNotNull(dsStatus);
+       }
+       
+       @Test  // TODO: need to adjust as soon as runtime statistics are 
available
+       public void testGetDiscoveryServiceRuntimeStatistics() throws 
ServiceNotFoundException {
+               DiscoveryServiceManager discoveryServicesManager = new 
ODFFactory().create().getDiscoveryServiceManager();
+               DiscoveryServiceRuntimeStatistics dsRuntimeStats = 
discoveryServicesManager.getDiscoveryServiceRuntimeStatistics(ASYNCTESTWA_SERVICE_ID);
+               Assert.assertNotNull(dsRuntimeStats);
+               long avgProcTime = 
dsRuntimeStats.getAverageProcessingTimePerItemInMillis();
+               Assert.assertEquals(0, avgProcTime);
+       }
+
+       @Test
+       public void testDeleteDiscoveryServiceRuntimeStatistics() throws 
ServiceNotFoundException {
+               DiscoveryServiceManager discoveryServicesManager = new 
ODFFactory().create().getDiscoveryServiceManager();
+               
discoveryServicesManager.deleteDiscoveryServiceRuntimeStatistics(ASYNCTESTWA_SERVICE_ID);
+       }
+
+       @Test
+       public void testGetDiscoveryServiceImage() throws 
ServiceNotFoundException {
+               DiscoveryServiceManager discoveryServicesManager = new 
ODFFactory().create().getDiscoveryServiceManager();
+               InputStream is = 
discoveryServicesManager.getDiscoveryServiceImage(ASYNCTESTWA_SERVICE_ID);
+               Assert.assertNull(is);
+       }
+
+       @Test
+       public void testCreateUpdateDelete() throws ServiceNotFoundException, 
ValidationException, JSONException {
+               DiscoveryServiceJavaEndpoint dse = new 
DiscoveryServiceJavaEndpoint();
+               dse.setClassName(NEW_SERVICE_CLASSNAME);
+               DiscoveryServiceProperties dsProperties = new 
DiscoveryServiceProperties();
+               dsProperties.setId(NEW_SERVICE_ID);
+               dsProperties.setName(NEW_SERVICE_NAME);
+               dsProperties.setDescription(NEW_SERVICE_DESCRIPTION);
+               dsProperties.setLink(null);
+               dsProperties.setPrerequisiteAnnotationTypes(null);
+               dsProperties.setResultingAnnotationTypes(null);
+               dsProperties.setSupportedObjectTypes(null);
+               dsProperties.setAssignedObjectTypes(null);
+               dsProperties.setAssignedObjectCandidates(null);
+               dsProperties.setEndpoint(JSONUtils.convert(dse, 
DiscoveryServiceEndpoint.class));
+               dsProperties.setParallelismCount(2);
+               registerDiscoveryService(dsProperties);
+
+               DiscoveryServiceJavaEndpoint dse2 = new 
DiscoveryServiceJavaEndpoint();
+               dse2.setClassName(UPDATED_SERVICE_CLASSNAME);
+               DiscoveryServiceProperties dsProperties2 = new 
DiscoveryServiceProperties();
+               dsProperties2.setId(NEW_SERVICE_ID);
+               dsProperties2.setName(NEW_SERVICE_NAME);
+               dsProperties2.setDescription(UPDATED_SERVICE_DESCRIPTION);
+               dsProperties2.setLink(null);
+               dsProperties.setPrerequisiteAnnotationTypes(null);
+               dsProperties.setResultingAnnotationTypes(null);
+               dsProperties.setSupportedObjectTypes(null);
+               dsProperties.setAssignedObjectTypes(null);
+               dsProperties.setAssignedObjectCandidates(null);
+               dsProperties2.setEndpoint(JSONUtils.convert(dse2, 
DiscoveryServiceEndpoint.class));
+               dsProperties2.setParallelismCount(2);
+               replaceDiscoveryService(dsProperties2);
+
+               unregisterDiscoveryService(NEW_SERVICE_ID);
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java
new file mode 100755
index 0000000..2ea85b7
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.discoveryservice;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import 
org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncRunStatus;
+import org.junit.Assert;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import 
org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class TestAsyncDiscoveryService1 extends DiscoveryServiceBase 
implements AsyncDiscoveryService {
+
+       static int unavailableCounter = 0;
+
+       static Logger logger = ODFTestLogger.get();
+
+       public static void 
checkUserAndAdditionalProperties(DiscoveryServiceRequest request) {
+               String user = request.getUser();
+               
+               String defaultUser = System.getProperty("user.name");
+               Assert.assertEquals(defaultUser, user);
+
+               Map<String, Object> additionalProperties = 
request.getAdditionalProperties();
+               logger.info("TestAsyncDiscoveryService1.startAnalysis 
additional properties: " + additionalProperties);
+               Assert.assertNotNull(additionalProperties);
+               
+               // check that environment entries are also available additional 
properties
+               Environment ev = new 
ODFInternalFactory().create(Environment.class);
+               String dsId = request.getDiscoveryServiceId();
+               Map<String, String> serviceEnvProps = 
ev.getPropertiesWithPrefix(dsId);
+               if (!serviceEnvProps.isEmpty()) {
+                       Assert.assertTrue(!additionalProperties.isEmpty());
+                       for (Map.Entry<String, String> serviceEnvProp : 
serviceEnvProps.entrySet()) {
+                               String key = serviceEnvProp.getKey();
+                               String val = serviceEnvProp.getValue();
+                               logger.info("Found discoveryservice 
configuration parameter: " + key + " with value " + val);
+                               Assert.assertTrue(key.startsWith(dsId));
+                               
Assert.assertTrue(additionalProperties.containsKey(key) );
+                               Assert.assertEquals(val, 
additionalProperties.get(key));
+                       }
+               }
+               
+               if (!additionalProperties.isEmpty()) {
+                       
Assert.assertTrue(additionalProperties.containsKey("aaa"));
+                       
Assert.assertTrue("bbb".equals(additionalProperties.get("aaa")));
+                       
Assert.assertTrue(additionalProperties.containsKey("jo"));
+                       @SuppressWarnings("unchecked")
+                       Map<String, Object> m = (Map<String, Object>) 
additionalProperties.get("jo");
+                       Assert.assertTrue("v1".equals(m.get("p1")));
+                       Assert.assertTrue("v2".equals(m.get("p2")));
+                       /*
+                       if (!additionalProperties.containsKey("aaa")) {
+                               response.setCode(ResponseCode.UNKNOWN_ERROR);
+                               response.setDetails("Additional property value 
'aaa' doesn't exist");
+                               return;
+                       }
+                       if (!"bbb".equals(additionalProperties.get("aaa"))) {
+                               response.setCode(ResponseCode.UNKNOWN_ERROR);
+                               response.setDetails("Additional properties 
'aaa' has wrong value");
+                               return;
+                       }
+                       if (!additionalProperties.containsKey("jo")) {
+                               response.setCode(ResponseCode.UNKNOWN_ERROR);
+                               response.setDetails("Additional property value 
'jo' doesn't exist");
+                               return;
+                       }
+                       Map m = (Map) additionalProperties.get("jo");
+                       if (!"v1".equals(m.get("p1"))) {
+                               response.setCode(ResponseCode.UNKNOWN_ERROR);
+                               response.setDetails("Additional property value 
'jo.p1' doesn't exist");
+                               return;
+
+                       }
+                       if (!"v2".equals(m.get("p2"))) {
+                               response.setCode(ResponseCode.UNKNOWN_ERROR);
+                               response.setDetails("Additional property value 
'jo.p2' doesn't exist");
+                               return;
+                       }
+                       */
+               }
+       }
+       
+       @Override
+       public DiscoveryServiceAsyncStartResponse 
startAnalysis(DiscoveryServiceRequest request) {
+               try {
+                       DiscoveryServiceResponse.ResponseCode code = 
DiscoveryServiceResponse.ResponseCode.TEMPORARILY_UNAVAILABLE;
+                       String details = "Cannot answer right now";
+                       if (unavailableCounter % 2 == 0) {
+                               code = DiscoveryServiceResponse.ResponseCode.OK;
+                               details = "Everything's peachy";
+                       }
+                       unavailableCounter++;
+                       /*
+                       if (unavailableCounter % 3 == 0) {
+                               code = CODE.NOT_AUTHORIZED;
+                               details = "You have no power here!";
+                       }
+                       */
+                       DiscoveryServiceAsyncStartResponse response = new 
DiscoveryServiceAsyncStartResponse();
+                       response.setCode(code);
+                       response.setDetails(details);
+                       if (code == DiscoveryServiceResponse.ResponseCode.OK) {
+                               String runid = "TestAsyncService1" + 
UUID.randomUUID().toString();
+                               synchronized (lock) {
+                                       runIDsRunning.put(runid, 4); // return 
status "running" 4 times before finishing
+                               }
+                               response.setRunId(runid);
+                               String dataSetId = 
request.getDataSetContainer().getDataSet().getReference().getId();
+                               if (dataSetId.startsWith("error")) {
+                                       logger.info("TestAsync Discovery 
Service run " + runid + " will fail");
+                                       runIDsWithError.add(runid);
+                               } else {
+                                       logger.info("TestAsync Discovery 
Service run " + runid + " will succeed");
+                               }
+                       }
+                       logger.info("TestAsyncDiscoveryService1.startAnalysis 
returns: " + JSONUtils.lazyJSONSerializer(response));
+                       checkUserAndAdditionalProperties(request);
+                       /*
+                       String user = request.getUser();
+                       Assert.assertEquals(TestControlCenter.TEST_USER_ID, 
user);
+
+                       Map<String, Object> additionalProperties = 
request.getAdditionalProperties();
+                       logger.info("TestAsyncDiscoveryService1.startAnalysis 
additional properties: " + additionalProperties);
+                       Assert.assertNotNull(additionalProperties);
+                       if (!additionalProperties.isEmpty()) {
+                               if (!additionalProperties.containsKey("aaa")) {
+                                       
response.setCode(ResponseCode.UNKNOWN_ERROR);
+                                       response.setDetails("Additional 
property value 'aaa' doesn't exist");
+                                       return response;
+                               }
+                               if 
(!"bbb".equals(additionalProperties.get("aaa"))) {
+                                       
response.setCode(ResponseCode.UNKNOWN_ERROR);
+                                       response.setDetails("Additional 
properties 'aaa' has wrong value");
+                                       return response;
+                               }
+                               if (!additionalProperties.containsKey("jo")) {
+                                       
response.setCode(ResponseCode.UNKNOWN_ERROR);
+                                       response.setDetails("Additional 
property value 'jo' doesn't exist");
+                                       return response;
+                               }
+                               Map m = (Map) additionalProperties.get("jo");
+                               if (!"v1".equals(m.get("p1"))) {
+                                       
response.setCode(ResponseCode.UNKNOWN_ERROR);
+                                       response.setDetails("Additional 
property value 'jo.p1' doesn't exist");
+                                       return response;
+
+                               }
+                               if (!"v2".equals(m.get("p2"))) {
+                                       
response.setCode(ResponseCode.UNKNOWN_ERROR);
+                                       response.setDetails("Additional 
property value 'jo.p2' doesn't exist");
+                                       return response;
+                               }
+                       }
+                       */
+                       return response;
+               } catch (Throwable t) {
+                       DiscoveryServiceAsyncStartResponse response = new 
DiscoveryServiceAsyncStartResponse();
+                       
response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+                       response.setDetails(Utils.getExceptionAsString(t));
+                       return response;
+               }
+       }
+
+       static Object lock = new Object();
+       static Map<String, Integer> runIDsRunning = new HashMap<String, 
Integer>();
+       static Set<String> runIDsWithError = Collections.synchronizedSet(new 
HashSet<String>());
+
+       //      static Map<String, Integer> requestIDUnavailable = new 
HashMap<>();
+
+       @Override
+       public DiscoveryServiceAsyncRunStatus getStatus(String runId) {
+               String details = "Run like the wind";
+               DiscoveryServiceAsyncRunStatus.State state = 
DiscoveryServiceAsyncRunStatus.State.RUNNING;
+               synchronized (lock) {
+                       Integer i = runIDsRunning.get(runId);
+                       Assert.assertNotNull(i);
+                       if (i.intValue() == 0) {
+                               if (runIDsWithError.contains(runId)) {
+                                       state = 
DiscoveryServiceAsyncRunStatus.State.ERROR;
+                                       details = "This was a mistake";
+                               } else {
+                                       state = 
DiscoveryServiceAsyncRunStatus.State.FINISHED;
+                                       details = "Finish him!";
+                               }
+                       } else {
+                               runIDsRunning.put(runId, i - 1);
+                       }
+               }
+
+               DiscoveryServiceAsyncRunStatus status = new 
DiscoveryServiceAsyncRunStatus();
+               status.setRunId(runId);
+               status.setDetails(details);
+               status.setState(state);
+               logger.info("TestAsyncDiscoveryService1.getStatus returns: " + 
JSONUtils.lazyJSONSerializer(status));
+
+               return status;
+       }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java
new file mode 100755
index 0000000..bd2f1a6
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.discoveryservice;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
+import 
org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncRunStatus;
+import 
org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+
+public class TestAsyncDiscoveryServiceWritingAnnotations1 extends 
DiscoveryServiceBase implements AsyncDiscoveryService {
+
+       static Logger logger = ODFTestLogger.get();
+
+       static Map<String, MyThread> id2Thread = 
Collections.synchronizedMap(new HashMap<String, MyThread>());
+
+       class MyThread extends Thread {
+
+               String errorMessage = null;
+               String correlationId;
+               MetaDataObjectReference dataSetRef;
+
+               public MyThread(MetaDataObjectReference dataSetRef, String 
correlationId) {
+                       super();
+                       this.dataSetRef = dataSetRef;
+                       this.correlationId = correlationId;
+               }
+
+               @Override
+               public void run() {
+                       this.errorMessage = 
TestSyncDiscoveryServiceWritingAnnotations1.createAnnotations(dataSetRef, 
correlationId, metadataStore, annotationStore);
+               }
+
+       }
+
+       @Override
+       public DiscoveryServiceAsyncStartResponse 
startAnalysis(DiscoveryServiceRequest request) {
+               DiscoveryServiceAsyncStartResponse response = new 
DiscoveryServiceAsyncStartResponse();
+               MetaDataObjectReference dataSetRef = 
request.getDataSetContainer().getDataSet().getReference();
+
+               String newRunID = "RunId-" + this.getClass().getSimpleName() + 
"-" + UUID.randomUUID().toString();
+               MyThread t = new MyThread(dataSetRef, (String) 
request.getAdditionalProperties().get(TestSyncDiscoveryServiceWritingAnnotations1.REQUEST_PROPERTY_CORRELATION_ID));
+               t.start();
+               id2Thread.put(newRunID, t);
+               response.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+               response.setRunId(newRunID);
+               response.setDetails("Thread started");
+               logger.info("Analysis writing annotations has started");
+
+               return response;
+       }
+
+       @Override
+       public DiscoveryServiceAsyncRunStatus getStatus(String runId) {
+               DiscoveryServiceAsyncRunStatus status = new 
DiscoveryServiceAsyncRunStatus();
+
+               MyThread t = id2Thread.get(runId);
+               status.setRunId(runId);
+               if (t == null) {
+                       
status.setState(DiscoveryServiceAsyncRunStatus.State.NOT_FOUND);
+               } else {
+                       java.lang.Thread.State ts = t.getState();
+                       if (!ts.equals(java.lang.Thread.State.TERMINATED)) {
+                               
status.setState(DiscoveryServiceAsyncRunStatus.State.RUNNING);
+                       } else {
+                               if (t.errorMessage != null) {
+                                       
status.setState(DiscoveryServiceAsyncRunStatus.State.ERROR);
+                                       status.setDetails(t.errorMessage);
+                               } else {
+                                       
status.setState(DiscoveryServiceAsyncRunStatus.State.FINISHED);
+                                       status.setDetails("All went fine");
+                               }
+                       }
+               }
+               logger.info("Status of analysis with annotations: " + 
status.getState() + ", " + status.getDetails());
+               return status;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java
new file mode 100755
index 0000000..9ea92f3
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.discoveryservice;
+
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import 
org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+
+public class TestSyncDiscoveryService1 extends DiscoveryServiceBase implements 
SyncDiscoveryService {
+       static int unavailableCounter = 0;
+
+       Logger logger = ODFTestLogger.get();
+
+       @Override
+       public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest 
request) {
+               try {
+                       DiscoveryServiceResponse.ResponseCode code = 
DiscoveryServiceResponse.ResponseCode.TEMPORARILY_UNAVAILABLE;
+                       String details = "Cannot answer right now 
synchronously";
+                       if (unavailableCounter % 2 == 0) {
+                               code = DiscoveryServiceResponse.ResponseCode.OK;
+                               details = "Everything's peachy and synchronous";
+                       }
+                       unavailableCounter++;
+                       DiscoveryServiceSyncResponse response = new 
DiscoveryServiceSyncResponse();
+                       response.setDetails(details);
+                       response.setCode(code);
+                       if (code == DiscoveryServiceResponse.ResponseCode.OK) {
+                               String dataSetId = 
request.getDataSetContainer().getDataSet().getReference().getId();
+                               if (dataSetId.startsWith("error")) {
+                                       
response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+                                       response.setDetails("Something went 
synchronously wrong!");
+                               } else {
+                                       response.setDetails("All is 
synchronously fine!");
+                               }
+                               
TestAsyncDiscoveryService1.checkUserAndAdditionalProperties(request);
+                       }
+                       logger.info(this.getClass().getSimpleName() + " service 
returned with code: " + response.getCode());
+                       return response;
+               } catch (Throwable t) {
+                       t.printStackTrace();
+                       return null;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java
new file mode 100755
index 0000000..62c7bf6
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.discoveryservice;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import 
org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.wink.json4j.JSONObject;
+import org.junit.Assert;
+
+import org.apache.atlas.odf.api.metadata.models.CachedMetadataStore;
+import org.apache.atlas.odf.api.metadata.models.DataSet;
+import org.apache.atlas.odf.api.metadata.models.MetaDataCache;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.RelationalDataSet;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+
+public class TestSyncDiscoveryServiceWritingAnnotations1 extends 
DiscoveryServiceBase implements SyncDiscoveryService {
+
+       static Logger logger = 
Logger.getLogger(TestSyncDiscoveryServiceWritingAnnotations1.class.getName());
+
+       public static String checkMetaDataCache(DiscoveryServiceRequest 
request) {
+               logger.info("Checking metadata cache");
+               MetaDataObject mdo = request.getDataSetContainer().getDataSet();
+               MetaDataCache cache = 
request.getDataSetContainer().getMetaDataCache();
+               if (cache == null) {
+                       return null;
+               }
+               CachedMetadataStore cacheReader = new 
CachedMetadataStore(cache);
+
+               if (mdo instanceof RelationalDataSet) {
+                       logger.info("Checking metadata cache for columns...");
+                       RelationalDataSet rds = (RelationalDataSet) mdo;
+                       Set<MetaDataObjectReference> cachedColumns = new 
HashSet<>();
+                       Set<MetaDataObjectReference> actualColumns = new 
HashSet<>();
+                       for (MetaDataObject col : cacheReader.getColumns(rds)) {
+                               cachedColumns.add(col.getReference());
+                       }
+                       MetadataStore mds = new 
ODFFactory().create().getMetadataStore();
+                       for (MetaDataObject col : mds.getColumns(rds)) {
+                               actualColumns.add(col.getReference());
+                       }
+                       Assert.assertTrue("Columns missing from metadata 
cache.", cachedColumns.containsAll(actualColumns));
+                       Assert.assertTrue("Too many columns in metadata 
cache.", actualColumns.containsAll(cachedColumns));
+               }
+               return null;
+       }
+
+       @Override
+       public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest 
request) {
+               logger.info("Analysis started on sync test service with 
annotations ");
+               String errorMessage = createAnnotations( //
+                               
request.getDataSetContainer().getDataSet().getReference(), //
+                               (String) 
request.getAdditionalProperties().get(REQUEST_PROPERTY_CORRELATION_ID), //
+                               metadataStore, //
+                               annotationStore);
+               if (errorMessage == null) {
+                       errorMessage = checkMetaDataCache(request);
+               }
+               DiscoveryServiceSyncResponse resp = new 
DiscoveryServiceSyncResponse();
+               if (errorMessage == null) {
+                       resp.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+                       resp.setDetails("Annotations created successfully");
+               } else {
+                       
resp.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+                       resp.setDetails(errorMessage);
+               }
+               logger.info("Analysis finished on sync test service with 
annotations ");
+
+               return resp;
+       }
+
+       public static final String REQUEST_PROPERTY_CORRELATION_ID = 
"REQUEST_PROPERTY_CORRELATION_ID";
+
+       static final String ANNOTATION_TYPE = "AnnotationType-" + 
TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName();
+       static final String JSON_ATTRIBUTE = "Attribute-" + 
TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName();
+       static final String JSON_VALUE = "Value-" + 
TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName();
+
+       public static int getNumberOfAnnotations() {
+               return 3;
+       }
+
+       public static String[] getPropsOfNthAnnotation(int i) {
+               return new String[] { ANNOTATION_TYPE + i, JSON_ATTRIBUTE + i, 
JSON_VALUE + i };
+       }
+
+       public static String createAnnotations(MetaDataObjectReference 
dataSetRef, String correlationId, MetadataStore mds, AnnotationStore as) {
+               try {
+                       
TestSyncDiscoveryServiceWritingAnnotations1.logger.info("Analysis will run on 
data set ref: " + dataSetRef);
+                       MetaDataObject dataSet = mds.retrieve(dataSetRef);
+
+                       String errorMessage = null;
+                       if (dataSet == null) {
+                               errorMessage = "Data set with id " + dataSetRef 
+ " could not be retrieved";
+                               
TestSyncDiscoveryServiceWritingAnnotations1.logger.severe(errorMessage);
+                               return errorMessage;
+                       }
+
+                       if (!(dataSet instanceof DataSet)) {
+                               errorMessage = "Object with id " + dataSetRef + 
" is not a data set";
+                               
TestSyncDiscoveryServiceWritingAnnotations1.logger.severe(errorMessage);
+                               return errorMessage;
+                       }
+
+                       // add some annotations
+                       for (int i = 0; i < getNumberOfAnnotations(); i++) {
+                               String[] annotValues = 
getPropsOfNthAnnotation(i);
+                               ProfilingAnnotation annotation1 = new 
ProfilingAnnotation();
+                               annotation1.setProfiledObject(dataSetRef);
+                               annotation1.setAnnotationType(annotValues[0]);
+                               JSONObject jo1 = new JSONObject();
+                               jo1.put(annotValues[1], annotValues[2]);
+                               jo1.put(REQUEST_PROPERTY_CORRELATION_ID, 
correlationId);
+                               annotation1.setJsonProperties(jo1.write());
+
+// PG: dynamic type creation disabled (types are already created statically)
+//                             
mds.createAnnotationTypesFromPrototypes(Collections.singletonList(annotation1));
+                               MetaDataObjectReference resultRef1 = 
as.store(annotation1);
+                               if (resultRef1 == null) {
+                                       throw new RuntimeException("Annotation 
object " + i + " could not be created");
+                               }
+                       }
+
+                       
TestSyncDiscoveryServiceWritingAnnotations1.logger.info("Discovery service " + 
TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName() + "created 
annotations successfully");
+               } catch (Throwable exc) {
+                       exc.printStackTrace();
+                       
TestSyncDiscoveryServiceWritingAnnotations1.logger.log(Level.WARNING, 
TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName() + " has 
failed", exc);
+                       return "Failed: " + Utils.getExceptionAsString(exc);
+               }
+               return null;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java
new file mode 100755
index 0000000..2e6d012
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.engine;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.engine.ODFVersion;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+
+public class ODFVersionTest extends TimerTestBase {
+       @Test
+       public void testVersion() {
+               ODFVersion version = new 
ODFFactory().create().getEngineManager().getVersion();
+               Assert.assertNotNull(version);
+               Assert.assertTrue(version.getVersion().startsWith("1.2.0-"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java
new file mode 100755
index 0000000..465eb5c
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.engine;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.engine.EngineManager;
+import org.apache.atlas.odf.api.engine.ODFEngineOptions;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;
+
+public class ShutdownTest extends ODFTestBase {
+
+       private void runAndTestThreads() throws Exception {
+               ODFAPITest.runRequestAndCheckResult("successID", 
State.FINISHED, -1);
+               ThreadManager tm = new 
ODFInternalFactory().create(ThreadManager.class);
+               int numThreads = tm.getNumberOfRunningThreads();
+               log.info("--- Number of running threads: " + numThreads);
+               Assert.assertTrue(numThreads >= 3);             
+       }
+
+       @Test
+       public void testShutdown() throws Exception {
+
+               log.info("--- Running some request before shutdown...");
+               runAndTestThreads();
+
+               ThreadManager tm = new 
ODFInternalFactory().create(ThreadManager.class);
+               log.info("--- Number of threads before shutdown: " + 
tm.getNumberOfRunningThreads());
+
+               EngineManager engineManager = new 
ODFFactory().create().getEngineManager();
+               ODFEngineOptions options = new ODFEngineOptions();
+               options.setRestart(false);
+               int numThreads = tm.getNumberOfRunningThreads();
+               log.info("--- Number of threads before restart: " + numThreads);
+
+               engineManager.shutdown(options);
+               log.info("--- Shutdown requested...");
+               int maxWait = 60;
+               int waitCnt = 0;
+               log.info("--- Shutdown requested, waiting for max " + maxWait + 
" seconds");
+               while (tm.getNumberOfRunningThreads() > 0 && waitCnt < maxWait) 
{
+                       waitCnt++;
+                       Thread.sleep(1000);
+               }
+               log.info("--- Shutdown should be done by now, waited for " + 
waitCnt + " threads: " + tm.getNumberOfRunningThreads());
+               Assert.assertNotEquals(waitCnt, maxWait);
+
+       //      log.info("--- Starting ODF again....");
+
+       //      ODFInitializer.start();
+               log.info("--- Rerunning request after shutdown...");
+               runAndTestThreads();
+
+               int nrOfThreads = tm.getNumberOfRunningThreads();
+               options.setRestart(true);
+               engineManager.shutdown(options);
+               maxWait = nrOfThreads * 2;
+               waitCnt = 0;
+               log.info("--- Restart requested..., wait for a maximum of " + 
(nrOfThreads * 2500) + " ms");
+               while (tm.getNumberOfRunningThreads() > 0 && waitCnt < maxWait) 
{
+                       waitCnt++;
+                       Thread.sleep(1000);
+               }
+               log.info("--- Restart should be done by now");
+               Thread.sleep(5000);
+               numThreads = tm.getNumberOfRunningThreads();
+               log.info("--- Number of threads after restart: " + numThreads);
+               Assert.assertTrue(numThreads > 2);
+               log.info("--- testShutdown finished");
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java
new file mode 100755
index 0000000..c2be180
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.engine.MessagingStatus;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage;
+import org.apache.atlas.odf.core.controlcenter.AdminQueueProcessor;
+import org.apache.atlas.odf.core.controlcenter.ConfigChangeQueueProcessor;
+import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore;
+import org.apache.atlas.odf.core.controlcenter.DiscoveryServiceStarter;
+import org.apache.atlas.odf.core.controlcenter.ExecutorServiceFactory;
+import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import 
org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult;
+import org.apache.atlas.odf.core.controlcenter.TrackerUtil;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class MockQueueManager implements DiscoveryServiceQueueManager {
+
+       static Logger logger = 
Logger.getLogger(MockQueueManager.class.getName());
+
+       static Object lock = new Object();
+
+       static List<AdminMessage> adminQueue = Collections.synchronizedList(new 
ArrayList<AdminMessage>());
+       static List<StatusQueueEntry> statusQueue = 
Collections.synchronizedList(new ArrayList<StatusQueueEntry>());
+       static Map<String, List<AnalysisRequestTracker>> runtimeQueues = new 
HashMap<>();
+
+       ThreadManager threadManager;
+
+       public MockQueueManager() {
+               ODFInternalFactory factory = new ODFInternalFactory();
+               ExecutorServiceFactory esf = 
factory.create(ExecutorServiceFactory.class);
+               threadManager = factory.create(ThreadManager.class);
+               threadManager.setExecutorService(esf.createExecutorService());
+               //initialize();
+       }
+
+       @Override
+       public void start() throws TimeoutException {
+               logger.info("Initializing MockQueueManager");
+               List<ThreadStartupResult> threads = new 
ArrayList<ThreadStartupResult>();
+               ThreadStartupResult startUnmanagedThread = 
this.threadManager.startUnmanagedThread("MOCKADMIN", 
createQueueListener("Admin", adminQueue, new AdminQueueProcessor(), false));
+               boolean threadCreated = 
startUnmanagedThread.isNewThreadCreated();
+               threads.add(startUnmanagedThread);
+               startUnmanagedThread = 
this.threadManager.startUnmanagedThread("MOCKADMINCONFIGCHANGE",
+                               createQueueListener("AdminConfig", adminQueue, 
new ConfigChangeQueueProcessor(), false));
+               threadCreated |= startUnmanagedThread.isNewThreadCreated();
+               threads.add(startUnmanagedThread);
+               startUnmanagedThread = 
this.threadManager.startUnmanagedThread("MOCKSTATUSSTORE",
+                               createQueueListener("StatusStore", statusQueue, 
new DefaultStatusQueueStore.StatusQueueProcessor(), true));
+               threadCreated |= startUnmanagedThread
+                               .isNewThreadCreated();
+               threads.add(startUnmanagedThread);
+
+               logger.info("New thread created: " + threadCreated);
+               if (threadCreated) {
+                       try {
+                               
this.threadManager.waitForThreadsToBeReady(5000, threads);
+                               logger.info("All threads ready");
+                       } catch (TimeoutException e) {
+                               final String message = "Not all thrads were 
created on time";
+                               logger.warning(message);
+                       }
+               }
+       }
+
+       @Override
+       public void stop() {
+               threadManager.shutdownThreads(Arrays.asList("MOCKADMIN", 
"MOCKADMINCONFIGCHANGE", "MOCKSTATUSSTORE"));
+       }
+
+       <T> T cloneObject(T obj) {
+               try {
+                       return JSONUtils.cloneJSONObject(obj);
+               } catch (JSONException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       @Override
+       public void enqueue(AnalysisRequestTracker tracker) {
+               tracker = cloneObject(tracker);
+               DiscoveryServiceRequest dsRequest = 
TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
+               if (dsRequest == null) {
+                       throw new RuntimeException("Tracker is finished, should 
not be enqueued");
+               }
+               String dsID = dsRequest.getDiscoveryServiceId();
+               dsRequest.setPutOnRequestQueue(System.currentTimeMillis());
+               synchronized (lock) {
+                       ServiceRuntime runtime = 
ServiceRuntimes.getRuntimeForDiscoveryService(dsID);
+                       if (runtime == null) {
+                               throw new 
RuntimeException(MessageFormat.format("Runtime of discovery service ''{0}'' 
does not exist", dsID));
+                       }
+                       String runtimeName = runtime.getName();
+                       List<AnalysisRequestTracker> mq = 
runtimeQueues.get(runtimeName);
+                       if (mq == null) {
+                               mq = Collections.synchronizedList(new 
ArrayList<AnalysisRequestTracker>());
+                               runtimeQueues.put(runtimeName, mq);
+                       }
+                       boolean started = 
this.threadManager.startUnmanagedThread("MOCK" + runtimeName, 
createQueueListener("Starter" + runtimeName, mq, new DiscoveryServiceStarter(), 
false))
+                                       .isNewThreadCreated();
+                       logger.info("New thread created for runtime " + 
runtimeName + ", started: " + started + ", current queue length: " + mq.size());
+                       mq.add(tracker);
+               }
+       }
+
+       static class MockQueueListener implements ODFRunnable {
+               String name; 
+               QueueMessageProcessor processor;
+               List<?> queue;
+               boolean cancelled = false;
+               ExecutorService service;
+               int index = 0;
+
+               public MockQueueListener(String name, List<?> q, 
QueueMessageProcessor qmp, boolean fromBeginning) {
+                       this.name = name;
+                       this.processor = qmp;
+                       this.queue = q;
+                       if (fromBeginning) {
+                               index = 0;
+                       } else {
+                               index = q.size();
+                       }
+               }
+
+               long WAITTIMEMS = 100;
+
+               boolean isValidIndex() {
+                       return index >= 0 && index < queue.size();
+               }
+
+               @Override
+               public void run() {
+                       logger.info("MockQueueManager thread " + name + " 
started");
+
+                       while (!cancelled) {
+                       //      logger.info("Queue consumer " + name + ": 
checking index " + index + " on queue of size " + queue.size());
+                               if (!isValidIndex()) {
+                                       try {
+                                               Thread.sleep(WAITTIMEMS);
+                                       } catch (InterruptedException e) {
+                                               e.printStackTrace();
+                                       }
+                               } else {
+                                       Object obj = queue.get(index);
+                                       String msg;
+                                       try {
+                                               msg = JSONUtils.toJSON(obj);
+                                       } catch (JSONException e) {
+                                               e.printStackTrace();
+                                               cancelled = true;
+                                               return;
+                                       }
+                                       this.processor.process(service, msg, 0, 
index);
+                                       logger.finest("MockQConsumer " + name + 
": Processed message: " + msg);
+                                       index++;
+                               }
+                       }
+                       logger.info("MockQueueManager thread finished");
+
+               }
+
+
+               @Override
+               public void setExecutorService(ExecutorService service) {
+                       this.service = service;
+               }
+
+               @Override
+               public void cancel() {
+                       cancelled = true;
+               }
+
+               @Override
+               public boolean isReady() {
+                       return true;
+               }
+
+       }
+
+       ODFRunnable createQueueListener(String name, List<?> queue, 
QueueMessageProcessor qmp, boolean fromBeginning) {
+               return new MockQueueListener(name, queue, qmp, fromBeginning);
+       }
+
+       @Override
+       public void enqueueInStatusQueue(StatusQueueEntry sqe) {
+               sqe = cloneObject(sqe);
+               statusQueue.add(sqe);
+       }
+
+       @Override
+       public void enqueueInAdminQueue(AdminMessage message) {
+               message = cloneObject(message);
+               adminQueue.add(message);
+       }
+
+       public static class MockMessagingStatus extends MessagingStatus {
+               String message;
+
+               public String getMessage() {
+                       return message;
+               }
+
+               public void setMessage(String message) {
+                       this.message = message;
+               }
+
+       }
+
+       @Override
+       public MessagingStatus getMessagingStatus() {
+               MockMessagingStatus mms = new MockMessagingStatus();
+               mms.setMessage("OK");
+               return mms;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java
new file mode 100755
index 0000000..f69513c
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.notification;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.notification.NotificationListener;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.notification.NotificationManager;
+import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;
+
+public class NotificationManagerTest extends ODFTestBase {
+
+       @Test
+       public void testNotifications() throws Exception {
+               NotificationManager nm = new 
ODFInternalFactory().create(NotificationManager.class);
+               Assert.assertNotNull(nm);
+               log.info("Notification manager found " + 
nm.getClass().getName());
+               Assert.assertTrue(nm instanceof TestNotificationManager);
+               List<NotificationListener> listeners = nm.getListeners();
+               Assert.assertTrue(listeners.size() > 0);
+
+               OpenDiscoveryFramework odf = new ODFFactory().create();
+               List<String> dataSetIDs = 
Collections.singletonList("successID");
+               String id = ODFAPITest.runRequest(dataSetIDs, 
odf.getAnalysisManager());
+               ODFAPITest.waitForRequest(id, odf.getAnalysisManager());
+
+               int polls = 20;
+               boolean found = false;
+               boolean foundFinished = false;
+               do {
+                       // now check that trackers were found through the 
notification mechanism
+                       log.info("Checking that trackers were consumed, " + 
polls + " seconds left");
+                       List<AnalysisRequestTracker> trackers = new 
ArrayList<>(TestNotificationManager.receivedTrackers);
+                       log.info("Received trackers: " + trackers.size());
+                       for (AnalysisRequestTracker tracker : trackers) {
+                               String foundId = tracker.getRequest().getId();
+                               if (foundId.equals(id)) {
+                                       found = true;
+                                       if 
(tracker.getStatus().equals(STATUS.FINISHED)) {
+                                               foundFinished = true;
+                                       }
+                               }
+                       }
+                       polls--;
+                       Thread.sleep(1000);
+               } while (!found && !foundFinished && polls > 0);
+               Assert.assertTrue(found);
+               Assert.assertTrue(foundFinished);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java
new file mode 100755
index 0000000..80252d6
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.notification;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
+import org.apache.atlas.odf.core.notification.NotificationListener;
+import org.apache.atlas.odf.core.notification.NotificationManager;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+
+public class TestNotificationManager implements NotificationManager {
+
+       public static class TestListener1 implements NotificationListener {
+
+               @Override
+               public String getTopicName() {
+                       return "odf-status-topic";
+               }
+
+               @Override
+               public void onEvent(String event, OpenDiscoveryFramework odf) {
+                       try {
+                               StatusQueueEntry sqe = 
JSONUtils.fromJSON(event, StatusQueueEntry.class);
+                               AnalysisRequestTracker tracker = 
sqe.getAnalysisRequestTracker();
+                               if (tracker != null) {
+                                       receivedTrackers.add(tracker);          
                        
+                               }
+                       } catch (JSONException e) {
+                               throw new RuntimeException(e);
+                       }
+               }
+
+               @Override
+               public String getName() {
+                       return this.getClass().getName();
+               }
+
+       }
+
+       public static List<AnalysisRequestTracker> receivedTrackers = 
Collections.synchronizedList(new ArrayList<AnalysisRequestTracker>());
+
+       @Override
+       public List<NotificationListener> getListeners() {
+               List<NotificationListener> result = new ArrayList<>();
+               result.add(new TestListener1());
+               return result;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java
new file mode 100755
index 0000000..8a8d9a8
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.runtime;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;
+
+public class RuntimeExtensionTest extends ODFTestBase {
+
+       static final String SERVICE_ON_TEST_RUNTIME = "testruntimeservice";
+
+       List<String> getNames(List<ServiceRuntime> rts) {
+               List<String> result = new ArrayList<>();
+               for (ServiceRuntime rt : rts) {
+                       result.add(rt.getName());
+               }
+               return result;
+       }
+
+       @Test
+       public void testActiveRuntimes() {
+               List<String> allNames = 
getNames(ServiceRuntimes.getAllRuntimes());
+               
Assert.assertTrue(allNames.contains(TestServiceRuntime.TESTSERVICERUNTIME_NAME));
+
+               List<String> activeNames = 
getNames(ServiceRuntimes.getActiveRuntimes());
+               
Assert.assertTrue(activeNames.contains(TestServiceRuntime.TESTSERVICERUNTIME_NAME));
+       }
+
+       @Test
+       public void testRuntimeForNewService() {
+               ServiceRuntime rt = 
ServiceRuntimes.getRuntimeForDiscoveryService(SERVICE_ON_TEST_RUNTIME);
+               Assert.assertNotNull(rt);
+               Assert.assertEquals(TestServiceRuntime.TESTSERVICERUNTIME_NAME, 
rt.getName());
+       }
+
+       static Object lock = new Object();
+
+       @Test
+       public void testRuntimeExtensionSimple() throws Exception {
+               synchronized (lock) {
+                       OpenDiscoveryFramework odf = new ODFFactory().create();
+                       TestServiceRuntime.runtimeBlocked = false;
+                       AnalysisRequest request = 
ODFAPITest.createAnalysisRequest(Collections.singletonList(ODFAPITest.DUMMY_SUCCESS_ID));
+                       
request.setDiscoveryServiceSequence(Collections.singletonList(SERVICE_ON_TEST_RUNTIME));
+                       log.info("Starting service for test runtime");
+                       AnalysisResponse resp = 
odf.getAnalysisManager().runAnalysis(request);
+                       String requestId = resp.getId();
+                       Assert.assertTrue(ODFAPITest.waitForRequest(requestId, 
odf.getAnalysisManager(), 40, State.FINISHED));
+                       
Assert.assertTrue(TestServiceRuntime.requests.contains(requestId));
+                       log.info("testRuntimeExtensionSimple finished");
+
+                       // block runtime again to restore state before testcase
+                       TestServiceRuntime.runtimeBlocked = true;
+                       Thread.sleep(5000);
+               }
+       }
+
+       @Test
+       public void testBlockedRuntimeExtension() throws Exception {
+               synchronized (lock) {
+                       OpenDiscoveryFramework odf = new ODFFactory().create();
+                       TestServiceRuntime.runtimeBlocked = true;
+                       AnalysisRequest request = 
ODFAPITest.createAnalysisRequest(Collections.singletonList(ODFAPITest.DUMMY_SUCCESS_ID));
+                       
request.setDiscoveryServiceSequence(Collections.singletonList(SERVICE_ON_TEST_RUNTIME));
+                       log.info("Starting service for test runtime");
+                       AnalysisResponse resp = 
odf.getAnalysisManager().runAnalysis(request);
+                       String requestId = resp.getId();
+                       Assert.assertFalse(resp.isInvalidRequest());
+                       log.info("Checking that service is not called");
+                       for (int i = 0; i < 5; i++) {
+                               
Assert.assertFalse(TestServiceRuntime.requests.contains(requestId));
+                               Thread.sleep(1000);
+                       }
+                       log.info("Unblocking runtime...");
+                       TestServiceRuntime.runtimeBlocked = false;
+                       Thread.sleep(5000); // give service time to start 
consumption
+                       log.info("Checking that request has finished");
+                       Assert.assertTrue(ODFAPITest.waitForRequest(requestId, 
odf.getAnalysisManager(), 40, State.FINISHED));
+                       log.info("Checking that service was called");
+                       
Assert.assertTrue(TestServiceRuntime.requests.contains(requestId));
+                       log.info("testBlockedRuntimeExtension finished");
+                       
+                       // block runtime again to restore state before testcase
+                       TestServiceRuntime.runtimeBlocked = true;
+                       Thread.sleep(5000);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java
new file mode 100755
index 0000000..d16e10a
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.runtime;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.SyncDiscoveryServiceBase;
+import 
org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+
+public class TestServiceRuntime implements ServiceRuntime {
+
+       static Logger logger = ODFTestLogger.get();
+
+       public static final String TESTSERVICERUNTIME_NAME = 
"TestServiceRuntime";
+       
+       public static boolean runtimeBlocked = true;
+
+       @Override
+       public String getName() {
+               return TESTSERVICERUNTIME_NAME;
+       }
+
+       @Override
+       public long getWaitTimeUntilAvailable() {
+               if (runtimeBlocked) {
+                       return 1000;
+               }
+               return 0;
+       }
+
+       public static Set<String> requests = new HashSet<>();
+
+       public static class DSProxy extends SyncDiscoveryServiceBase {
+
+               @Override
+               public DiscoveryServiceSyncResponse 
runAnalysis(DiscoveryServiceRequest request) {
+                       logger.info("Running test runtime service");
+                       requests.add(request.getOdfRequestId());
+                       DiscoveryServiceSyncResponse resp = new 
DiscoveryServiceSyncResponse();
+                       resp.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+                       resp.setDetails("Test success");
+                       return resp;
+               }
+       }
+
+       @Override
+       public DiscoveryService 
createDiscoveryServiceProxy(DiscoveryServiceProperties props) {
+               return new DSProxy();
+       }
+
+       @Override
+       public String getDescription() {
+               return "TestServiceRuntime description";
+       }
+
+       @Override
+       public void validate(DiscoveryServiceProperties props) throws 
ValidationException {
+       }
+
+}


Reply via email to