http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ODFRunnable.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ODFRunnable.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ODFRunnable.java
new file mode 100755
index 0000000..f999ecf
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ODFRunnable.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.ExecutorService;
+
/**
 * A {@link Runnable} whose lifecycle is managed by the ODF thread manager:
 * it can be handed the {@link ExecutorService} it runs on, be cancelled,
 * and be queried for readiness.
 */
public interface ODFRunnable extends Runnable {

	/**
	 * Provides the executor service this runnable is scheduled on,
	 * presumably so it can submit follow-up work — TODO confirm with implementations.
	 *
	 * @param service the executor service managing this runnable
	 */
	void setExecutorService(ExecutorService service);

	/**
	 * Requests cancellation; implementations are expected to stop processing
	 * as soon as possible.
	 */
	void cancel();

	/**
	 * @return true if the runnable is likely to be ready to receive data
	 */
	boolean isReady();

}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/QueueMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/QueueMessageProcessor.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/QueueMessageProcessor.java
new file mode 100755
index 0000000..e6642c5
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/QueueMessageProcessor.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.ExecutorService;
+
+
/**
 * Callback interface for processing messages taken from a Kafka queue.
 */
public interface QueueMessageProcessor {

	/**
	 * Callback to process the message taken from the queue.
	 *
	 * @param executorService executor service on which follow-up work may be scheduled
	 * @param msg The message to be processed
	 * @param partition The kafka topic partition this message was read from
	 * @param msgOffset The offset of this particular message on this kafka partition
	 */
	void process(ExecutorService executorService, String msg, int partition, long msgOffset);

}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntime.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntime.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntime.java
new file mode 100755
index 0000000..da06dd2
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntime.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+
/**
 * A runtime that hosts discovery services of a particular kind (e.g. Java or Spark).
 * A runtime is identified by its name, which discovery service endpoints reference.
 */
public interface ServiceRuntime {

	/**
	 * @return the unique name of this runtime, referenced by discovery service endpoints
	 */
	String getName();

	/**
	 * Check if the runtime is currently available for processing.
	 * Returns {@code <= 0} if the runtime is available immediately. A number {@code > 0}
	 * indicates how many seconds to wait until retrying.
	 *
	 * Note: If this method returns {@code > 0} the Kafka consumer will be shut down and only be
	 * started again when it returns {@code <= 0}. Shutting down and restarting the consumer is
	 * rather costly so this should only be done if the runtime won't be accepting requests
	 * for a foreseeable period of time.
	 */
	long getWaitTimeUntilAvailable();

	/**
	 * Creates a proxy object through which a discovery service of this runtime is invoked.
	 *
	 * @param props the properties (including the endpoint) of the discovery service
	 * @return the proxy implementing the discovery service contract
	 */
	DiscoveryService createDiscoveryServiceProxy(DiscoveryServiceProperties props);

	/**
	 * @return a human-readable description of this runtime
	 */
	String getDescription();

	/**
	 * Validates that the given discovery service definition is usable with this runtime.
	 *
	 * @param props the discovery service properties to check
	 * @throws ValidationException if the definition (e.g. its endpoint) is invalid
	 */
	void validate(DiscoveryServiceProperties props) throws ValidationException;

}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntimes.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntimes.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntimes.java
new file mode 100755
index 0000000..a867580
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ServiceRuntimes.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.net.URL;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+import org.apache.atlas.odf.api.engine.ServiceRuntimeInfo;
+import org.apache.atlas.odf.api.engine.ServiceRuntimesInfo;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+
+public class ServiceRuntimes {
+
+       static Logger logger = 
Logger.getLogger(ServiceRuntimes.class.getName());
+
+       static List<ServiceRuntime> getRuntimeExtensions() throws IOException {
+               ClassLoader cl = ServiceRuntimes.class.getClassLoader();
+               Enumeration<URL> services = 
cl.getResources("META-INF/odf/odf-runtimes.txt");
+               List<ServiceRuntime> result = new ArrayList<>();
+               while (services.hasMoreElements()) {
+                       URL url = services.nextElement();
+                       InputStream is = url.openStream();
+                       InputStreamReader isr = new InputStreamReader(is, 
"UTF-8");
+                       LineNumberReader lnr = new LineNumberReader(isr);
+                       String line = null;
+                       while ((line = lnr.readLine()) != null) {
+                               line = line.trim();
+                               logger.log(Level.INFO,  "Loading runtime 
extension ''{0}''", line);
+                               try {
+                                       @SuppressWarnings("unchecked")
+                                       Class<ServiceRuntime> clazz = 
(Class<ServiceRuntime>) cl.loadClass(line);
+                                       ServiceRuntime sr = clazz.newInstance();
+                                       result.add(sr);
+                               } catch (InstantiationException | 
IllegalAccessException | ClassNotFoundException e) {
+                                       logger.log(Level.WARNING, 
MessageFormat.format("Runtime extension of class ''{0}'' could not be 
instantiated", line), e);
+                               } 
+                       }
+               }
+               logger.log(Level.INFO, "Number of classpath services found: 
{0}", result.size());
+               return result;
+       }
+       
+       static {
+               List<ServiceRuntime> allRuntimes = new 
ArrayList<>(Arrays.asList( //
+                               new HealthCheckServiceRuntime(), //
+                               new JavaServiceRuntime(), //
+                               new SparkServiceRuntime() //
+               ));
+               try {
+                       List<ServiceRuntime> runtimeExtensions = 
getRuntimeExtensions();
+                       allRuntimes.addAll(runtimeExtensions);
+               } catch (IOException e) {
+                       logger.log(Level.WARNING, "An exception occurred when 
loading runtime extensions, ignoring them", e);
+               }
+               runtimes = Collections.unmodifiableList(allRuntimes);
+       }
+
+       private static List<ServiceRuntime> runtimes;
+
+       public static List<ServiceRuntime> getActiveRuntimes() {
+               Environment env = new 
ODFInternalFactory().create(Environment.class);
+               List<String> activeRuntimeNames = env.getActiveRuntimeNames();
+               if (activeRuntimeNames == null) {
+                       return getAllRuntimes();
+               }
+               // always add health check runtime
+               Set<String> activeRuntimeNamesSet = new 
HashSet<>(activeRuntimeNames);
+               
activeRuntimeNamesSet.add(HealthCheckServiceRuntime.HEALTH_CHECK_RUNTIME_NAME);
+               List<ServiceRuntime> activeRuntimes = new ArrayList<>();
+               for (ServiceRuntime rt : runtimes) {
+                       if (activeRuntimeNamesSet.contains(rt.getName())) {
+                               activeRuntimes.add(rt);
+                       }
+               }
+               return activeRuntimes;
+       }
+
+       public static List<ServiceRuntime> getAllRuntimes() {
+               return runtimes;
+       }
+
+       public static ServiceRuntime 
getRuntimeForDiscoveryService(DiscoveryServiceProperties discoveryServiceProps) 
{
+               DiscoveryServiceEndpoint ep = 
discoveryServiceProps.getEndpoint();
+               for (ServiceRuntime runtime : getAllRuntimes()) {
+                       if (runtime.getName().equals(ep.getRuntimeName())) {
+                               return runtime;
+                       }
+               }
+               return null;
+       }
+
+       public static ServiceRuntime getRuntimeForDiscoveryService(String 
discoveryServiceId) {
+               // special check because the healch check runtime is not part 
of the configuration
+               if 
(discoveryServiceId.startsWith(ControlCenter.HEALTH_TEST_DISCOVERY_SERVICE_ID)) 
{
+                       return new HealthCheckServiceRuntime();
+               }
+               DiscoveryServiceManager dsm = new 
ODFInternalFactory().create(DiscoveryServiceManager.class);
+               try {
+                       DiscoveryServiceProperties props = 
dsm.getDiscoveryServiceProperties(discoveryServiceId);
+                       return getRuntimeForDiscoveryService(props);
+               } catch (ServiceNotFoundException e) {
+                       return null;
+               }
+       }
+
+       public static ServiceRuntimesInfo getRuntimesInfo(List<ServiceRuntime> 
runtimes) {
+               List<ServiceRuntimeInfo> rts = new ArrayList<>();
+               for (ServiceRuntime rt : runtimes) {
+                       ServiceRuntimeInfo sri = new ServiceRuntimeInfo();
+                       sri.setName(rt.getName());
+                       sri.setDescription(rt.getDescription());
+                       rts.add(sri);
+               }
+               ServiceRuntimesInfo result = new ServiceRuntimesInfo();
+               result.setRuntimes(rts);
+               return result;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkDiscoveryServiceProxy.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkDiscoveryServiceProxy.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkDiscoveryServiceProxy.java
new file mode 100755
index 0000000..6dc1fd0
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkDiscoveryServiceProxy.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.text.MessageFormat;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import 
org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.spark.SparkServiceExecutor;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint;
+import org.apache.atlas.odf.core.Utils;
+
+/**
+ * Proxy for calling any type of Spark discovery services.
+ * 
+ *
+ */
+
+public class SparkDiscoveryServiceProxy implements SyncDiscoveryService {
+       Logger logger = 
Logger.getLogger(SparkDiscoveryServiceProxy.class.getName());
+
+       protected MetadataStore metadataStore;
+       protected AnnotationStore annotationStore;
+       protected ExecutorService executorService;
+       private DiscoveryServiceProperties dsri;
+
+       public SparkDiscoveryServiceProxy(DiscoveryServiceProperties dsri) {
+               this.dsri = dsri;
+       }
+
+       @Override
+       public void setExecutorService(ExecutorService executorService) {
+               this.executorService = executorService;
+       }
+
+       @Override
+       public void setMetadataStore(MetadataStore metadataStore) {
+               this.metadataStore = metadataStore;
+       }
+
+       @Override
+       public DataSetCheckResult checkDataSet(DataSetContainer 
dataSetContainer) {
+               DataSetCheckResult checkResult = new DataSetCheckResult();
+               
checkResult.setDataAccess(DataSetCheckResult.DataAccess.NotPossible);
+               try {
+                       SparkServiceExecutor executor = new 
ODFInternalFactory().create(SparkServiceExecutor.class);
+                       checkResult = executor.checkDataSet(this.dsri, 
dataSetContainer);
+               } catch (Exception e) {
+                       logger.log(Level.WARNING,"Error running discovery 
service.", e);
+                       checkResult.setDetails(Utils.getExceptionAsString(e));
+               }
+               return checkResult;
+       }
+
+       @Override
+       public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest 
request) {
+               logger.log(Level.INFO,MessageFormat.format("Starting Spark 
discovery service ''{0}'', id {1}.", new Object[]{ dsri.getName(), dsri.getId() 
}));
+               DiscoveryServiceSyncResponse response = new 
DiscoveryServiceSyncResponse();
+               DiscoveryServiceSparkEndpoint endpoint;
+               try {
+                       endpoint = JSONUtils.convert(dsri.getEndpoint(),  
DiscoveryServiceSparkEndpoint.class);
+               } catch (JSONException e1) {
+                       throw new RuntimeException(e1);
+               }
+               if ((endpoint.getJar() == null) || 
(endpoint.getJar().isEmpty())) {
+                       response.setDetails("No jar file  was provided that 
implements the Spark application.");
+               } else try {
+                       SparkServiceExecutor executor = new 
ODFInternalFactory().create(SparkServiceExecutor.class);
+                       response = executor.runAnalysis(this.dsri, request);
+                       logger.log(Level.FINER, "Spark discovery service 
response: " + response.toString());
+                       logger.log(Level.INFO,"Spark discover service 
finished.");
+                       return response;
+               } catch (Exception e) {
+                       logger.log(Level.WARNING,"Error running Spark 
application: ", e);
+                       response.setDetails(Utils.getExceptionAsString(e));
+               }
+               
response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+               return response;
+       }
+
+       @Override
+       public void setAnnotationStore(AnnotationStore annotationStore) {
+               this.annotationStore = annotationStore;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkServiceRuntime.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkServiceRuntime.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkServiceRuntime.java
new file mode 100755
index 0000000..91056b3
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/SparkServiceRuntime.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class SparkServiceRuntime implements ServiceRuntime {
+
+       public static final String SPARK_RUNTIME_NAME = "Spark";
+       
+       @Override
+       public String getName() {
+               return SPARK_RUNTIME_NAME;
+       }
+
+       @Override
+       public long getWaitTimeUntilAvailable() {
+               return 0;
+       }
+
+       @Override
+       public DiscoveryService 
createDiscoveryServiceProxy(DiscoveryServiceProperties props) {
+               return new SparkDiscoveryServiceProxy(props);
+       }
+
+       @Override
+       public String getDescription() {
+               return "The default Spark runtime";
+       }
+
+       @Override
+       public void validate(DiscoveryServiceProperties props) throws 
ValidationException {
+               try {
+                       JSONUtils.convert(props.getEndpoint(),  
DiscoveryServiceSparkEndpoint.class);
+               } catch (JSONException e1) {
+                       throw new ValidationException("Endpoint definition for 
Spark service is not correct: " + Utils.getExceptionAsString(e1));
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/StatusQueueEntry.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/StatusQueueEntry.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/StatusQueueEntry.java
new file mode 100755
index 0000000..206a6d0
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/StatusQueueEntry.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+
+// JSON
+public class StatusQueueEntry {
+
+       private Annotation annotation;
+       private AnalysisRequestTracker analysisRequestTracker;
+
+       public Annotation getAnnotation() {
+               return annotation;
+       }
+
+       public void setAnnotation(Annotation annotation) {
+               this.annotation = annotation;
+       }
+
+       public AnalysisRequestTracker getAnalysisRequestTracker() {
+               return analysisRequestTracker;
+       }
+
+       public void setAnalysisRequestTracker(AnalysisRequestTracker 
analysisRequestTracker) {
+               this.analysisRequestTracker = analysisRequestTracker;
+       }
+
+       
+       public static String getRequestId(StatusQueueEntry seq) {
+               if (seq.getAnnotation() != null) {
+                       return seq.getAnnotation().getAnalysisRun();
+               } else if (seq.getAnalysisRequestTracker() != null) {
+                       return 
seq.getAnalysisRequestTracker().getRequest().getId();
+               }
+               return null;
+       }
+
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ThreadManager.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ThreadManager.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ThreadManager.java
new file mode 100755
index 0000000..33dba10
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ThreadManager.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.atlas.odf.api.engine.ThreadStatus;
+
/**
 * Manages ODF "unmanaged" background threads: starting them, tracking their
 * state, and shutting them down.
 */
public interface ThreadManager {

	/**
	 * Waits until the given threads report ready, up to the given limit.
	 *
	 * @param waitingLimitMs maximum time to wait, in milliseconds
	 * @param startedThreads startup results of the threads to wait for
	 * @throws TimeoutException if the threads do not become ready within the limit
	 */
	void waitForThreadsToBeReady(long waitingLimitMs, List<ThreadStartupResult> startedThreads) throws TimeoutException;

	/**
	 * Starts (or reuses — see {@link ThreadStartupResult#isNewThreadCreated()})
	 * a named thread running the given runnable.
	 *
	 * @param name unique name identifying the thread
	 * @param runnable the work to run
	 * @return handle describing the started (or already running) thread
	 */
	ThreadStartupResult startUnmanagedThread(String name, ODFRunnable runnable);

	/**
	 * @param name the thread name used at startup
	 * @return the state of the named thread
	 */
	ThreadStatus.ThreadState getStateOfUnmanagedThread(String name);

	/**
	 * @param name the thread name used at startup
	 * @return the runnable executing on the named thread
	 */
	ODFRunnable getRunnable(String name);

	/**
	 * @param executorService the executor service on which threads are scheduled
	 */
	void setExecutorService(ExecutorService executorService);

	/** Shuts down all threads started through this manager. */
	void shutdownAllUnmanagedThreads();

	/**
	 * Shuts down the threads with the given names.
	 *
	 * @param names the thread names used at startup
	 */
	void shutdownThreads(List<String> names);

	/**
	 * @return the number of threads currently running
	 */
	int getNumberOfRunningThreads();

	/**
	 * @return status information for all managed threads
	 */
	List<ThreadStatus> getThreadManagerStatus();

	/**
	 * Result of {@link #startUnmanagedThread(String, ODFRunnable)}: identifies the
	 * thread, tells whether a new thread was actually created, and reports readiness.
	 */
	public abstract class ThreadStartupResult {

		private String threadId;
		private boolean newThreadCreated;

		public ThreadStartupResult(String id) {
			this.threadId = id;
		}

		public String getThreadId() {
			return threadId;
		}

		/**
		 * @return true if a new thread was created; false if an existing one was reused
		 */
		public boolean isNewThreadCreated() {
			return newThreadCreated;
		}

		public void setNewThreadCreated(boolean newThreadCreated) {
			this.newThreadCreated = newThreadCreated;
		}

		/**
		 * @return true if the underlying runnable is ready (see {@link ODFRunnable#isReady()})
		 */
		public abstract boolean isReady();

	}


}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TrackerUtil.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TrackerUtil.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TrackerUtil.java
new file mode 100755
index 0000000..f1c7704
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TrackerUtil.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+
+public class TrackerUtil {
+       
+       /**
+        * @param tracker
+        * @return true if the first analysis of the tracker has not yet been 
started
+        */
+       public static boolean isAnalysisWaiting(AnalysisRequestTracker tracker) 
{
+               return tracker.getNextDiscoveryServiceRequest() == 0 && 
(tracker.getStatus() == STATUS.IN_DISCOVERY_SERVICE_QUEUE || 
tracker.getStatus() == STATUS.INITIALIZED); // || tracker.getStatus() == 
STATUS.DISCOVERY_SERVICE_RUNNING);
+       }
+       
+       public static boolean isCancellable(AnalysisRequestTracker tracker)  {
+               return (tracker.getStatus() == 
STATUS.IN_DISCOVERY_SERVICE_QUEUE || tracker.getStatus() == STATUS.INITIALIZED 
|| tracker.getStatus() == STATUS.DISCOVERY_SERVICE_RUNNING);
+       }
+
+       public static DiscoveryServiceRequest 
getCurrentDiscoveryServiceStartRequest(AnalysisRequestTracker tracker) {
+               int i = tracker.getNextDiscoveryServiceRequest();
+               List<DiscoveryServiceRequest> requests = 
tracker.getDiscoveryServiceRequests();
+               if (i >= 0 && i < requests.size()) {
+                       return requests.get(i);
+               }
+               return null;
+       }
+
+       public static DiscoveryServiceResponse 
getCurrentDiscoveryServiceStartResponse(AnalysisRequestTracker tracker) {
+               int i = tracker.getNextDiscoveryServiceRequest();
+               List<DiscoveryServiceResponse> responses = 
tracker.getDiscoveryServiceResponses();
+               if (responses == null || responses.isEmpty()) {
+                       return null;
+               }
+               if (i >= 0 && i < responses.size()) {
+                       return responses.get(i);
+               }
+               return null;
+       }
+
+       public static void moveToNextDiscoveryService(AnalysisRequestTracker 
tracker) {
+               int i = tracker.getNextDiscoveryServiceRequest();
+               List<DiscoveryServiceRequest> requests = 
tracker.getDiscoveryServiceRequests();
+               if (i >= 0 && i < requests.size()) {
+                       tracker.setNextDiscoveryServiceRequest(i+1);
+               }
+       }
+
+       public static void 
addDiscoveryServiceStartResponse(AnalysisRequestTracker tracker, 
DiscoveryServiceResponse response) {
+               List<DiscoveryServiceResponse> l = 
tracker.getDiscoveryServiceResponses();
+               if (l == null) {
+                       l = new ArrayList<DiscoveryServiceResponse>();
+                       tracker.setDiscoveryServiceResponses(l);
+               }
+               l.add(response);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionAsyncDiscoveryServiceProxy.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionAsyncDiscoveryServiceProxy.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionAsyncDiscoveryServiceProxy.java
new file mode 100755
index 0000000..1a3de04
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionAsyncDiscoveryServiceProxy.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
+import 
org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncRunStatus;
+import 
org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+
+public class TransactionAsyncDiscoveryServiceProxy implements 
AsyncDiscoveryService {
+
+       private AsyncDiscoveryService wrappedService;
+
+       public TransactionAsyncDiscoveryServiceProxy(AsyncDiscoveryService 
wrappedService) {
+               this.wrappedService = wrappedService;
+       }
+
+       public DiscoveryServiceAsyncStartResponse startAnalysis(final 
DiscoveryServiceRequest request) {
+               TransactionContextExecutor transactionContextExecutor = new 
ODFInternalFactory().create(TransactionContextExecutor.class);
+               try {
+                       return (DiscoveryServiceAsyncStartResponse) 
transactionContextExecutor.runInTransactionContext(new Callable<Object>() {
+
+                               @Override
+                               public Object call() throws Exception {
+                                       return 
wrappedService.startAnalysis(request);
+                               }
+                       });
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+       }
+
+       public DiscoveryServiceAsyncRunStatus getStatus(final String runId) {
+               TransactionContextExecutor transactionContextExecutor = new 
ODFInternalFactory().create(TransactionContextExecutor.class);
+               try {
+                       return (DiscoveryServiceAsyncRunStatus) 
transactionContextExecutor.runInTransactionContext(new Callable<Object>() {
+
+                               @Override
+                               public Object call() throws Exception {
+                                       return wrappedService.getStatus(runId);
+                               }
+                       });
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+       }
+
+       public void setExecutorService(ExecutorService executorService) {
+               wrappedService.setExecutorService(executorService);
+       }
+
+       public void setMetadataStore(MetadataStore metadataStore) {
+               wrappedService.setMetadataStore(metadataStore);
+       }
+
+       public void setAnnotationStore(AnnotationStore annotationStore) {
+               wrappedService.setAnnotationStore(annotationStore);
+       }
+
+       public DataSetCheckResult checkDataSet(final DataSetContainer 
dataSetContainer) {
+               TransactionContextExecutor transactionContextExecutor = new 
ODFInternalFactory().create(TransactionContextExecutor.class);
+               try {
+                       return (DataSetCheckResult) 
transactionContextExecutor.runInTransactionContext(new Callable<Object>() {
+
+                               @Override
+                               public Object call() throws Exception {
+                                       return 
wrappedService.checkDataSet(dataSetContainer);
+                               }
+                       });
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionContextExecutor.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionContextExecutor.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionContextExecutor.java
new file mode 100755
index 0000000..6c17686
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionContextExecutor.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.Callable;
+
/**
 * Use this interface in the core framework whenever code that runs on an unmanaged
 * thread (typically a Kafka consumer thread) needs to access the metadata repository.
 * Implementations ensure that the code is executed in the correct environment-specific
 * context (transactions etc.).
 */
public interface TransactionContextExecutor {
	
	/**
	 * Run a generic callable in a transaction context. The result is deliberately typed
	 * as {@code Object} (rather than a generic type parameter) because some of the
	 * underlying infrastructures might not be able to support a parameterized variant;
	 * callers cast the result themselves.
	 *
	 * @param callable work to execute inside the transaction context
	 * @return whatever the callable returns
	 * @throws Exception any exception thrown by the callable or by context setup
	 */
	Object runInTransactionContext(Callable<Object> callable) throws Exception;
	
}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionSyncDiscoveryServiceProxy.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionSyncDiscoveryServiceProxy.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionSyncDiscoveryServiceProxy.java
new file mode 100755
index 0000000..ec96e96
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/TransactionSyncDiscoveryServiceProxy.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import 
org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+
+public class TransactionSyncDiscoveryServiceProxy implements 
SyncDiscoveryService {
+
+       private SyncDiscoveryService wrappedService;
+
+       public TransactionSyncDiscoveryServiceProxy(SyncDiscoveryService 
wrappedService) {
+               this.wrappedService = wrappedService;
+       }
+
+       public DiscoveryServiceSyncResponse runAnalysis(final 
DiscoveryServiceRequest request) {
+               TransactionContextExecutor transactionContextExecutor = new 
ODFInternalFactory().create(TransactionContextExecutor.class);
+               try {
+                       return (DiscoveryServiceSyncResponse) 
transactionContextExecutor.runInTransactionContext(new Callable<Object>() {
+
+                               @Override
+                               public Object call() throws Exception {
+                                       return 
wrappedService.runAnalysis(request);
+                               }
+                       });
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+       }
+
+       public void setExecutorService(ExecutorService executorService) {
+               wrappedService.setExecutorService(executorService);
+       }
+
+       public void setMetadataStore(MetadataStore metadataStore) {
+               wrappedService.setMetadataStore(metadataStore);
+       }
+
+       public void setAnnotationStore(AnnotationStore annotationStore) {
+               wrappedService.setAnnotationStore(annotationStore);
+       }
+
+       public DataSetCheckResult checkDataSet(final DataSetContainer 
dataSetContainer) {
+               TransactionContextExecutor transactionContextExecutor = new 
ODFInternalFactory().create(TransactionContextExecutor.class);
+               try {
+                       return (DataSetCheckResult) 
transactionContextExecutor.runInTransactionContext(new Callable<Object>() {
+
+                               @Override
+                               public Object call() throws Exception {
+                                       return 
wrappedService.checkDataSet(dataSetContainer);
+                               }
+                       });
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceManagerImpl.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceManagerImpl.java
new file mode 100755
index 0000000..e7cbc44
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceManagerImpl.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.discoveryservice;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import 
org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRuntimeStatistics;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceStatus;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+import org.apache.atlas.odf.api.discoveryservice.ServiceStatusCount;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.core.configuration.ConfigManager;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+
+/**
+ *
+ * External Java API for creating and managing discovery services
+ *
+ */
+public class DiscoveryServiceManagerImpl implements DiscoveryServiceManager {
+       private Logger logger = 
Logger.getLogger(DiscoveryServiceManagerImpl.class.getName());
+       public ConfigManager configManager;
+
+       public DiscoveryServiceManagerImpl() {
+               configManager = new 
ODFInternalFactory().create(ConfigManager.class);
+       }
+
+       /**
+        * Retrieve list of discovery services registered in ODF
+        * @return List of registered ODF discovery services
+        */
+       public List<DiscoveryServiceProperties> 
getDiscoveryServicesProperties() {
+               logger.entering(DiscoveryServiceManager.class.getName(), 
"getDiscoveryServicesProperties");
+               List<DiscoveryServiceProperties> dsProperties = 
configManager.getConfigContainer().getRegisteredServices();
+               return dsProperties;
+       };
+
+       /**
+        * Register a new service in ODF
+        * @param dsProperties Properties of the discovery service to register
+        * @throws ValidationException Validation of a property failed
+        */
+       public void createDiscoveryService(DiscoveryServiceProperties 
dsProperties) throws ValidationException {
+               logger.entering(DiscoveryServiceManager.class.getName(), 
"createDiscoveryService");
+               ConfigContainer update = new ConfigContainer();
+               List<DiscoveryServiceProperties> registeredServices = 
configManager.getConfigContainer().getRegisteredServices();
+               
registeredServices.addAll(Collections.singletonList(dsProperties));
+               update.setRegisteredServices(registeredServices);
+               configManager.updateConfigContainer(update);
+
+
+       };
+
+       /**
+        * Update configuration of an ODF discovery service
+        * @param dsProperties Properties of the discovery service to update
+        */
+       public void replaceDiscoveryService(DiscoveryServiceProperties 
dsProperties) throws ServiceNotFoundException, ValidationException {
+               logger.entering(DiscoveryServiceManager.class.getName(), 
"updateDiscoveryService");
+               String serviceId = dsProperties.getId();
+               deleteDiscoveryService(serviceId);
+               createDiscoveryService(dsProperties);
+       };
+
+       /**
+        * Remove a registered service from ODF
+        * @param serviceId Discovery service ID
+        */
+       public void deleteDiscoveryService(String serviceId) throws 
ServiceNotFoundException, ValidationException {
+               logger.entering(DiscoveryServiceManager.class.getName(), 
"deleteDiscoveryService");
+               ConfigContainer cc = configManager.getConfigContainer();
+               Iterator<DiscoveryServiceProperties> iterator = 
cc.getRegisteredServices().iterator();
+               boolean serviceFound = false;
+               while (iterator.hasNext()) {
+                       if (iterator.next().getId().equals(serviceId)) {
+                               iterator.remove();
+                               serviceFound = true;
+                       }
+               }
+               if (!serviceFound) {
+                       throw new ServiceNotFoundException(serviceId);
+               } else {
+                       configManager.updateConfigContainer(cc);
+               }
+       };
+
+       /**
+        * Retrieve current configuration of a discovery services registered in 
ODF
+        * @param serviceId Discovery Service ID
+        * @return Properties of the service with this ID
+        * @throws ServiceNotFoundException A service with this ID is not 
registered
+        */
+       public DiscoveryServiceProperties getDiscoveryServiceProperties(String 
serviceId) throws ServiceNotFoundException {
+               logger.entering(DiscoveryServiceManager.class.getName(), 
"getDiscoveryServiceProperties");
+               DiscoveryServiceProperties serviceFound = null;
+               List<DiscoveryServiceProperties> registeredServices;
+               registeredServices = 
configManager.getConfigContainer().getRegisteredServices();
+               for (DiscoveryServiceProperties service : registeredServices) {
+                       if (service.getId().equals(serviceId)) {
+                               serviceFound = service;
+                               break;
+                       }
+               }
+               if (serviceFound == null) {
+                       throw new ServiceNotFoundException(serviceId);
+               }
+               return serviceFound;
+       };
+
+       /**
+        * Retrieve status overview of all discovery services registered in ODF
+        * @return List of status count maps for all discovery services
+        */
+       public List<ServiceStatusCount> getDiscoveryServiceStatusOverview() {
+               DiscoveryServiceStatistics stats = new 
DiscoveryServiceStatistics(new 
ODFInternalFactory().create(AnalysisRequestTrackerStore.class).getRecentTrackers(0,-1));
+               return stats.getStatusCountPerService();
+       }
+
+       /**
+        * Retrieve status of a specific discovery service. Returns null if no 
service info can be obtained
+        * @param serviceId Discovery Service ID
+        * @return Status of the service with this ID
+        */
+       public DiscoveryServiceStatus getDiscoveryServiceStatus(String 
serviceId) throws ServiceNotFoundException {
+               logger.entering(DiscoveryServiceManager.class.getName(), 
"getDiscoveryServiceStatus");
+
+               DiscoveryServiceStatus dsStatus = null;
+               ControlCenter cc = new 
ODFInternalFactory().create(ControlCenter.class);
+               DiscoveryService ds = cc.getDiscoveryServiceProxy(serviceId, 
null);
+               if (ds == null) {
+                       throw new ServiceNotFoundException(serviceId);
+               }
+               dsStatus = new DiscoveryServiceStatus();
+               dsStatus.setStatus(DiscoveryServiceStatus.Status.OK);
+               dsStatus.setMessage(MessageFormat.format("Discovery service 
''{0}'' status is OK", serviceId));
+               ServiceStatusCount serviceStatus = null;
+               List<ServiceStatusCount> statusCounts = 
getDiscoveryServiceStatusOverview();
+               for (ServiceStatusCount cnt : statusCounts) {
+                       if (cnt.getId().equals(serviceId)) {
+                               serviceStatus = cnt;
+                               break;
+                       }
+               }
+               if (serviceStatus != null) {
+                       dsStatus.setStatusCount(serviceStatus);
+               }
+               return dsStatus;
+       };
+
+       /**
+        * Retrieve runtime statistics of a specific discovery service
+        * @param serviceId Discovery Service ID
+        * @return Runtime statistics of the service with this ID
+        */
+       public DiscoveryServiceRuntimeStatistics 
getDiscoveryServiceRuntimeStatistics(String serviceId) throws 
ServiceNotFoundException {
+               logger.entering(DiscoveryServiceManager.class.getName(), 
"getDiscoveryServiceRuntimeStatistics");
+               DiscoveryServiceRuntimeStatistics dsrs = new 
DiscoveryServiceRuntimeStatistics();
+               dsrs.setAverageProcessingTimePerItemInMillis(0);   // TODO: 
implement
+               return dsrs;
+       };
+
+       /**
+        * Delete runtime statistics of a specific discovery service
+        * @param serviceId Discovery Service ID
+        */
+       public void deleteDiscoveryServiceRuntimeStatistics(String serviceId) 
throws ServiceNotFoundException {
+               logger.entering(DiscoveryServiceManager.class.getName(), 
"deleteDiscoveryServiceRuntimeStatistics");
+               // TODO: implement
+       };
+
+       /**
+        * Retrieve picture representing a discovery service
+        * @param serviceId Discovery Service ID
+        * @return Input stream for image
+        */
+       public InputStream getDiscoveryServiceImage(String serviceId) throws 
ServiceNotFoundException {
+               logger.entering(DiscoveryServiceManager.class.getName(), 
"getDiscoveryServiceImage");
+               final String defaultImageDir = "org/apache/atlas/odf/images";
+
+               String imgUrl = null;
+               for (DiscoveryServiceProperties info : 
configManager.getConfigContainer().getRegisteredServices()) {
+                       if (info.getId().equals(serviceId)) {
+                               imgUrl = info.getIconUrl();
+                               break;
+                       }
+               }
+
+               ClassLoader cl = this.getClass().getClassLoader();
+               InputStream is = null;
+               if (imgUrl != null) {
+                       is = cl.getResourceAsStream("META-INF/odf/" + imgUrl);
+                       if (is == null) {
+                               is = cl.getResourceAsStream(defaultImageDir + 
"/" + imgUrl);
+                               if (is == null) {
+                                       try {
+                                               is = new 
URL(imgUrl).openStream();
+                                       } catch (MalformedURLException e) {
+                                               logger.log(Level.WARNING, "The 
specified image url {0} for service {1} is invalid!", new String[] { imgUrl, 
serviceId });
+                                       } catch (IOException e) {
+                                               logger.log(Level.WARNING, "The 
specified image url {0} for service {1} could not be accessed!", new String[] { 
imgUrl, serviceId });
+                                       }
+                               }
+                       }
+               }
+               if (imgUrl == null || is == null) {
+                       //TODO is this correct? maybe we should use a single 
default image instead of a random one
+                       try {
+                               is = cl.getResourceAsStream(defaultImageDir);
+                               if (is != null) {
+                                       InputStreamReader r = new 
InputStreamReader(is);
+                                       BufferedReader br = new 
BufferedReader(r);
+                                       List<String> images = new ArrayList<>();
+                                       String line = null;
+                                       while ((line = br.readLine()) != null) {
+                                               images.add(line);
+                                       }
+                                       // return random image
+                                       int ix = Math.abs(serviceId.hashCode()) 
% images.size();
+                                       is = 
cl.getResourceAsStream(defaultImageDir + "/" + images.get(ix));
+                               }
+                       } catch (IOException exc) {
+                               logger.log(Level.WARNING, "Exception occurred 
while retrieving random image, ignoring it", exc);
+                               is = null;
+                       }
+               }
+               return is;
+       };
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceStatistics.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceStatistics.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceStatistics.java
new file mode 100755
index 0000000..6be0e5a
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/discoveryservice/DiscoveryServiceStatistics.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.discoveryservice;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.ServiceStatusCount;
+
+public class DiscoveryServiceStatistics {
+
+       private List<AnalysisRequestTracker> requests = new 
ArrayList<AnalysisRequestTracker>();
+
+       public DiscoveryServiceStatistics(List<AnalysisRequestTracker> 
requests) {
+               this.requests = requests;
+       }
+
+       public List<ServiceStatusCount> getStatusCountPerService() {
+               List<ServiceStatusCount> result = new 
ArrayList<ServiceStatusCount>();
+
+               Map<String, LinkedHashMap<STATUS, Integer>> statusMap = new 
HashMap<String, LinkedHashMap<STATUS, Integer>>();
+
+               for (AnalysisRequestTracker tracker : requests) {
+                       int maxDiscoveryServiceRequest = 
(tracker.getNextDiscoveryServiceRequest() == 0 ? 1 : 
tracker.getNextDiscoveryServiceRequest());
+                       for (int no = 0; no < maxDiscoveryServiceRequest; no++) 
{
+                               STATUS cntStatus = tracker.getStatus();
+
+                               //No parallel requests are possible atm -> all 
requests leading to current one must be finished
+                               if (no < maxDiscoveryServiceRequest - 1) {
+                                       cntStatus = STATUS.FINISHED;
+                               }
+
+                               DiscoveryServiceRequest req = 
tracker.getDiscoveryServiceRequests().get(no);
+                               LinkedHashMap<STATUS, Integer> cntMap = 
statusMap.get(req.getDiscoveryServiceId());
+                               if (cntMap == null) {
+                                       cntMap = new LinkedHashMap<STATUS, 
Integer>();
+                                       //add 0 default values
+                                       for (STATUS status : STATUS.values()) {
+                                               cntMap.put(status, 0);
+                                       }
+                               }
+                               Integer val = cntMap.get(cntStatus);
+                               val++;
+                               cntMap.put(cntStatus, val);
+                               statusMap.put(req.getDiscoveryServiceId(), 
cntMap);
+                       }
+               }
+
+               for (String key : statusMap.keySet()) {
+                       ServiceStatusCount cnt = new ServiceStatusCount();
+                       cnt.setId(key);
+                       for (DiscoveryServiceProperties info : new 
ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties())
 {
+                               if (info.getId().equals(key)) {
+                                       cnt.setName(info.getName());
+                                       break;
+                               }
+                       }
+                       cnt.setStatusCountMap(statusMap.get(key));
+                       result.add(cnt);
+               }
+
+               return result;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/engine/EngineManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/engine/EngineManagerImpl.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/engine/EngineManagerImpl.java
new file mode 100755
index 0000000..d09297a
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/engine/EngineManagerImpl.java
@@ -0,0 +1,221 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.engine;
+
+import java.io.InputStream;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.engine.EngineManager;
+import org.apache.atlas.odf.api.engine.MessagingStatus;
+import org.apache.atlas.odf.api.engine.ODFEngineOptions;
+import org.apache.atlas.odf.api.engine.ODFStatus;
+import org.apache.atlas.odf.api.engine.ODFVersion;
+import org.apache.atlas.odf.api.engine.ServiceRuntimesInfo;
+import org.apache.atlas.odf.api.engine.SystemHealth;
+import org.apache.atlas.odf.api.engine.ThreadStatus;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.core.ODFInitializer;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.ODFUtils;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage.Type;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+
+/**
+*
+* External Java API for managing and controlling the ODF engine
+*
+*/
+public class EngineManagerImpl implements EngineManager {
+
+       private Logger logger = 
Logger.getLogger(EngineManagerImpl.class.getName());
+
+       public EngineManagerImpl() {
+       }
+
+       /**
+        * Checks the health status of ODF
+        *
+        * @return Health status of the ODF engine
+        */
+       public SystemHealth checkHealthStatus() {
+               SystemHealth health = new SystemHealth();
+               try {
+                       AnalysisRequest dummyRequest = new AnalysisRequest();
+                       String dataSetID = 
ControlCenter.HEALTH_TEST_DATA_SET_ID_PREFIX + UUID.randomUUID().toString();
+                       MetaDataObjectReference dataSetRef = new 
MetaDataObjectReference();
+                       dataSetRef.setId(dataSetID);
+                       
dummyRequest.setDataSets(Collections.singletonList(dataSetRef));
+                       List<String> discoveryServiceSequence = new 
ArrayList<String>();
+                       
discoveryServiceSequence.add(ControlCenter.HEALTH_TEST_DISCOVERY_SERVICE_ID);
+                       
dummyRequest.setDiscoveryServiceSequence(discoveryServiceSequence);
+
+                       AnalysisManager analysisManager = new 
ODFFactory().create().getAnalysisManager();
+                       AnalysisResponse resp = 
analysisManager.runAnalysis(dummyRequest);
+                       String reqId = resp.getId();
+                       AnalysisRequestStatus status = null;
+                       final int maxNumberOfTimesToPoll = 500;
+                       int count = 0;
+                       int msToSleepBetweenPolls = 20;
+                       boolean continuePolling = false;
+                       do {
+                               status = 
analysisManager.getAnalysisRequestStatus(reqId);
+                               continuePolling = (status.getState() == 
AnalysisRequestStatus.State.QUEUED || status.getState() == 
AnalysisRequestStatus.State.ACTIVE || status.getState() == 
AnalysisRequestStatus.State.NOT_FOUND) && count < maxNumberOfTimesToPoll;
+                               if (continuePolling) {
+                                       count++;
+                                       Thread.sleep(msToSleepBetweenPolls);
+                               }
+                       } while (continuePolling);
+                       logger.log(Level.INFO, "Health check request ''{3}'' 
has status ''{0}'', time spent: {2}ms details ''{1}''", new Object[] { 
status.getState(), status.getDetails(),
+                                       count * msToSleepBetweenPolls, reqId });
+                       health.getMessages().add(MessageFormat.format("Details 
message: {0}", status.getDetails()));
+                       if (count >= maxNumberOfTimesToPoll) {
+                               health.setStatus( 
SystemHealth.HealthStatus.WARNING);
+                               String msg = MessageFormat.format("Health test 
request could not be processed in time ({0}ms)", (maxNumberOfTimesToPoll * 
msToSleepBetweenPolls));
+                               logger.log(Level.INFO, msg);
+                               health.getMessages().add(msg);
+                       } else {
+                               switch (status.getState()) {
+                               case NOT_FOUND:
+                                       
health.setStatus(SystemHealth.HealthStatus.ERROR);
+                                       
health.getMessages().add(MessageFormat.format("Request ID ''{0}'' got lost", 
reqId));
+                                       break;
+                               case ERROR:
+                                       
health.setStatus(SystemHealth.HealthStatus.ERROR);
+                                       break;
+                               case FINISHED:
+                                       
health.setStatus(SystemHealth.HealthStatus.OK);
+                                       break;
+                               default:
+                                       
health.setStatus(SystemHealth.HealthStatus.ERROR);
+                               }
+                       }
+               } catch (Exception exc) {
+                       logger.log(Level.WARNING, "An unknown error occurred", 
exc);
+                       health.setStatus(SystemHealth.HealthStatus.ERROR);
+                       
health.getMessages().add(Utils.getExceptionAsString(exc));
+               }
+               return health;
+       }
+
+       /**
+        * Returns the status of the ODF thread manager
+        *
+        * @return Status of all threads making up the ODF thread manager
+        */
+       public List<ThreadStatus> getThreadManagerStatus() {
+               ThreadManager tm = new 
ODFInternalFactory().create(ThreadManager.class);
+               return tm.getThreadManagerStatus();
+       }
+
+       /**
+        * Returns the status of the ODF messaging subsystem
+        *
+        * @return Status of the ODF messaging subsystem
+        */
+       public MessagingStatus getMessagingStatus() {
+               return new 
ODFInternalFactory().create(DiscoveryServiceQueueManager.class).getMessagingStatus();
+       }
+
+       /**
+        * Returns the status of the messaging subsystem and the internal 
thread manager
+        *
+        * @return Combined status of the messaging subsystem and the internal 
thread manager
+        */
+       public ODFStatus getStatus() {
+               ODFStatus status = new ODFStatus();
+               status.setMessagingStatus(this.getMessagingStatus());
+               status.setThreadManagerStatus(this.getThreadManagerStatus());
+               return status;
+       }
+
+       /**
+        * Returns the current ODF version
+        *
+        * @return ODF version identifier
+        */
+       public ODFVersion getVersion() {
+               InputStream is = 
ODFUtils.class.getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/odfversion.txt");
+               ODFVersion version = new ODFVersion();
+               if (is == null) {
+                       version.setVersion("NOTFOUND");
+               } else {
+                       version.setVersion(Utils.getInputStreamAsString(is, 
"UTF-8").trim());
+               }
+               return version;
+       }
+
+       /**
+        * Shuts down the ODF engine, purges all scheduled analysis requests 
from the queues, and cancels all running analysis requests.
+        * This means that all running jobs will be cancelled or their results 
will not be reported back.
+        * (for debugging purposes only)
+        *
+        * @param options Option for immediately restarting the engine after 
shutdown (default is not to restart immediately but only when needed)
+        */
+       public void shutdown(ODFEngineOptions options) {
+               long currentTime = System.currentTimeMillis();
+
+               ControlCenter controlCenter = new 
ODFInternalFactory().create(ControlCenter.class);
+               AdminMessage shutDownMessage = new AdminMessage();
+               Type t = Type.SHUTDOWN;
+               if (options.isRestart()) {
+                       t = Type.RESTART;
+               }
+               shutDownMessage.setAdminMessageType(t);
+               String detailMsg = MessageFormat.format("Shutdown was requested 
on {0} via ODF API", new Object[] { new Date() });
+               shutDownMessage.setDetails(detailMsg);
+               logger.log(Level.INFO, detailMsg);
+               
controlCenter.getQueueManager().enqueueInAdminQueue(shutDownMessage);
+               int maxPolls = 60;
+               int counter = 0;
+               int timeBetweenPollsMs = 1000;
+               while (counter < maxPolls && 
ODFInitializer.getLastStopTimestamp() <= currentTime) {
+                       try {
+                               Thread.sleep(timeBetweenPollsMs);
+                       } catch (InterruptedException e) {
+                               e.printStackTrace();
+                       }
+                       counter++;
+               }
+               long timeWaited = ((counter * timeBetweenPollsMs) / 1000);
+               logger.log(Level.INFO, "Waited for {0} seconds for shutdown", 
timeWaited);
+               if (counter >= maxPolls) {
+                       logger.log(Level.WARNING, "Waited for shutdown too 
long. Continuing." );
+               } else {
+                       logger.log(Level.INFO, "Shutdown issued successfully");
+               }
+       }
+
+       @Override
+       public ServiceRuntimesInfo getRuntimesInfo() {
+               return 
ServiceRuntimes.getRuntimesInfo(ServiceRuntimes.getAllRuntimes());
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DefaultMessageEncryption.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DefaultMessageEncryption.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DefaultMessageEncryption.java
new file mode 100755
index 0000000..9177556
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DefaultMessageEncryption.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging;
+
+/**
+ * Default encryption: no encryption
+ * 
+ */
+public class DefaultMessageEncryption implements MessageEncryption {
+       
+       @Override
+       public String encrypt(String message) {
+               return message;
+       }
+
+       @Override
+       public String decrypt(String message) {
+               return message;
+       }
+
+
+       /*
+       // this used to be our default encryption. Leaving it in here for 
reference.
+       @Override
+       public String encrypt(String message) {
+               try {
+                       return 
DatatypeConverter.printBase64Binary(message.getBytes("UTF-8"));
+               } catch (UnsupportedEncodingException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       @Override
+       public String decrypt(String message)  {
+               try {
+                       return new 
String(DatatypeConverter.parseBase64Binary(message), "UTF-8");
+               } catch (UnsupportedEncodingException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+       */
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DiscoveryServiceQueueManager.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DiscoveryServiceQueueManager.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DiscoveryServiceQueueManager.java
new file mode 100755
index 0000000..d2d84dd
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/DiscoveryServiceQueueManager.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging;
+
+import java.util.concurrent.TimeoutException;
+
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.engine.MessagingStatus;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage;
+import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
+
+
+
/**
 * Abstraction over the messaging layer used by the ODF control center.
 * Implementations manage the queues that carry analysis-request trackers,
 * status updates, and administrative messages.
 */
public interface DiscoveryServiceQueueManager {

	// Start the queue manager / underlying messaging infrastructure.
	void start() throws TimeoutException;

	// Stop the queue manager; may time out waiting for shutdown.
	void stop() throws TimeoutException;

	// find the next queue where this tracker should go and put it there
	void enqueue(AnalysisRequestTracker tracker);

	// Put an entry on the status queue.
	void enqueueInStatusQueue(StatusQueueEntry sqe);

	// Put an administrative message (e.g. SHUTDOWN/RESTART, see EngineManagerImpl)
	// on the admin queue.
	void enqueueInAdminQueue(AdminMessage message);

	// Status information about the underlying messaging system.
	MessagingStatus getMessagingStatus();
}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/MessageEncryption.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/MessageEncryption.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/MessageEncryption.java
new file mode 100755
index 0000000..ad1bf28
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/messaging/MessageEncryption.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging;
+
/**
 * Encodes messages before they are put on the queues and decodes them after
 * they are read back. Implementations are expected to round-trip, i.e.
 * {@code decrypt(encrypt(m))} should equal {@code m} (see
 * DefaultMessageEncryption for the no-op default).
 */
public interface MessageEncryption {
	/** Encodes a message for transport. */
	String encrypt(String message);

	/** Decodes a message read from transport. */
	String decrypt(String message);
}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/DefaultMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/DefaultMetadataStore.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/DefaultMetadataStore.java
new file mode 100755
index 0000000..c71ba3c
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/metadata/DefaultMetadataStore.java
@@ -0,0 +1,381 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.metadata;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.AnnotationPropagator;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.StoredMetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.RelationshipAnnotation;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.metadata.DefaultMetadataQueryBuilder;
+import org.apache.atlas.odf.api.metadata.InternalMetaDataUtils;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStoreException;
+import org.apache.atlas.odf.api.metadata.models.ClassificationAnnotation;
+import org.apache.atlas.odf.api.metadata.models.ConnectionInfo;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * In-memory implementation of MetadataStore interface to be used for testing 
as
+ * well as for single-node ODF deployments. Uses static HashMaps for storing 
the
+ * metadata types and objects.
+ * 
+ * 
+ */
+public class DefaultMetadataStore extends WritableMetadataStoreBase implements 
WritableMetadataStore {
	private Logger logger = Logger.getLogger(DefaultMetadataStore.class.getName());

	private static final String METADATA_STORE_ID = "ODF_LOCAL_METADATA_STORE";
	private static final String STORE_PROPERTY_TYPE = "default";
	private static final String STORE_PROPERTY_DESCRIPTION = "ODF local metadata store";

	// Type name -> super type name. Static: shared by all instances of this in-memory store.
	private static HashMap<String, String> typeStore;
	// Object reference id -> stored object. Static: shared by all instances.
	private static HashMap<String, StoredMetaDataObject> objectStore;
	// Per-instance staging area, preserving insertion order; presumably flushed by
	// commit() in the WritableMetadataStoreBase base class — TODO confirm.
	protected LinkedHashMap<String, StoredMetaDataObject> stagedObjects = new LinkedHashMap<String, StoredMetaDataObject>();
	private static boolean isInitialized = false;
	// Guards all access to the shared static stores.
	protected static Object accessLock = new Object();
	// Guards the one-time initialization performed in the constructor.
	static Object initializationLock = new Object();
+
	// The first instance ever constructed initializes (resets) the shared static
	// type and object stores; later instances reuse them.
	public DefaultMetadataStore() {
		synchronized (initializationLock) {
			if (!isInitialized) {
				isInitialized = true;
				this.resetAllData();
			}
		}
	}
+
	// Hook for the base class: this store acts as its own writable metadata store.
	protected WritableMetadataStore getMetadataStore() {
		return this;
	}
+
	// Hook for the base class: lock protecting the shared static stores.
	protected Object getAccessLock() {
		return accessLock;
	}
+
	// Hook for the base class: the shared (static) object store, keyed by reference id.
	protected HashMap<String, StoredMetaDataObject> getObjects() {
		return objectStore;
	}
+
	// Hook for the base class: this instance's staged (not yet committed) objects.
	protected LinkedHashMap<String, StoredMetaDataObject> getStagedObjects() {
		return stagedObjects;
	}
+
	/**
	 * Retrieves connection information for the given information asset,
	 * delegating to WritableMetadataStoreUtils while holding the store lock.
	 */
	@Override
    public ConnectionInfo getConnectionInfo(MetaDataObject informationAsset) {
    	synchronized(accessLock) {
    		return WritableMetadataStoreUtils.getConnectionInfo(this, informationAsset);
    	}
    };
+
	/**
	 * Clears all types and objects from the shared in-memory store and
	 * re-registers the base types.
	 */
	@Override
	public void resetAllData() {
		logger.log(Level.INFO, "Resetting all data in metadata store.");
		synchronized (accessLock) {
			typeStore = new HashMap<String, String>();
			objectStore = new HashMap<String, StoredMetaDataObject>();
			createTypes(WritableMetadataStoreUtils.getBaseTypes());
		}
	}
+
+       @Override
+       public Properties getProperties() {
+               Properties props = new Properties();
+               props.put(MetadataStore.STORE_PROPERTY_DESCRIPTION, 
STORE_PROPERTY_DESCRIPTION);
+               props.put(MetadataStore.STORE_PROPERTY_TYPE, 
STORE_PROPERTY_TYPE);
+               props.put(STORE_PROPERTY_ID, METADATA_STORE_ID);
+               return props;
+       }
+
	/** @return the fixed id of this local in-memory metadata store */
	@Override
	public String getRepositoryId() {
		return METADATA_STORE_ID;
	}
+
	/**
	 * Runs a metadata query against the in-memory store. Only queries built by
	 * DefaultMetadataQueryBuilder are supported, i.e. of the shape
	 * "&lt;DATASET_IDENTIFIER&gt; &lt;typeName&gt; [&lt;condition tokens&gt;...]".
	 * An object matches when its type equals or descends from the requested type
	 * and all condition clauses hold (see isConditionMet).
	 *
	 * @param query query string; must not be null or empty
	 * @return references of all matching objects
	 * @throws MetadataStoreException for null/empty or syntactically invalid queries
	 */
	@Override
	public List<MetaDataObjectReference> search(String query) {
		if ((query == null) || query.isEmpty()) {
			throw new MetadataStoreException("The search term cannot be null or empty.");
		}
		logger.log(Level.INFO, MessageFormat.format("Processing query \"{0}\".", query));
		synchronized (accessLock) {
			// Tokenize the query; the condition tokens (everything after the type
			// name) are consumed later by isConditionMet.
			LinkedList<String> queryElements = new LinkedList<String>();
			for (String el : query.split(DefaultMetadataQueryBuilder.SEPARATOR_STRING)) {
				queryElements.add(el);
			}
			List<MetaDataObjectReference> result = new ArrayList<MetaDataObjectReference>();
			String firstOperator = queryElements.removeFirst();

			if (firstOperator.equals(DefaultMetadataQueryBuilder.DATASET_IDENTIFIER)) {
				String requestedObjectType = queryElements.removeFirst();
				// Linear scan over all stored objects (in-memory store; no index).
				for (StoredMetaDataObject currentInternalObject : getObjects().values()) {
					MetaDataObject currentObject = currentInternalObject.getMetaDataObject();
					String currentObjectType = getObjectType(currentObject);
					try {
						if (isSubTypeOf(requestedObjectType, currentObjectType)
								&& isConditionMet(currentObject, queryElements)) {
							result.add(currentObject.getReference());
						}
					} catch (IllegalArgumentException | IllegalAccessException e) {
						// NOTE(review): the caught exception e is not chained into the
						// rethrown MetadataStoreException — consider passing it as the
						// cause if a suitable constructor exists.
						throw new MetadataStoreException(
								MessageFormat.format("Error processing \"{0}\" clause of query.",
										DefaultMetadataQueryBuilder.DATASET_IDENTIFIER));
					}
				}
				return result;
			} else {
				throw new MetadataStoreException(MessageFormat.format("Query ''{0}'' is not valid.", query));
			}
		}
	}
+
	/**
	 * Populates the store with sample data: copies the sample files and creates
	 * the corresponding sample metadata objects in this store.
	 */
	@Override
	public void createSampleData() {
		logger.log(Level.INFO, "Creating sample data in metadata store.");
		SampleDataHelper.copySampleFiles();
		WritableMetadataStoreUtils.createSampleDataObjects(this);
	}
+
	/**
	 * Returns an AnnotationPropagator that copies all annotations of a given
	 * request from the supplied AnnotationStore into this metadata store,
	 * committing after each created annotation.
	 */
	@Override
	public AnnotationPropagator getAnnotationPropagator() {
		return new AnnotationPropagator() {

			@Override
			public void propagateAnnotations(AnnotationStore as, String requestId) {
				List<Annotation> annotations = as.getAnnotations(null, requestId);
				for (Annotation annot : annotations) {
					// Make sure the annotation's type is registered before storing it.
					ensureAnnotationTypeExists(annot);
					annot.setReference(null); // Set reference to null because a new reference will be generated by the metadata store
					getMetadataStore().createObject(annot);
					commit();
				}
			}
		};
	}
+
+       /**
+        * Internal helper that creates a list of types in the metadata store.
+        *
+        * @param typeList List of types to be created
+        */
+       private void createTypes(List<Class<?>> typeList) {
+               synchronized (accessLock) {
+                       for (Class<?> type : typeList) {
+                               if 
(!typeStore.containsKey(type.getSimpleName())) {
+                                       logger.log(Level.INFO,
+                                                       
MessageFormat.format("Creating new type \"{0}\" in metadata store.", 
type.getSimpleName()));
+                                       typeStore.put(type.getSimpleName(), 
type.getSuperclass().getSimpleName());
+                               } else {
+                                       throw new 
MetadataStoreException(MessageFormat.format(
+                                                       "A type with the name 
\"{0}\" already exists in this metadata store.", type.getName()));
+                               }
+                       }
+               }
+       };
+
+       /**
+        * Internal helper that returns the type name of a given metadata 
object.
+        *
+        * @param mdo Metadata object
+        * @return Type name 
+        */
+       protected String getObjectType(MetaDataObject mdo) {
+               if (mdo instanceof Annotation) {
+                       // Important when using the MetadataStore as an 
AnnotationStore
+                       return ((Annotation) mdo).getAnnotationType();
+               } else {
+                       return mdo.getClass().getSimpleName();
+               }
+       }
+
+       /**
+        * Internal helper that checks if a type is a sub type of another type 
+        *
+        * @param subTypeName Name of the type that is supposed to be the sub 
type
+        * @param parentTypeName Name of the type that is supposed to be the 
parent type
+        */
+       private boolean isSubTypeOf(String subTypeName, String parentTypeName) {
+               if (subTypeName.equals(parentTypeName)) {
+                       return true;
+               }
+               if (typeStore.get(parentTypeName) != null) {
+                       String parent = typeStore.get(parentTypeName);
+                       if ((parent != null) && 
(!parent.equals(parentTypeName))) {
+                               if (isSubTypeOf(subTypeName, parent)) {
+                                       return true;
+                               }
+                       }
+               }
+               return false;
+       }
+
+       /**
+        * Internal helper that checks if the attributes of a given metadata 
object meet a given condition. 
+        *
+        * @param mdo Metadata object
+        * @param condition List of tokens that make up the condition phrase
+        */
+       private boolean isConditionMet(MetaDataObject mdo, LinkedList<String> 
condition)
+                       throws IllegalArgumentException, IllegalAccessException 
{
+               if (condition.isEmpty()) {
+                       return true;
+               }
+               LinkedList<String> clonedCondition = new LinkedList<String>();
+               clonedCondition.addAll(condition);
+               try {
+                       JSONObject mdoJson = JSONUtils.toJSONObject(mdo);
+                       logger.log(Level.FINER, 
MessageFormat.format("Evaluating object \"{0}\".", mdoJson));
+                       while (clonedCondition.size() >= 4) {
+                               // Each condition clause consists of four 
elements, e.g. "where
+                               // name = 'BankClientsShort'" or "and name = 
'BankClientsShort'"
+                               String operator = clonedCondition.removeFirst();
+                               String attribute = 
clonedCondition.removeFirst();
+                               String comparator = 
clonedCondition.removeFirst();
+                               String expectedValueWithQuotes = 
clonedCondition.removeFirst();
+                               while 
((!expectedValueWithQuotes.endsWith(DefaultMetadataQueryBuilder.QUOTE_IDENTIFIER))
 && (clonedCondition.size() != 0)) {
+                                       expectedValueWithQuotes = 
expectedValueWithQuotes + DefaultMetadataQueryBuilder.SEPARATOR_STRING + 
clonedCondition.removeFirst();
+                               }
+                               if 
(operator.equals(DefaultMetadataQueryBuilder.CONDITION_PREFIX)
+                                               || 
operator.equals(DefaultMetadataQueryBuilder.AND_IDENTIFIER)) {
+                                       if (mdoJson.containsKey(attribute)) {
+                                               String actualValue = (String) 
mdoJson.get(attribute) != null ? mdoJson.get(attribute).toString() : null;
+                                               if 
(comparator.equals(DefaultMetadataQueryBuilder.EQUALS_IDENTIFIER)) {
+                                                       if 
(!expectedValueWithQuotes.equals(DefaultMetadataQueryBuilder.QUOTE_IDENTIFIER + 
actualValue + DefaultMetadataQueryBuilder.QUOTE_IDENTIFIER)) {
+                                                               // Condition is 
not met
+                                                               return false;
+                                                       }
+                                               } else if 
(comparator.equals(DefaultMetadataQueryBuilder.NOT_EQUALS_IDENTIFIER)) {
+                                                       if 
(expectedValueWithQuotes.equals(DefaultMetadataQueryBuilder.QUOTE_IDENTIFIER + 
actualValue + DefaultMetadataQueryBuilder.QUOTE_IDENTIFIER)) {
+                                                               // Condition is 
not met
+                                                               return false;
+                                                       }
+                                               } else {
+                                                       throw new 
MetadataStoreException(
+                                                                       
MessageFormat.format("Unknown comparator \"{0}\" in query condition \"{1}\".",
+                                                                               
        new Object[] { comparator, condition.toString() }));
+                                               }
+                                       } else {
+                                               logger.log(Level.INFO,
+                                                               
MessageFormat.format("The object does not contain attribute \"{0}\".", 
attribute));
+                                               // Condition is not met
+                                               return false;
+                                       }
+                               } else {
+                                       throw new MetadataStoreException(
+                                                       
MessageFormat.format("Syntax error in query condition \"{0}\".", 
condition.toString()));
+                               }
+                       }
+                       if (clonedCondition.size() != 0) {
+                               throw new MetadataStoreException(
+                                               MessageFormat.format("Error 
parsing trailing query elements \"{0}\".", clonedCondition));
+                       }
+                       // All conditions are met
+                       return true;
+               } catch (JSONException e) {
+                       throw new 
MetadataStoreException(MessageFormat.format("Error parsing JSON object {0} in 
query.", mdo), e);
+               }
+       }
+
+       /**
+        * Internal helper that merges the references of a staged metadata 
object with the references of the current metadata object
+        * stored in the metadata store. The missing references are added to 
the provided object in place.
+        *
+        * @param object Internal representation of a staged metadata object
+        */
+       private void mergeReferenceMap(StoredMetaDataObject object) {
+               HashMap<String, List<MetaDataObjectReference>> 
mergedObjectRefMap = new HashMap<String, List<MetaDataObjectReference>>();
+               String objectId = 
object.getMetaDataObject().getReference().getId();
+               if (getObjects().get(objectId) != null) {
+                       // Only merge if the object already exists in the 
metadata store
+                       HashMap<String, List<MetaDataObjectReference>> 
originalRefMap = getObjects().get(objectId)
+                                       .getReferenceMap(); // Get reference 
map of exiting object
+                       HashMap<String, List<MetaDataObjectReference>> 
updatedObjectRefMap = object.getReferenceMap();
+                       for (String referenceId : updatedObjectRefMap.keySet()) 
{
+                               // Update original reference map in place
+                               mergedObjectRefMap.put(referenceId,
+                                               
InternalMetaDataUtils.mergeReferenceLists(originalRefMap.get(referenceId), 
updatedObjectRefMap.get(referenceId)));
+                       }
+                       object.setReferencesMap(mergedObjectRefMap);
+               }
+       }
+
+       @Override
+       public void commit() {
+               synchronized (accessLock) {
+                       // Check if all required types exist BEFORE starting to 
create the
+                       // objects in order to avoid partial creation of objects
+                       for (Map.Entry<String, StoredMetaDataObject> mapEntry : 
this.stagedObjects.entrySet()) {
+                               String typeName = 
getObjectType(mapEntry.getValue().getMetaDataObject());
+                               if ((typeName == null) || 
!typeStore.containsKey(typeName)) {
+                                       throw new 
MetadataStoreException(MessageFormat.format(
+                                                       "The type \"{0}\" of 
the object you are trying to create does not exist in this metadata store.",
+                                                       typeName));
+                               }
+                       }
+
+                       // Move objects from staging area into metadata store
+                       for (Map.Entry<String, StoredMetaDataObject> mapEntry : 
this.stagedObjects.entrySet()) {
+                               StoredMetaDataObject object = 
mapEntry.getValue();
+                               String typeName = 
getObjectType(mapEntry.getValue().getMetaDataObject());
+                               logger.log(Level.INFO,
+                                               MessageFormat.format(
+                                                               "Creating or 
updating object with id ''{0}'' and type ''{1}'' in metadata store.",
+                                                               new Object[] { 
object.getMetaDataObject().getReference(), typeName }));
+                               String objectId = 
object.getMetaDataObject().getReference().getId();
+                               mergeReferenceMap(object); // Merge new object 
references with
+                                                                               
        // existing object references in
+                                                                               
        // metadata store
+                               getObjects().put(objectId, object);
+                       }
+
+                       // Clear staging area
+                       stagedObjects = new LinkedHashMap<String, 
StoredMetaDataObject>();
+               }
+       }
+
	/**
	 * Internal helper that registers the given annotation's type name in the
	 * internal type store if it is not registered yet. The type is mapped to
	 * "ProfilingAnnotation", "ClassificationAnnotation", or
	 * "RelationshipAnnotation" depending on the annotation's concrete class.
	 *
	 * @param annotation Annotation whose type should be registered in the type store
	 */
	private void ensureAnnotationTypeExists(Annotation annotation) {
		String annotationType = annotation.getAnnotationType();
		if (typeStore.get(annotationType) == null) {
			if (annotation instanceof ProfilingAnnotation) {
				typeStore.put(annotationType, "ProfilingAnnotation");
			} else if (annotation instanceof ClassificationAnnotation) {
				typeStore.put(annotationType, "ClassificationAnnotation");
			} else if (annotation instanceof RelationshipAnnotation) {
				typeStore.put(annotationType, "RelationshipAnnotation");
			}
			// NOTE(review): an Annotation that is none of the three subtypes above is
			// silently ignored (no type registered) — confirm this is intended.
		}
	}
+}

Reply via email to