http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFImplementations.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFImplementations.java 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFImplementations.java
new file mode 100755
index 0000000..4021049
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFImplementations.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.text.MessageFormat;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ODFImplementations {
+
+       Logger logger = Logger.getLogger(ODFImplementations.class.getName());
+
+       private Map<String, String> implementations = new HashMap<String, 
String>();
+
+       public ODFImplementations(String path, ClassLoader cl) {
+               Enumeration<URL> resources;
+               try {
+                       resources = cl.getResources(path);
+               } catch (IOException exc) {
+                       logger.log(Level.WARNING, MessageFormat.format("An 
error occurred while reading properties ''0'' could not be loaded", path), exc);
+                       return;
+               }
+               while (resources.hasMoreElements()) {
+                       URL url = resources.nextElement();
+                       try {
+                               InputStream is = url.openStream();
+                               if (is != null) {
+                                       Properties props = new Properties();
+                                       props.load(is);
+                                       for (Object key : props.keySet()) {
+                                               String keyString = (String) key;
+                                               try {
+                                                       if 
(implementations.containsKey(key)) {
+                                                               String 
existingClassString = implementations.get(keyString);
+                                                               String 
newClassString = props.getProperty(keyString);
+                                                               if 
(!existingClassString.equals(newClassString)) {
+                                                                       
Class<?> existingClass = cl.loadClass(existingClassString);
+                                                                       
Class<?> newClass = cl.loadClass(newClassString);
+                                                                       String 
superClass = null;
+                                                                       String 
subClass = null;
+                                                                       // 
select the class lowest in the class hierarchy 
+                                                                       if 
(existingClass.isAssignableFrom(newClass)) {
+                                                                               
superClass = existingClassString;
+                                                                               
subClass = newClassString;
+                                                                       } else 
if (newClass.isAssignableFrom(existingClass)) {
+                                                                               
superClass = newClassString;
+                                                                               
subClass = existingClassString;
+                                                                       }
+                                                                       if 
(superClass != null) {
+                                                                               
logger.log(Level.INFO, "Implementation for interface ''{0}'' was found more 
than once, using subclass ''{1}'' (found superclass ''{2}'')",
+                                                                               
                new Object[] { key, subClass, superClass });
+                                                                               
implementations.put(keyString, subClass);
+                                                                       } else {
+                                                                               
logger.log(Level.WARNING, "Implementation for interface ''{0}'' was found more 
than once, using ''{1}''. (Conflict between ''{1}'' and ''{2}'')",
+                                                                               
                new Object[] { key, existingClassString, newClassString });
+                                                                       }
+                                                               }
+                                                       } else {
+                                                               
cl.loadClass(props.getProperty(keyString));
+                                                               
implementations.put(keyString, props.getProperty(keyString));
+                                                       }
+                                               } catch (ClassNotFoundException 
exc) {
+                                                       
logger.log(Level.SEVERE, "Class found in odf-implementation.properties file 
could not be loaded", exc);
+                                               }
+                                       }
+                               }
+                       } catch (IOException e) {
+                               logger.log(Level.WARNING, 
MessageFormat.format("Properties ''0'' could not be loaded", url), e);
+                       }
+               }
+       }
+
+       public Map<String, String> getImplementations() {
+               return implementations;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java
new file mode 100755
index 0000000..64e54ad
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+
+public class ODFInitializer {
+
+       static Logger logger = Logger.getLogger(ODFInitializer.class.getName());
+
+       static Object initLock = new Object();
+
+       private static boolean running = false;
+       private static long lastStopTimestamp = 0;
+       private static long lastStartTimestamp = 0;
+       private static boolean startStopInProgress = false;
+       
+
+       public static long getLastStopTimestamp() {
+               synchronized (initLock) {
+                       return lastStopTimestamp;
+               }
+       }
+
+       public static long getLastStartTimestamp() {
+               synchronized (initLock) {
+                       return lastStartTimestamp;
+               }
+       }
+
+       public static boolean isRunning() {
+               synchronized (initLock) {
+                       return running;
+               }
+       }
+       
+       public static boolean isStartStopInProgress() {
+               return startStopInProgress;
+       }
+
+       public static void start() {
+               synchronized (initLock) {
+                       if (!running) {
+                               startStopInProgress = true;
+                               DiscoveryServiceQueueManager qm = new 
ODFInternalFactory().create(DiscoveryServiceQueueManager.class);
+                               try {
+                                       qm.start();
+                               } catch (Exception e) {
+                                       logger.log(Level.WARNING, "Timeout 
occurred while starting ODF", e);
+                               }
+                               lastStartTimestamp = System.currentTimeMillis();
+                               running = true;
+                               startStopInProgress = false;
+                       }
+               }
+       }
+
+       public static void stop() {
+               synchronized (initLock) {
+                       if (running) {
+                               startStopInProgress = true;
+                               ODFInternalFactory f = new ODFInternalFactory();
+                               DiscoveryServiceQueueManager qm = 
f.create(DiscoveryServiceQueueManager.class);
+                               try {
+                                       qm.stop();
+                               } catch (TimeoutException e) {
+                                       logger.log(Level.WARNING, "Timeout 
occurred while stopping ODF", e);
+                               }
+                               ThreadManager tm = 
f.create(ThreadManager.class);
+                               tm.shutdownAllUnmanagedThreads();
+                               AnalysisRequestTrackerStore arts = 
f.create(AnalysisRequestTrackerStore.class);
+                               arts.clearCache();
+                               lastStopTimestamp = System.currentTimeMillis();
+                               running = false;
+                               startStopInProgress = false;
+                       }
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java
new file mode 100755
index 0000000..4fd09a7
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.utils.ODFLogConfig;
+
+public class ODFInternalFactory {
+
+       private static Properties defaultImplemenetations = 
Utils.readConfigProperties("org/apache/atlas/odf/core/internal/odf-default-implementation.properties");
+       private static ODFImplementations overwrittenImplementations = null;
+       private static Map<Class<?>, Object> singletons = new HashMap<>();
+
+       public static String SINGLETON_MARKER = "@singleton";
+
+       static {
+               ODFLogConfig.run();
+
+               Logger logger = 
Logger.getLogger(ODFInternalFactory.class.getName());
+               ClassLoader cl = ODFInternalFactory.class.getClassLoader();
+               String overwriteConfig = 
"org/apahe/atlas/odf/odf-implementation.properties";
+               overwrittenImplementations = new 
ODFImplementations(overwriteConfig, cl);
+               if (overwrittenImplementations.getImplementations().isEmpty()) {
+                       overwrittenImplementations = null;
+               } else {
+                       logger.log(Level.INFO, "Found overwritten 
implementation config: {0}", overwrittenImplementations.getImplementations());
+               }
+               if (overwrittenImplementations == null) {
+                       logger.log(Level.INFO, "Default implementations are 
used");
+               }
+       }
+
+       private Object createObject(Class<?> cl) throws ClassNotFoundException, 
IllegalAccessException, InstantiationException {
+               String clazz = null;
+               if (overwrittenImplementations != null) {
+                       clazz = 
overwrittenImplementations.getImplementations().get(cl.getName());
+               }
+               if (clazz == null) {
+                       clazz = 
defaultImplemenetations.getProperty(cl.getName());
+               }
+               if (clazz == null) {
+                       // finally try to instantiate the class as such
+                       clazz = cl.getName();
+               }
+               boolean isSingleton = false;
+               if (clazz.endsWith(SINGLETON_MARKER)) {
+                       clazz = clazz.substring(0, clazz.length() - 
SINGLETON_MARKER.length());
+                       isSingleton = true;
+               }
+               Object o = null;
+               Class<?> implClass = 
this.getClass().getClassLoader().loadClass(clazz);
+               if (isSingleton) {
+                       o = singletons.get(implClass);
+                       if (o == null) {
+                               o = implClass.newInstance();
+                               singletons.put(implClass, o);
+                       }
+               } else {
+                       o = implClass.newInstance();
+               }
+               return o;
+       }
+
+       @SuppressWarnings("unchecked")
+       public <T> T create(Class<T> cl) {
+               try {
+                       return (T) createObject(cl);
+               } catch (ClassNotFoundException e) {
+                       throw new RuntimeException(e);
+               } catch (IllegalAccessException e) {
+                       throw new RuntimeException(e);
+               } catch (InstantiationException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java
new file mode 100755
index 0000000..623a727
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+
+public class ODFUtils {
+       public static int DEFAULT_TIMEOUT_SECS = 10 * 60; // 10 minutes
+
+       public static AnalysisRequestStatus runSynchronously(AnalysisManager 
analysisManager, AnalysisRequest request) {
+               return runSynchronously(analysisManager, request, 
DEFAULT_TIMEOUT_SECS); // default is 
+       }
+
+       public static AnalysisRequestStatus runSynchronously(AnalysisManager 
analysisManager, AnalysisRequest request, int timeoutInSeconds) {
+               Logger logger = Logger.getLogger(ODFUtils.class.getName());
+               AnalysisResponse response = 
analysisManager.runAnalysis(request);
+               if (response.isInvalidRequest()) {
+                       AnalysisRequestStatus status = new 
AnalysisRequestStatus();
+                       status.setState(AnalysisRequestStatus.State.ERROR);
+                       status.setDetails(MessageFormat.format("Request was 
invalid. Details: {0}", response.getDetails()));
+                       status.setRequest(request);
+                       return status;
+               }
+               AnalysisRequestStatus status = null;
+               long startTime = System.currentTimeMillis();
+               boolean timeOutReached = false;
+               do {
+                       logger.fine("Polling for result...");
+                       status = 
analysisManager.getAnalysisRequestStatus(response.getId());
+                       try {
+                               Thread.sleep(1000);
+                       } catch (InterruptedException e) {
+                               e.printStackTrace();
+                       }
+                       long currentTime = System.currentTimeMillis();
+                       timeOutReached = (currentTime - startTime) > 
(timeoutInSeconds * 1000);
+               } while 
((AnalysisRequestStatus.State.ACTIVE.equals(status.getState()) || 
AnalysisRequestStatus.State.QUEUED.equals(status.getState()) //
+                               && !timeOutReached));
+               return status;
+
+       }
+
+       public static AnalysisRequestStatus.State 
combineStates(List<AnalysisRequestStatus.State> allStates) {
+               // if one of the requests is in error, so is the complete 
request
+               if (allStates.contains(AnalysisRequestStatus.State.ERROR)) {
+                       return AnalysisRequestStatus.State.ERROR;
+               }
+               // if no request could be found -> not found
+               if (Utils.containsOnly(allStates, new 
AnalysisRequestStatus.State[] { AnalysisRequestStatus.State.NOT_FOUND })) {
+                       return AnalysisRequestStatus.State.NOT_FOUND;
+               }
+               // if all request are either not found or finished -> finished
+               if (Utils.containsOnly(allStates, new 
AnalysisRequestStatus.State[] { AnalysisRequestStatus.State.NOT_FOUND, 
AnalysisRequestStatus.State.FINISHED })) {
+                       return AnalysisRequestStatus.State.FINISHED;
+               }
+               // else always return active
+               return AnalysisRequestStatus.State.ACTIVE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java
new file mode 100755
index 0000000..e8361fd
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.importer.JDBCMetadataImporter;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.engine.EngineManager;
+import org.apache.atlas.odf.api.engine.ServiceRuntimesInfo;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class OpenDiscoveryFrameworkImpl implements OpenDiscoveryFramework {
+
+       private Logger logger = 
Logger.getLogger(OpenDiscoveryFrameworkImpl.class.getName());
+
+       public OpenDiscoveryFrameworkImpl() {
+               if (!ODFInitializer.isRunning() && 
!ODFInitializer.isStartStopInProgress()) {
+                       logger.log(Level.INFO, "Initializing Open Discovery 
Platform");
+                       ODFInitializer.start();
+                       getEngineManager().checkHealthStatus(); // This 
implicitly initializes the control center and the message queues
+                       
+                       logger.log(Level.INFO, "Open Discovery Platform 
successfully initialized.");
+                       
+                       // log active runtimes
+                       ServiceRuntimesInfo activeRuntimesInfo = 
ServiceRuntimes.getRuntimesInfo(ServiceRuntimes.getActiveRuntimes());
+                       try {
+                               logger.log(Level.INFO, "Active runtimes: 
''{0}''", JSONUtils.toJSON(activeRuntimesInfo));
+                       } catch (JSONException e) {
+                               logger.log(Level.WARNING, "Active runtime info 
has wrong format", e);
+                       }
+               }
+       }
+
+       public AnalysisManager getAnalysisManager() {
+               return new ODFInternalFactory().create(AnalysisManager.class);
+       }
+
+       public DiscoveryServiceManager getDiscoveryServiceManager() {
+               return new 
ODFInternalFactory().create(DiscoveryServiceManager.class);
+       }
+
+       public EngineManager getEngineManager() {
+               return new ODFInternalFactory().create(EngineManager.class);
+       }
+
+       public SettingsManager getSettingsManager() {
+               return new ODFInternalFactory().create(SettingsManager.class);
+       }
+
+       public AnnotationStore getAnnotationStore() {
+               return new ODFInternalFactory().create(AnnotationStore.class);
+       }
+
+       public MetadataStore getMetadataStore() {
+               return new ODFInternalFactory().create(MetadataStore.class);
+       }
+
+       public JDBCMetadataImporter getJDBCMetadataImporter() {
+               return new 
ODFInternalFactory().create(JDBCMetadataImporter.class);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java
new file mode 100755
index 0000000..e58dd37
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+
+public class StandaloneEnvironment implements Environment {
+
+       @Override
+       public String getProperty(String propertyName) {
+               return System.getProperty(propertyName);
+       }
+
+       @Override
+       public String getCurrentUser() {
+               return System.getProperty("user.name");
+       }
+
+       @Override
+       public String getZookeeperConnectString() {
+               return getProperty("odf.zookeeper.connect");
+       }
+
+       @Override
+       public ConfigContainer getDefaultConfiguration() {
+               return 
Utils.readConfigurationFromClasspath("org/apache/atlas/odf/core/internal/odf-initial-configuration.json");
+       }
+
+       @Override
+       public Map<String, String> getPropertiesWithPrefix(String prefix) {
+               Map<String, String> foundProps = new HashMap<>();
+               Properties props = System.getProperties();
+               for (String key : props.stringPropertyNames()) {
+                       if (key.startsWith(prefix)) {
+                               foundProps.put(key, props.getProperty(key));
+                       }
+               }
+               return foundProps;
+       }
+
+       @Override
+       public List<String> getActiveRuntimeNames() {
+               String p = getProperty("odf.active.runtimes");
+               if (p == null || p.equals("ALL")) {
+                       return null;
+               }
+               if (p.equals("NONE")) {
+                       return new ArrayList<>();
+               }
+               return Arrays.asList(p.split(","));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java
new file mode 100755
index 0000000..060f9fb
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.settings.KafkaConsumerConfig;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONObject;
+
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+
+public class Utils {
+
+       static Logger logger = Logger.getLogger(Utils.class.getName());
+
+       private static final List<Class<? extends Object>> MERGABLE_CLASSES = 
Arrays.asList(ConfigContainer.class, KafkaConsumerConfig.class, 
ODFSettings.class, DiscoveryServiceProperties.class);
+
+       public static void mergeODFPOJOs(Object source, Object update) {
+               if (!source.getClass().isAssignableFrom(update.getClass())) {
+                       return;
+               }
+
+               Method[] sourceMethods = source.getClass().getDeclaredMethods();
+
+               for (Method getterMethod : sourceMethods) {
+                       if (getterMethod.getName().startsWith("get") || 
getterMethod.getName().startsWith("is")) {
+                               String setterMethodName = 
getterMethod.getName().replaceFirst("get", "set");
+                               if (getterMethod.getName().startsWith("is")) {
+                                       setterMethodName = 
setterMethodName.replaceFirst("is", "set");
+                               }
+                               try {
+                                       Method setterMethod = 
source.getClass().getDeclaredMethod(setterMethodName, 
getterMethod.getReturnType());
+                                       Object updateValue = 
getterMethod.invoke(update);
+                                       if (updateValue != null) {
+                                               Object sourceValue = 
getterMethod.invoke(source);
+
+                                               if (sourceValue != null && 
MERGABLE_CLASSES.contains(updateValue.getClass())) {
+                                                       //Value is another 
POJO, must also try merging these instead of overwriting
+                                                       
mergeODFPOJOs(sourceValue, updateValue);
+                                                       
setterMethod.invoke(source, sourceValue);
+                                               } else if (sourceValue 
instanceof Map && updateValue instanceof Map) {
+                                                       Map updateJSON = (Map) 
updateValue;
+                                                       Map sourceJSON = (Map) 
sourceValue;
+                                                       for (Object key : 
updateJSON.keySet()) {
+                                                               
sourceJSON.put(key, updateJSON.get(key));
+                                                       }
+                                                       
setterMethod.invoke(source, sourceJSON);
+                                               } else {
+                                                       
setterMethod.invoke(source, updateValue);
+                                               }
+                                       }
+
+                               } catch (NoSuchMethodException e) {
+                                       throw new 
RuntimeException(MessageFormat.format("Objects of type {0}  and {1} could not 
be merged, no matching method found for {2}!", source.getClass().getName(), 
update
+                                                       .getClass().getName(), 
getterMethod.getName()), e);
+                               } catch (SecurityException e) {
+                                       throw new 
RuntimeException(MessageFormat.format("Objects of type {0}  and {1} could not 
be merged, method {2} could not be accessed (SecurityException)!", 
source.getClass()
+                                                       .getName(), 
update.getClass().getName(), setterMethodName), e);
+                               } catch (IllegalAccessException e) {
+                                       throw new 
RuntimeException(MessageFormat.format("Objects of type {0}  and {1} could not 
be merged, method {2} could not be accessed! (IllegalAccessException)", 
source.getClass()
+                                                       .getName(), 
update.getClass().getName(), getterMethod.getName()), e);
+                               } catch (IllegalArgumentException e) {
+                                       throw new 
RuntimeException(MessageFormat.format("Objects of type {0}  and {1} could not 
be merged, method {2} does not accept the right parameters!", 
source.getClass().getName(),
+                                                       
update.getClass().getName(), setterMethodName), e);
+                               } catch (InvocationTargetException e) {
+                                       e.printStackTrace();
+                                       throw new 
RuntimeException(MessageFormat.format("Objects of type {0}  and {1} could not 
be merged, method {2} or {3} could not be invoked!", 
source.getClass().getName(), update
+                                                       .getClass().getName(), 
getterMethod.getName(), setterMethodName), e);
+                               }
+
+                       }
+               }
+       }
+
+       public static Properties readConfigProperties(String path) {
+               // TODO cache this in static variables, it doesn't change at 
runtime 
+               InputStream is = 
Utils.class.getClassLoader().getResourceAsStream(path);
+               if (is == null) {
+                       return null;
+               }
+               Properties props = new Properties();
+               try {
+                       props.load(is);
+               } catch (IOException e) {
+                       throw new RuntimeException(e);
+               }
+               return props;
+       }
+
+       public static void setCurrentTimeAsLastModified(AnalysisRequestTracker 
tracker) {
+               tracker.setLastModified(System.currentTimeMillis());
+       }
+
+       public static String getExceptionAsString(Throwable exc) {
+               StringWriter sw = new StringWriter();
+               PrintWriter pw = new PrintWriter(sw);
+               exc.printStackTrace(pw);
+               String st = sw.toString();
+               return st;
+       }
+
+       public static String collectionToString(Collection<?> coll, String 
separator) {
+               StringBuffer buf = null;
+               for (Object o : coll) {
+                       if (buf == null) {
+                               buf = new StringBuffer("[ ");
+                       } else {
+                               buf.append(separator);
+                       }
+                       buf.append(o.toString());
+               }
+               buf.append(" ]");
+               return buf.toString();
+       }
+
+       public static <T> boolean containsOnly(List<T> l, T[] elements) {
+               for (T t : l) {
+                       boolean containsOnlyElements = false;
+                       for (T el : elements) {
+                               if (t.equals(el)) {
+                                       containsOnlyElements = true;
+                                       break;
+                               }
+                       }
+                       if (!containsOnlyElements) {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       public static <T> boolean containsNone(List<T> l, T[] elements) {
+               for (T t : l) {
+                       boolean containsAnyElement = false;
+                       for (T el : elements) {
+                               if (t.equals(el)) {
+                                       containsAnyElement = true;
+                                       break;
+                               }
+                       }
+                       if (containsAnyElement) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       public static List<String> splitString(String s, char separator) {
+               List<String> l = new ArrayList<String>();
+               if (s != null) {
+                       StringTokenizer tok = new StringTokenizer(s, 
String.valueOf(separator));
+                       while (tok.hasMoreTokens()) {
+                               l.add(tok.nextToken());
+                       }
+               }
+               return l;
+       }
+
+       public static String getInputStreamAsString(InputStream is, String 
encoding) {
+               try {
+                       final int n = 2048;
+                       byte[] b = new byte[0];
+                       byte[] temp = new byte[n];
+                       int bytesRead;
+                       while ((bytesRead = is.read(temp)) != -1) {
+                               byte[] newB = new byte[b.length + bytesRead];
+                               System.arraycopy(b, 0, newB, 0, b.length);
+                               System.arraycopy(temp, 0, newB, b.length, 
bytesRead);
+                               b = newB;
+                       }
+                       String s = new String(b, encoding);
+                       return s;
+               } catch (IOException exc) {
+                       return getExceptionAsString(exc);
+               }
+       }
+
+       public static void mergeJSONObjects(JSONObject source, JSONObject 
target) {
+               if (source != null && target != null) {
+                       target.putAll(source);
+               }
+       }
+
+       public static <T> T getValue(T value, T defaultValue) {
+               if (value == null) {
+                       return defaultValue;
+               }
+               return value;
+       }
+
+       public static String getSystemPropertyExceptionIfMissing(String 
propertyName) {
+               Environment env = new 
ODFInternalFactory().create(Environment.class);
+               String value = env.getProperty(propertyName);
+               if (value == null) {
+                       String msg = MessageFormat.format("System property 
''{0}'' is not set", propertyName);
+                       logger.log(Level.SEVERE, msg);
+                       throw new RuntimeException(msg);
+               }
+               return value;
+       }
+       
+       public static int getIntEnvironmentProperty(String propertyName, int 
defaultValue) {
+               Environment env = new 
ODFInternalFactory().create(Environment.class);
+               String value = env.getProperty(propertyName);
+               if (value == null) {
+                       return defaultValue;
+               }
+               try {
+                       return Integer.parseInt(value);
+               } catch(NumberFormatException exc) {
+                       return defaultValue;
+               }
+       }
+
+
+       public static void runSystemCommand(String command) {
+               logger.log(Level.INFO, "Running system command: " + command);
+               try {
+                       Runtime r = Runtime.getRuntime();
+                       Process p = r.exec(command);
+                       p.waitFor();
+                       BufferedReader b = new BufferedReader(new 
InputStreamReader(p.getInputStream()));
+                       String line = "";
+                       while ((line = b.readLine()) != null) {
+                               logger.log(Level.INFO, "System command out: " + 
line);
+                       }
+                       b.close();
+               } catch(IOException | InterruptedException e) {
+                       logger.log(Level.INFO, "Error executing system 
command.", e);
+               }
+       }
+       
+       public static ConfigContainer readConfigurationFromClasspath(String 
jsonFileInClasspath) {
+               InputStream is = 
SettingsManager.class.getClassLoader().getResourceAsStream(jsonFileInClasspath);
+               try {
+                       JSONObject configJSON = new JSONObject(is);
+                       ConfigContainer config = 
JSONUtils.fromJSON(configJSON.write(), ConfigContainer.class);
+                       return config;
+               } catch (Exception exc) {
+                       throw new RuntimeException(exc);
+               }
+       }
+
+       public static String joinStrings(List<String> l, char separator) {
+               String result = null;
+               if ((l != null) && !l.isEmpty()) {
+                       StringBuilder buf = null;
+                       for (String s : l) {
+                               if (buf == null) {
+                                       buf = new StringBuilder();
+                               } else {
+                                       buf.append(separator);
+                               }
+                               buf.append(s);
+                       }
+                       result = buf.toString();
+               }
+               return result;
+       }
+       
+       public static String getEnvironmentProperty(String name, String 
defaultValue) {
+               Environment env = new 
ODFInternalFactory().create(Environment.class);
+               String s = env.getProperty(name);
+               return s != null ? s : defaultValue;            
+       }
+       
+       public static long getEnvironmentProperty(String name, long 
defaultValue) {
+               Environment env = new 
ODFInternalFactory().create(Environment.class);
+               String s = env.getProperty(name);
+               if (s == null) {
+                       return defaultValue;
+               }
+               try {
+                       return Long.parseLong(s);
+               } catch(NumberFormatException exc) {
+                       String msg = MessageFormat.format("Property ''{0}'' 
could not be converted to an integer", new Object[]{name});
+                       logger.log(Level.WARNING, msg);
+                       return defaultValue;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java
new file mode 100755
index 0000000..8f7fab2
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.analysis;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisCancelResult;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestSummary;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackers;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.ODFUtils;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ *
+ * External Java API for creating and managing analysis requests
+ *
+ */
+public class AnalysisManagerImpl implements AnalysisManager {
+
+       public final static char COMPOUND_REQUEST_SEPARATOR = ',';
+       private Logger logger = 
Logger.getLogger(AnalysisManagerImpl.class.getName());
+       private ControlCenter controlCenter;
+
+       public AnalysisManagerImpl() {
+               controlCenter = new 
ODFInternalFactory().create(ControlCenter.class);
+       }
+
+       /**
+        * Issues a new ODF analysis request
+        *
+        * @param request Analysis request
+        * @return Response containing the request id and status information
+        */
+       public AnalysisResponse runAnalysis(AnalysisRequest request) {
+               if (((request.getDiscoveryServiceSequence() == null) || 
request.getDiscoveryServiceSequence().isEmpty())
+                       && ((request.getAnnotationTypes() == null) || 
request.getAnnotationTypes().isEmpty())) {
+                       AnalysisResponse response = new AnalysisResponse();
+                       response.setId(request.getId());
+                       response.setDetails("Either a sequence of discovery 
service ids or a list of annotation types must be specified to initiate an 
analysis request.");
+                       response.setInvalidRequest(true);
+                       return response;
+               }
+
+               if ((request.getDataSets().size() == 1) || 
request.isProcessDataSetsSequentially()) {
+                       logger.log(Level.INFO, "Using sequential request 
processing (maybe because there is only a single data set)");
+                       AnalysisResponse response = 
controlCenter.startRequest(request);
+                       logger.log(Level.INFO, "Request with ID ''{0}'' started 
on data sets ''{1}''. Complete request: {2}.",
+                                       new Object[] { response.getId(), 
request.getDataSets(), JSONUtils.lazyJSONSerializer(request) });
+                       return response;
+               }
+
+               List<String> requestIDs = new ArrayList<String>();
+               List<String> detailsMessages = new ArrayList<String>();
+               boolean invalidRequest = true;
+               logger.log(Level.INFO, "Running requests for ''{0}'' data sets 
in parallel", request.getDataSets().size());
+               logger.log(Level.FINE, "Splitting request into multiple request 
for each data set. Data Sets: {0}", request.getDataSets());
+               for (MetaDataObjectReference dataSet : request.getDataSets()) {
+                       AnalysisRequest partRequest = new AnalysisRequest();
+                       
partRequest.setDiscoveryServiceSequence(request.getDiscoveryServiceSequence());
+                       
partRequest.setAdditionalProperties(request.getAdditionalProperties());
+                       
partRequest.setDataSets(Collections.singletonList(dataSet));
+                       AnalysisResponse partResponse = 
controlCenter.startRequest(partRequest);
+                       if (!partResponse.isInvalidRequest()) {
+                               String partRequestID = partResponse.getId();
+                               requestIDs.add(partRequestID);
+                               detailsMessages.add(partResponse.getDetails());
+                               // as soon as one request is valid, we make the 
compound request valid
+                               invalidRequest = false;
+                       }
+               }
+               AnalysisResponse response = new AnalysisResponse();
+               response.setId(Utils.joinStrings(requestIDs, 
COMPOUND_REQUEST_SEPARATOR));
+               response.setDetails(Utils.joinStrings(detailsMessages, 
COMPOUND_REQUEST_SEPARATOR));
+               response.setInvalidRequest(invalidRequest);
+               return response;
+       }
+
+       /**
+        * Retrieve status of an ODF analysis request
+        *
+        * @param requestId Unique id of the analysis request
+        * @return Status of the analysis request
+        */
+       public AnalysisRequestStatus getAnalysisRequestStatus(String requestId) 
{
+               List<String> singleRequestIds = Utils.splitString(requestId, 
COMPOUND_REQUEST_SEPARATOR);
+               if (singleRequestIds.size() == 1) {
+                       AnalysisRequestStatus status = 
controlCenter.getRequestStatus(requestId);
+                       return status;
+               }
+               AnalysisRequestStatus compoundStatus = new 
AnalysisRequestStatus();
+               compoundStatus.setState(State.QUEUED);
+               AnalysisRequest compoundRequest = new AnalysisRequest(); // 
assemble a compound request 
+               compoundRequest.setId(requestId);
+               List<String> allMessages = new ArrayList<String>();
+               List<MetaDataObjectReference> allDataSets = new ArrayList<>();
+               List<State> allStates = new ArrayList<>();
+               for (String singleRequestId : singleRequestIds) {       
+                       AnalysisRequestStatus singleStatus = 
controlCenter.getRequestStatus(singleRequestId);
+                       if (compoundRequest.getDiscoveryServiceSequence() == 
null) {
+                               // assume all fields of the single requests are 
the same
+                               // since they were created through runAnalysis()
+                               
compoundRequest.setDiscoveryServiceSequence(singleStatus.getRequest().getDiscoveryServiceSequence());
+                               
compoundRequest.setAdditionalProperties(singleStatus.getRequest().getAdditionalProperties());
+                       }
+                       if (singleStatus.getRequest().getDataSets() != null) {
+                               
allDataSets.addAll(singleStatus.getRequest().getDataSets());
+                       }
+                       allStates.add(singleStatus.getState());
+                       allMessages.add(singleStatus.getDetails());
+               }
+               compoundRequest.setDataSets(allDataSets);
+
+               compoundStatus.setState(ODFUtils.combineStates(allStates));
+               compoundStatus.setRequest(compoundRequest);
+               compoundStatus.setDetails(Utils.joinStrings(allMessages, 
COMPOUND_REQUEST_SEPARATOR));
+               return compoundStatus;
+       }
+
+       /**
+        * Retrieve statistics about all previous ODF analysis requests
+        *
+        * @return Request summary
+        */
+       public AnalysisRequestSummary getAnalysisStats() {
+               AnalysisRequestTrackerStore store = new 
ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
+               return store.getRequestSummary();
+       }
+
+       /**
+        * Retrieve status details of recent ODF analysis requests
+        *
+        * @param offset Starting offset (use 0 to start with the latest 
request)
+        * @param limit Maximum number of analysis requests to be returned (use 
-1 to retrieve all requests)
+        * @return Status details for each discovery request
+        */
+       public AnalysisRequestTrackers getAnalysisRequests(int offset, int 
limit) {
+               AnalysisRequestTrackerStore store = new 
ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
+               AnalysisRequestTrackers analysisrequestTrackers = new 
AnalysisRequestTrackers();
+               
analysisrequestTrackers.setAnalysisRequestTrackers(store.getRecentTrackers(offset,
 limit));
+               return analysisrequestTrackers;
+       }
+
+       /**
+        * Request a specific ODF discovery request to be canceled
+        *
+        * @param requestId Unique id of the analysis request
+        * @return Status of the cancellation attempt
+        */
+       public AnalysisCancelResult cancelAnalysisRequest(String requestId) {
+               return controlCenter.cancelRequest(requestId);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java
new file mode 100755
index 0000000..798b2d3
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.annotation;
+
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResult;
+
+public class InternalAnnotationStoreUtils {
+
+       public static void storeDiscoveryServiceResult(DiscoveryServiceResult 
result, AnalysisRequest req) {
+               Logger logger = 
Logger.getLogger(InternalAnnotationStoreUtils.class.getName());
+               AnnotationStore mds = new 
ODFFactory().create().getAnnotationStore();
+               mds.setAnalysisRun(req.getId());
+               if (result != null) {
+                       logger.log(Level.FINE, "Persisting annotations returned 
by discovery service");
+                       List<Annotation> annotations = result.getAnnotations();
+                       if (annotations != null) {
+                               for (Annotation annot : annotations) {
+                                       // only persist if reference was not set
+                                       if (annot.getReference() == null) {
+                                               mds.store(annot);
+                                       } else {
+                                               logger.log(Level.WARNING, 
"Returned annotation object has a non-null reference set and will not be 
persisted (reference: {0})", annot.getReference().toString());
+                                       }
+                               }
+                       }
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java
new file mode 100755
index 0000000..f779155
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.configuration;
+
+
+import java.util.List;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+/**
+ * { 
+ *     "odf" : {...},
+ *     "userDefined" : {...}
+ * }
+ *
+ *
+ * This class is final, because reflection is used to access getters / setters 
in order to merge. This doesn't work with inherited methods
+ */
+@ApiModel(description="All ODF configuration options.")
+public final class ConfigContainer {
+
+       @ApiModelProperty(value="General ODF configuration options along with 
details about available discovery services", required=true)
+       private ODFSettings odf;
+
+       @ApiModelProperty(value="Details about available discovery services")
+       private List<DiscoveryServiceProperties> registeredServices = null;
+
+       public List<DiscoveryServiceProperties> getRegisteredServices() {
+               return registeredServices;
+       }
+
+       public void setRegisteredServices(List<DiscoveryServiceProperties> 
registeredServices) {
+               this.registeredServices = registeredServices;
+       }
+
+       public ODFSettings getOdf() {
+               return odf;
+       }
+
+       public void setOdf(ODFSettings odfSettings) {
+               this.odf = odfSettings;
+       }
+
+       public void validate() throws ValidationException {
+               if (this.odf != null) {
+                       odf.validate();
+               }
+               if (this.registeredServices != null) {
+                       new 
ServiceValidator().validate("ODFConfig.registeredServices", 
this.registeredServices);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java
new file mode 100755
index 0000000..7ad90e6
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.configuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.settings.SparkConfig;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.Encryption;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.core.store.ODFConfigurationStorage;
+
+public class ConfigManager {
+       private Logger logger = Logger.getLogger(ConfigManager.class.getName());
+       public static final String HIDDEN_PASSWORD_IDENTIFIER = "***hidden***";
+       public static final long CONFIG_UPDATE_SLEEP_BETWEEN_POLLS = 20;
+       public static final int CONFIG_UPDATE_MAX_POLLS = 1500;
+       private static final String DEFAULT_ENCRYPTED_SPARK_CONFIGS = 
"spark.authenticate.secret,spark.ssl.keyPassword,spark.ssl.keyStorePassword,spark.ssl.trustStorePassword";
+
+       protected ODFConfigurationStorage configurationStore;
+       protected ODFConfigNotificationPublisher notificationManager;
+
+       public ConfigManager() {
+               ODFInternalFactory f = new ODFInternalFactory();
+               this.configurationStore = 
f.create(ODFConfigurationStorage.class);
+               this.notificationManager = 
f.create(ODFConfigNotificationPublisher.class);
+       }
+
+       public ConfigContainer getConfigContainer() {
+               ConfigContainer config = 
configurationStore.getConfig(getDefaultConfigContainer());
+               return config;
+       }
+
+       public ConfigContainer getConfigContainerHidePasswords() {
+               ConfigContainer config = 
configurationStore.getConfig(getDefaultConfigContainer());
+               hidePasswords(config);
+               return config;
+       }
+
+       public void updateConfigContainer(ConfigContainer update) throws 
ValidationException {
+               try {
+                       update = JSONUtils.cloneJSONObject(update);
+               } catch (JSONException e) {
+                       throw new RuntimeException(e);
+               }
+               update.validate();
+               ConfigContainer source = getConfigContainer();
+               unhideAndEncryptPasswords(update, source);
+
+               List<DiscoveryServiceProperties> newServicesToRun = new 
ArrayList<DiscoveryServiceProperties>();
+               if (update.getRegisteredServices() != null
+                               && source.getRegisteredServices().size() < 
update.getRegisteredServices().size()) {
+                       // store added services if update registers new ones
+                       List<DiscoveryServiceProperties> newRegisteredServices 
= new ArrayList<DiscoveryServiceProperties>();
+                       
newRegisteredServices.addAll(update.getRegisteredServices());
+                       for (DiscoveryServiceProperties oldService : 
source.getRegisteredServices()) {
+                               for (int no = 0; no < 
newRegisteredServices.size(); no++) {
+                                       if 
(newRegisteredServices.get(no).getId().equals(oldService.getId())) {
+                                               
newRegisteredServices.remove(no);
+                                               break;
+                                       }
+                               }
+                       }
+
+                       newServicesToRun.addAll(newRegisteredServices);
+               }
+
+               Utils.mergeODFPOJOs(source, update);
+               configurationStore.storeConfig(source);
+
+               if (source.getOdf().getRunNewServicesOnRegistration() && 
!newServicesToRun.isEmpty()) {
+                       runNewServices(newServicesToRun);
+               }
+
+               String changeId = UUID.randomUUID().toString();
+               configurationStore.addPendingConfigChange(changeId);
+               this.notificationManager.publishConfigChange(source, changeId);
+               for (int i=0; i < CONFIG_UPDATE_MAX_POLLS; i++) {
+                       if 
(!configurationStore.isConfigChangePending(changeId)) {
+                               logger.log(Level.INFO, 
MessageFormat.format("Config change id ''{0}'' successfully completed after {1} 
msec.", new Object[] { changeId, i * CONFIG_UPDATE_SLEEP_BETWEEN_POLLS } ));
+                               return;
+                       }
+                       try {
+                               Thread.sleep(CONFIG_UPDATE_SLEEP_BETWEEN_POLLS);
+                       } catch (InterruptedException e) {
+                               // Ignore interrupt
+                               logger.log(Level.WARNING, "Sleep period was 
interrupted", e);
+                       }
+               }
+               logger.log(Level.WARNING, MessageFormat.format("Config change 
did not complete after {0} msec.", CONFIG_UPDATE_SLEEP_BETWEEN_POLLS * 
CONFIG_UPDATE_MAX_POLLS));
+       }
+
+       public void resetConfigContainer() {
+               logger.warning("resetting ODF configuration!");
+               configurationStore.storeConfig(getDefaultConfigContainer());
+       }
+
+       private static String defaultConfig = null;
+
+       List<DiscoveryServiceProperties> getServicesFoundOnClassPath() throws 
IOException, JSONException {
+               ClassLoader cl = this.getClass().getClassLoader();
+               Enumeration<URL> services = 
cl.getResources("META-INF/odf/odf-services.json");
+               List<DiscoveryServiceProperties> result = new ArrayList<>();
+               while (services.hasMoreElements()) {
+                       URL url = services.nextElement();
+                       InputStream is = url.openStream();
+                       String json = Utils.getInputStreamAsString(is, "UTF-8");
+                       logger.log(Level.INFO, "Service found on the classpath 
at {0}: {1}", new Object[] { url, json });
+                       result.addAll(JSONUtils.fromJSONList(json, 
DiscoveryServiceProperties.class));
+               }
+               logger.log(Level.INFO, "Number of classpath services found: 
{0}", result.size());
+               return result;
+       }
+
+       private ConfigContainer getDefaultConfigContainer() {
+               if (defaultConfig == null) {                    
+                       try {
+                               ConfigContainer config = new 
ODFInternalFactory().create(Environment.class).getDefaultConfiguration();
+                               // now look for services found on the classpath
+                               
config.getRegisteredServices().addAll(getServicesFoundOnClassPath());
+                               defaultConfig = JSONUtils.toJSON(config);
+                       } catch (IOException | JSONException e) {
+                               String msg = "Default config could not be 
loaded or parsed!";
+                               logger.severe(msg);
+                               throw new RuntimeException(msg, e);
+                       }
+               }
+               try {
+                       return JSONUtils.fromJSON(defaultConfig, 
ConfigContainer.class);
+               } catch (JSONException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       private void runNewServices(List<DiscoveryServiceProperties> 
newServices) {
+               ControlCenter cc = new 
ODFInternalFactory().create(ControlCenter.class);
+               List<String> servicesToRun = new ArrayList<String>();
+               for (DiscoveryServiceProperties info : newServices) {
+                       servicesToRun.add(info.getId());
+               }
+
+               AnalysisRequest req = new AnalysisRequest();
+               MetadataStore mds = new 
ODFFactory().create().getMetadataStore();
+               req.setDiscoveryServiceSequence(servicesToRun);
+               
req.setDataSets(mds.search(mds.newQueryBuilder().objectType("DataSet").build()));
+               req.setIgnoreDataSetCheck(true);
+               cc.startRequest(req);
+       }
+
+       private void unhideAndEncryptPasswords(ConfigContainer 
updatedConfigContainer,
+                       ConfigContainer originalConfiguration) {
+               if (updatedConfigContainer.getOdf() != null) {
+                       String odfPassword = 
updatedConfigContainer.getOdf().getOdfPassword();
+                       if (odfPassword != null) {
+                               if 
(odfPassword.equals(HIDDEN_PASSWORD_IDENTIFIER)) {
+                                       // Password was not changed, therefore 
keep original
+                                       // encrypted password
+                                       
updatedConfigContainer.getOdf().setOdfPassword(originalConfiguration.getOdf().getOdfPassword());
+                               } else if 
(!Encryption.isEncrypted(odfPassword)) {
+                                       
updatedConfigContainer.getOdf().setOdfPassword(Encryption.encryptText(odfPassword));
+                               }
+                       }
+                       if (updatedConfigContainer.getOdf().getSparkConfig() != 
null) {
+                               SparkConfig updatedSparkConfig = 
updatedConfigContainer.getOdf().getSparkConfig();
+                               if (updatedSparkConfig.getConfigs() != null) {
+                                       List<String> encryptedSparkConfigs = 
Arrays.asList(DEFAULT_ENCRYPTED_SPARK_CONFIGS.split(","));
+                                       for (String configName : 
updatedSparkConfig.getConfigs().keySet()) {
+                                               if 
(encryptedSparkConfigs.contains(configName)) {
+                                                       String 
updatedConfigValue = (String) updatedSparkConfig.getConfigs().get(configName);
+                                                       if 
(updatedConfigValue.equals(HIDDEN_PASSWORD_IDENTIFIER)) {
+                                                               // Encrypted 
value was not changed, therefore keep original
+                                                               // Encrypted 
value
+                                                               SparkConfig 
originalSparkConfig = originalConfiguration.getOdf().getSparkConfig();
+                                                               
updatedSparkConfig.setConfig(configName, 
originalSparkConfig.getConfigs().get(configName));
+                                                       } else if 
(!Encryption.isEncrypted(updatedConfigValue)) {
+                                                               
updatedSparkConfig.setConfig(configName, 
Encryption.encryptText(updatedConfigValue));
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
+
+       private void hidePasswords(ConfigContainer configContainer) {
+               if (configContainer.getOdf() != null) {
+                       if (configContainer.getOdf().getOdfPassword() != null) {
+                               
configContainer.getOdf().setOdfPassword(HIDDEN_PASSWORD_IDENTIFIER);
+                       }
+                       if ((configContainer.getOdf().getSparkConfig() != 
null)){
+                               SparkConfig sparkConfig = 
configContainer.getOdf().getSparkConfig();
+                               if (sparkConfig.getConfigs() != null) {
+                                       List<String> encryptedSparkConfigs = 
Arrays.asList(DEFAULT_ENCRYPTED_SPARK_CONFIGS.split(","));
+                                       for (String configName : 
sparkConfig.getConfigs().keySet()) {
+                                               if 
(((encryptedSparkConfigs.contains(configName)) && 
(sparkConfig.getConfigs().get(configName)) != null)) {
+                                                       
sparkConfig.setConfig(configName, HIDDEN_PASSWORD_IDENTIFIER);
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java
new file mode 100755
index 0000000..a7f822f
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.configuration;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage.Type;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class ODFConfigNotificationPublisher {
+
+       Logger logger = 
Logger.getLogger(ODFConfigNotificationPublisher.class.getName());
+
+       public void publishConfigChange(ConfigContainer update, String 
changeId) {
+               try {
+                       logger.log(Level.FINE, "publishing config change: {0}", 
JSONUtils.toJSON(update));
+                       ConfigContainer clone = 
JSONUtils.fromJSON(JSONUtils.toJSON(update), ConfigContainer.class);
+                       AdminMessage amsg = new AdminMessage();
+                       amsg.setId(changeId);
+                       amsg.setAdminMessageType(Type.CONFIGCHANGE);
+                       amsg.setConfigUpdateDetails(clone);
+                       amsg.setDetails("Configuration update");
+                       DiscoveryServiceQueueManager qm = new 
ODFInternalFactory().create(DiscoveryServiceQueueManager.class);
+                       qm.enqueueInAdminQueue(amsg);
+               } catch (Exception exc) {
+                       logger.log(Level.WARNING, "An unexpected exception 
occurres when writing to admin queue. Ignoring it", exc);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java
new file mode 100755
index 0000000..011d728
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.configuration;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.settings.validation.PropertyValidator;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+
+public class ServiceValidator implements PropertyValidator {
+
+       public void validate(String property, Object value) throws 
ValidationException {
+               validate(property, value, true);
+       }
+
+       private void validate(String property, Object value, boolean topLevel) 
throws ValidationException {
+               if (value == null) {
+                       throw new ValidationException("Null values are not 
allowed for this property");
+               }
+
+               if (value instanceof List) {
+                       List<DiscoveryServiceProperties> newServices = 
(List<DiscoveryServiceProperties>) value;
+                       List<String> ids = new ArrayList<String>();
+                       for (int no = 0; no < newServices.size(); no++) {
+                               DiscoveryServiceProperties service = 
(DiscoveryServiceProperties) newServices.get(no);
+                               validate(property, service, false);
+                               String serviceId = service.getId();
+                               if (ids.contains(serviceId)) {
+                                       throw new ValidationException(property, 
MessageFormat.format("you cannot register multiple services with the same id 
{0}!", serviceId));
+                               } else {
+                                       ids.add(serviceId);
+                               }
+                       }
+               } else if (value instanceof DiscoveryServiceProperties) {
+                       DiscoveryServiceProperties service = 
(DiscoveryServiceProperties) value;
+                       if (service.getId() == null || 
service.getId().trim().isEmpty() || service.getName() == null || 
service.getName().trim().isEmpty() || service.getEndpoint() == null) {
+                               throw new ValidationException(property, 
MessageFormat.format("A service requires {0}", "id, name and an endpoint"));
+                       }
+
+                       if (topLevel) {
+                               List<String> regServices = new 
ArrayList<String>();
+                               List<DiscoveryServiceProperties> services = new 
ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties();
+                               for (DiscoveryServiceProperties regService : 
services) {
+                                       regServices.add(regService.getId());
+                               }
+
+                               if (regServices.contains(service.getId())) {
+                                       throw new ValidationException(property, 
MessageFormat.format("a service with id {0} already exists!", service.getId()));
+                               }
+                       }
+
+                       ServiceRuntime runtime = 
ServiceRuntimes.getRuntimeForDiscoveryService(service);
+                       runtime.validate(service);
+               } else {
+                       throw new ValidationException(property, "only 
DiscoveryServiceRegistrationInfo objects or list of such objects are allowed 
for this property");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java
new file mode 100755
index 0000000..fffff6f
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+
+// JSON
+public class AdminMessage {
+       public static enum Type {
+               SHUTDOWN, RESTART, CONFIGCHANGE
+       }
+
+       private Type adminMessageType;
+       private String details;
+       private ConfigContainer configUpdateDetails;
+       private String messageId;
+
+       public Type getAdminMessageType() {
+               return adminMessageType;
+       }
+
+       public void setAdminMessageType(Type adminMessageType) {
+               this.adminMessageType = adminMessageType;
+       }
+
+       public String getDetails() {
+               return details;
+       }
+
+       public void setDetails(String details) {
+               this.details = details;
+       }
+
+       public ConfigContainer getConfigUpdateDetails() {
+               return configUpdateDetails;
+       }
+
+       public void setConfigUpdateDetails(ConfigContainer configUpdateDetails) 
{
+               this.configUpdateDetails = configUpdateDetails;
+       }
+
+       public String getId() {
+               return this.messageId;
+       }
+
+       public void setId(String messageId) {
+               this.messageId = messageId;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java
new file mode 100755
index 0000000..874e061
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.core.ODFInitializer;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class AdminQueueProcessor implements QueueMessageProcessor {
+
+       private Logger logger = 
Logger.getLogger(AdminQueueProcessor.class.getName());
+
+       @Override
+       public void process(ExecutorService executorService, String msg, int 
partition, long offset) {
+               AdminMessage adminMessage;
+               try {
+                       adminMessage = JSONUtils.fromJSON(msg, 
AdminMessage.class);
+               } catch (JSONException e) {
+                       throw new RuntimeException(e);
+               }
+               switch (adminMessage.getAdminMessageType()) {
+               case SHUTDOWN:
+                       initiateShutdown(executorService, false);
+                       break;
+               case RESTART:
+                       initiateShutdown(executorService, true);
+                       break;
+               default:
+                       // do nothing
+               }
+       }
+
+       static Object restartLockObject = new Object();
+
+       private void initiateShutdown(ExecutorService executorService, final 
boolean restart) {
+               logger.log(Level.INFO, "Shutdown of ODF was requested...");
+               Runnable shutDownRunnable = new Runnable() {
+
+                       @Override
+                       public void run() {
+                               logger.log(Level.INFO, "Initiating shutdown");
+
+                               // sleep some time before initiating the actual 
shutdown to give the process() a chance to return
+                               // before it is itself shut down
+                               long sleepTimeBeforeShutdown = 1000;
+                               try {
+                                       Thread.sleep(sleepTimeBeforeShutdown);
+                               } catch (InterruptedException e) {
+                                       // do nothing
+                                       e.printStackTrace();
+                               }
+
+                               synchronized (restartLockObject) {
+                                       logger.log(Level.INFO, "Shutting down 
ODF...");
+                                       try {
+                                               ODFInitializer.stop();
+                                               logger.log(Level.INFO, "ODF was 
shutdown");
+                                                                               
        
+                                               if (restart) {
+                                                       logger.log(Level.INFO, 
"Restarting ODF");
+                                                       ODFInitializer.start();
+                                                       logger.log(Level.INFO, 
"ODF restarted");
+                                               }
+                                       }  catch (Exception e) {
+                                               logger.log(Level.SEVERE, "An 
unexpected error occurred when shutting down ODF", e);
+                                       }
+                               }
+
+                       }
+
+               };
+
+               executorService.submit(shutDownRunnable);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AnalysisRequestTrackerStore.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AnalysisRequestTrackerStore.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AnalysisRequestTrackerStore.java
new file mode 100755
index 0000000..e43bd45
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AnalysisRequestTrackerStore.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.List;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestSummary;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+
+
+public interface AnalysisRequestTrackerStore {
+       
+       /**
+        * set the status of old requests which were last modified before the 
cutOffTimestamp
+        * with an optional detailsMessage
+        */
+       void setStatusOfOldRequest(long cutOffTimestamp, STATUS status, String 
detailsMessage);
+       
+       // store / update the passed tracker
+       void store(AnalysisRequestTracker tracker);
+       
+       AnalysisRequestTracker query(String analysisRequestId);
+
+       AnalysisRequestTracker findSimilarQueuedRequest(AnalysisRequest 
request);
+       
+       /**
+        * @param number - number of trackers to retrieve, -1 for all
+        * @return
+        */
+       List<AnalysisRequestTracker> getRecentTrackers(int offset, int limit);
+       
+       /**
+        * Clear any internal caches, if any.
+        */
+       void clearCache(); 
+
+       int getSize();
+
+       AnalysisRequestSummary getRequestSummary();
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AsyncDiscoveryServiceWrapper.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AsyncDiscoveryServiceWrapper.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AsyncDiscoveryServiceWrapper.java
new file mode 100755
index 0000000..8100f18
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AsyncDiscoveryServiceWrapper.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import 
org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse.ResponseCode;
+import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
+import 
org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncRunStatus;
+import 
org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import 
org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.core.Utils;
+
+public class AsyncDiscoveryServiceWrapper implements SyncDiscoveryService {
+
+       AsyncDiscoveryService wrappedService = null;
+
+       public AsyncDiscoveryServiceWrapper(AsyncDiscoveryService 
wrappedService) {
+               this.wrappedService = wrappedService;
+       }
+
+       @Override
+       public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest 
request) {
+               try {
+                       DiscoveryServiceAsyncStartResponse asyncResponse = 
wrappedService.startAnalysis(request);
+                       ResponseCode code = asyncResponse.getCode();
+                       if (code != ResponseCode.OK) {
+                               DiscoveryServiceSyncResponse response = new 
DiscoveryServiceSyncResponse();
+                               response.setCode(code);
+                               response.setDetails(asyncResponse.getDetails());
+                               return response;
+                       }
+                       // poll the async service
+                       final long maxWaitTimeSecs = 
Utils.getEnvironmentProperty("odf.async.max.wait.secs", 10 * 60); // default: 
10 minutes
+                       final long pollingIntervalMS = 
Utils.getEnvironmentProperty("odf.async.poll.interval.ms", 1000);
+                       long maxPolls = (maxWaitTimeSecs * 1000) / 
pollingIntervalMS;
+                       int pollCounter = 0;
+
+                       DiscoveryServiceSyncResponse response = new 
DiscoveryServiceSyncResponse();
+                       String runId = asyncResponse.getRunId();
+                       while (pollCounter < maxPolls) {
+                               Thread.sleep(pollingIntervalMS);
+                               DiscoveryServiceAsyncRunStatus status = 
wrappedService.getStatus(runId);
+                               switch (status.getState()) {
+                               case NOT_FOUND:
+                                       // should not happen
+                                       
response.setCode(ResponseCode.UNKNOWN_ERROR);
+                                       response.setDetails("Run ID " + runId + 
" was not found. This should not have happened.");
+                                       return response;
+                               case ERROR:
+                                       
response.setCode(ResponseCode.UNKNOWN_ERROR);
+                                       
response.setDetails(status.getDetails());
+                                       return response;
+                               case FINISHED:
+                                       response.setCode(ResponseCode.OK);
+                                       
response.setDetails(status.getDetails());
+                                       response.setResult(status.getResult());
+                                       return response;
+                               default:
+                                       // continue polling
+                                       pollCounter++;
+                               }
+                       }
+                       response.setCode(ResponseCode.UNKNOWN_ERROR);
+                       response.setDetails("Polled Async service for " + 
maxWaitTimeSecs + " seconds without positive result");
+                       return response;
+               } catch (Exception exc) {
+                       DiscoveryServiceSyncResponse response = new 
DiscoveryServiceSyncResponse();
+                       response.setCode(ResponseCode.UNKNOWN_ERROR);
+                       response.setDetails("An unknown error occurred: " + 
Utils.getExceptionAsString(exc));
+                       return response;
+               }
+       }
+
+       public void setExecutorService(ExecutorService executorService) {
+               wrappedService.setExecutorService(executorService);
+       }
+
+       public void setMetadataStore(MetadataStore metadataStore) {
+               wrappedService.setMetadataStore(metadataStore);
+       }
+
+       public void setAnnotationStore(AnnotationStore annotationStore) {
+               wrappedService.setAnnotationStore(annotationStore);
+       }
+
+       public DataSetCheckResult checkDataSet(DataSetContainer 
dataSetContainer) {
+               return wrappedService.checkDataSet(dataSetContainer);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ConfigChangeQueueProcessor.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ConfigChangeQueueProcessor.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ConfigChangeQueueProcessor.java
new file mode 100755
index 0000000..bcd2965
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ConfigChangeQueueProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage.Type;
+import org.apache.atlas.odf.core.store.ODFConfigurationStorage;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class ConfigChangeQueueProcessor implements QueueMessageProcessor {
+
+       Logger logger = 
Logger.getLogger(ConfigChangeQueueProcessor.class.getName());
+       
+       @Override
+       public void process(ExecutorService executorService, String msg, int 
partition, long offset) {
+               try {
+                       AdminMessage amsg = JSONUtils.fromJSON(msg, 
AdminMessage.class);
+                       if 
(Type.CONFIGCHANGE.equals(amsg.getAdminMessageType())) {
+                               logger.info("Received config change: " + 
JSONUtils.toJSON(amsg));
+                               ODFInternalFactory f = new ODFInternalFactory();
+                               ODFConfigurationStorage configStorage = 
f.create(ODFConfigurationStorage.class);
+                               
configStorage.onConfigChange(amsg.getConfigUpdateDetails());
+                               
configStorage.removePendingConfigChange(amsg.getId());
+                       }
+               } catch(Exception exc) {
+                       logger.log(Level.WARNING, "An exception occurred while 
processing admin message", exc);
+               }
+       }
+
+}

Reply via email to