// Reconstructed from a diff dump. The Apache-2.0 license header and the
// package declaration (org.apache.atlas.odf.core) are retained in the repository file.

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.text.MessageFormat;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Scans the classpath for "implementation mapping" property files at the given
 * resource path and collects them into a single map of
 * interface class name -&gt; implementation class name.
 * When the same interface key is declared by more than one resource, the class
 * lowest in the inheritance hierarchy wins; unrelated conflicting classes keep
 * the first registration and log a warning.
 */
public class ODFImplementations {

	private final Logger logger = Logger.getLogger(ODFImplementations.class.getName());

	private final Map<String, String> implementations = new HashMap<String, String>();

	/**
	 * @param path classpath resource path of the property files to scan
	 * @param cl   class loader used both to enumerate resources and to validate classes
	 */
	public ODFImplementations(String path, ClassLoader cl) {
		Enumeration<URL> resources;
		try {
			resources = cl.getResources(path);
		} catch (IOException exc) {
			// Fixed placeholder: the original wrote ''0'' which MessageFormat never
			// substitutes; it must be {0}.
			logger.log(Level.WARNING, MessageFormat.format("Properties ''{0}'' could not be loaded", path), exc);
			return;
		}
		while (resources.hasMoreElements()) {
			URL url = resources.nextElement();
			// Fixed: try-with-resources — the original leaked the stream on every iteration.
			try (InputStream is = url.openStream()) {
				Properties props = new Properties();
				props.load(is);
				for (Object key : props.keySet()) {
					String keyString = (String) key;
					try {
						registerImplementation(cl, keyString, props.getProperty(keyString));
					} catch (ClassNotFoundException exc) {
						logger.log(Level.SEVERE, "Class found in odf-implementation.properties file could not be loaded", exc);
					}
				}
			} catch (IOException e) {
				logger.log(Level.WARNING, MessageFormat.format("Properties ''{0}'' could not be loaded", url), e);
			}
		}
	}

	/**
	 * Registers newClassString as the implementation of the interface keyString.
	 * Duplicate registrations are resolved in favor of the most specific subclass;
	 * classes unrelated by inheritance keep the existing registration.
	 *
	 * @throws ClassNotFoundException if a mapped class cannot be loaded via cl
	 */
	private void registerImplementation(ClassLoader cl, String keyString, String newClassString) throws ClassNotFoundException {
		if (!implementations.containsKey(keyString)) {
			// Validate that the class is loadable before registering it.
			cl.loadClass(newClassString);
			implementations.put(keyString, newClassString);
			return;
		}
		String existingClassString = implementations.get(keyString);
		if (existingClassString.equals(newClassString)) {
			return;
		}
		Class<?> existingClass = cl.loadClass(existingClassString);
		Class<?> newClass = cl.loadClass(newClassString);
		String superClass = null;
		String subClass = null;
		// Select the class lowest in the class hierarchy.
		if (existingClass.isAssignableFrom(newClass)) {
			superClass = existingClassString;
			subClass = newClassString;
		} else if (newClass.isAssignableFrom(existingClass)) {
			superClass = newClassString;
			subClass = existingClassString;
		}
		if (superClass != null) {
			logger.log(Level.INFO, "Implementation for interface ''{0}'' was found more than once, using subclass ''{1}'' (found superclass ''{2}'')",
					new Object[] { keyString, subClass, superClass });
			implementations.put(keyString, subClass);
		} else {
			logger.log(Level.WARNING, "Implementation for interface ''{0}'' was found more than once, using ''{1}''. (Conflict between ''{1}'' and ''{2}'')",
					new Object[] { keyString, existingClassString, newClassString });
		}
	}

	/** @return interface class name -&gt; implementation class name map (never null). */
	public Map<String, String> getImplementations() {
		return implementations;
	}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java new file mode 100755 index 0000000..64e54ad --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java @@ -0,0 +1,97 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core; + +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore; +import org.apache.atlas.odf.core.controlcenter.ThreadManager; +import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager; + +public class ODFInitializer { + + static Logger logger = Logger.getLogger(ODFInitializer.class.getName()); + + static Object initLock = new Object(); + + private static boolean running = false; + private static long lastStopTimestamp = 0; + private static long lastStartTimestamp = 0; + private static boolean startStopInProgress = false; + + + public static long getLastStopTimestamp() { + synchronized (initLock) { + return lastStopTimestamp; + } + } + + public static long getLastStartTimestamp() { + synchronized (initLock) { + return lastStartTimestamp; + } + } + + public static boolean isRunning() { + synchronized (initLock) { + return running; + } + } + + public static boolean isStartStopInProgress() { + return startStopInProgress; + } + + public static void start() { + synchronized (initLock) { + if (!running) { + startStopInProgress = true; + DiscoveryServiceQueueManager qm = new ODFInternalFactory().create(DiscoveryServiceQueueManager.class); + try { + qm.start(); + } catch (Exception e) { + logger.log(Level.WARNING, "Timeout occurred while starting ODF", e); + } + lastStartTimestamp = System.currentTimeMillis(); + running = true; + startStopInProgress = false; + } + } + } + + public static void stop() { + synchronized (initLock) { + if (running) { + startStopInProgress = true; + ODFInternalFactory f = new ODFInternalFactory(); + DiscoveryServiceQueueManager qm = f.create(DiscoveryServiceQueueManager.class); + try { + qm.stop(); + } catch (TimeoutException e) { + logger.log(Level.WARNING, "Timeout occurred while stopping ODF", e); + } + ThreadManager tm = f.create(ThreadManager.class); + 
tm.shutdownAllUnmanagedThreads(); + AnalysisRequestTrackerStore arts = f.create(AnalysisRequestTrackerStore.class); + arts.clearCache(); + lastStopTimestamp = System.currentTimeMillis(); + running = false; + startStopInProgress = false; + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java new file mode 100755 index 0000000..4fd09a7 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java @@ -0,0 +1,93 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.utils.ODFLogConfig; + +public class ODFInternalFactory { + + private static Properties defaultImplemenetations = Utils.readConfigProperties("org/apache/atlas/odf/core/internal/odf-default-implementation.properties"); + private static ODFImplementations overwrittenImplementations = null; + private static Map<Class<?>, Object> singletons = new HashMap<>(); + + public static String SINGLETON_MARKER = "@singleton"; + + static { + ODFLogConfig.run(); + + Logger logger = Logger.getLogger(ODFInternalFactory.class.getName()); + ClassLoader cl = ODFInternalFactory.class.getClassLoader(); + String overwriteConfig = "org/apahe/atlas/odf/odf-implementation.properties"; + overwrittenImplementations = new ODFImplementations(overwriteConfig, cl); + if (overwrittenImplementations.getImplementations().isEmpty()) { + overwrittenImplementations = null; + } else { + logger.log(Level.INFO, "Found overwritten implementation config: {0}", overwrittenImplementations.getImplementations()); + } + if (overwrittenImplementations == null) { + logger.log(Level.INFO, "Default implementations are used"); + } + } + + private Object createObject(Class<?> cl) throws ClassNotFoundException, IllegalAccessException, InstantiationException { + String clazz = null; + if (overwrittenImplementations != null) { + clazz = overwrittenImplementations.getImplementations().get(cl.getName()); + } + if (clazz == null) { + clazz = defaultImplemenetations.getProperty(cl.getName()); + } + if (clazz == null) { + // finally try to instantiate the class as such + clazz = cl.getName(); + } + boolean isSingleton = false; + if (clazz.endsWith(SINGLETON_MARKER)) { + clazz = clazz.substring(0, clazz.length() - SINGLETON_MARKER.length()); + isSingleton = true; + } + Object o = null; + Class<?> implClass = 
this.getClass().getClassLoader().loadClass(clazz); + if (isSingleton) { + o = singletons.get(implClass); + if (o == null) { + o = implClass.newInstance(); + singletons.put(implClass, o); + } + } else { + o = implClass.newInstance(); + } + return o; + } + + @SuppressWarnings("unchecked") + public <T> T create(Class<T> cl) { + try { + return (T) createObject(cl); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } catch (InstantiationException e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java new file mode 100755 index 0000000..623a727 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java @@ -0,0 +1,77 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core; + +import java.text.MessageFormat; +import java.util.List; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.analysis.AnalysisManager; +import org.apache.atlas.odf.api.analysis.AnalysisRequest; +import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus; +import org.apache.atlas.odf.api.analysis.AnalysisResponse; + +public class ODFUtils { + public static int DEFAULT_TIMEOUT_SECS = 10 * 60; // 10 minutes + + public static AnalysisRequestStatus runSynchronously(AnalysisManager analysisManager, AnalysisRequest request) { + return runSynchronously(analysisManager, request, DEFAULT_TIMEOUT_SECS); // default is + } + + public static AnalysisRequestStatus runSynchronously(AnalysisManager analysisManager, AnalysisRequest request, int timeoutInSeconds) { + Logger logger = Logger.getLogger(ODFUtils.class.getName()); + AnalysisResponse response = analysisManager.runAnalysis(request); + if (response.isInvalidRequest()) { + AnalysisRequestStatus status = new AnalysisRequestStatus(); + status.setState(AnalysisRequestStatus.State.ERROR); + status.setDetails(MessageFormat.format("Request was invalid. 
Details: {0}", response.getDetails())); + status.setRequest(request); + return status; + } + AnalysisRequestStatus status = null; + long startTime = System.currentTimeMillis(); + boolean timeOutReached = false; + do { + logger.fine("Polling for result..."); + status = analysisManager.getAnalysisRequestStatus(response.getId()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + long currentTime = System.currentTimeMillis(); + timeOutReached = (currentTime - startTime) > (timeoutInSeconds * 1000); + } while ((AnalysisRequestStatus.State.ACTIVE.equals(status.getState()) || AnalysisRequestStatus.State.QUEUED.equals(status.getState()) // + && !timeOutReached)); + return status; + + } + + public static AnalysisRequestStatus.State combineStates(List<AnalysisRequestStatus.State> allStates) { + // if one of the requests is in error, so is the complete request + if (allStates.contains(AnalysisRequestStatus.State.ERROR)) { + return AnalysisRequestStatus.State.ERROR; + } + // if no request could be found -> not found + if (Utils.containsOnly(allStates, new AnalysisRequestStatus.State[] { AnalysisRequestStatus.State.NOT_FOUND })) { + return AnalysisRequestStatus.State.NOT_FOUND; + } + // if all request are either not found or finished -> finished + if (Utils.containsOnly(allStates, new AnalysisRequestStatus.State[] { AnalysisRequestStatus.State.NOT_FOUND, AnalysisRequestStatus.State.FINISHED })) { + return AnalysisRequestStatus.State.FINISHED; + } + // else always return active + return AnalysisRequestStatus.State.ACTIVE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java 
new file mode 100755 index 0000000..e8361fd --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java @@ -0,0 +1,82 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.OpenDiscoveryFramework; +import org.apache.atlas.odf.api.metadata.MetadataStore; +import org.apache.atlas.odf.api.metadata.importer.JDBCMetadataImporter; +import org.apache.atlas.odf.api.settings.SettingsManager; +import org.apache.wink.json4j.JSONException; + +import org.apache.atlas.odf.api.analysis.AnalysisManager; +import org.apache.atlas.odf.api.annotation.AnnotationStore; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager; +import org.apache.atlas.odf.api.engine.EngineManager; +import org.apache.atlas.odf.api.engine.ServiceRuntimesInfo; +import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes; +import org.apache.atlas.odf.json.JSONUtils; + +public class OpenDiscoveryFrameworkImpl implements OpenDiscoveryFramework { + + private Logger logger = Logger.getLogger(OpenDiscoveryFrameworkImpl.class.getName()); + + public OpenDiscoveryFrameworkImpl() { + if (!ODFInitializer.isRunning() && !ODFInitializer.isStartStopInProgress()) { + logger.log(Level.INFO, "Initializing Open Discovery Platform"); + ODFInitializer.start(); + getEngineManager().checkHealthStatus(); // This 
implicitly initializes the control center and the message queues + + logger.log(Level.INFO, "Open Discovery Platform successfully initialized."); + + // log active runtimes + ServiceRuntimesInfo activeRuntimesInfo = ServiceRuntimes.getRuntimesInfo(ServiceRuntimes.getActiveRuntimes()); + try { + logger.log(Level.INFO, "Active runtimes: ''{0}''", JSONUtils.toJSON(activeRuntimesInfo)); + } catch (JSONException e) { + logger.log(Level.WARNING, "Active runtime info has wrong format", e); + } + } + } + + public AnalysisManager getAnalysisManager() { + return new ODFInternalFactory().create(AnalysisManager.class); + } + + public DiscoveryServiceManager getDiscoveryServiceManager() { + return new ODFInternalFactory().create(DiscoveryServiceManager.class); + } + + public EngineManager getEngineManager() { + return new ODFInternalFactory().create(EngineManager.class); + } + + public SettingsManager getSettingsManager() { + return new ODFInternalFactory().create(SettingsManager.class); + } + + public AnnotationStore getAnnotationStore() { + return new ODFInternalFactory().create(AnnotationStore.class); + } + + public MetadataStore getMetadataStore() { + return new ODFInternalFactory().create(MetadataStore.class); + } + + public JDBCMetadataImporter getJDBCMetadataImporter() { + return new ODFInternalFactory().create(JDBCMetadataImporter.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java new file mode 100755 index 0000000..e58dd37 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java @@ -0,0 +1,71 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you 
may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.atlas.odf.core.configuration.ConfigContainer; + +public class StandaloneEnvironment implements Environment { + + @Override + public String getProperty(String propertyName) { + return System.getProperty(propertyName); + } + + @Override + public String getCurrentUser() { + return System.getProperty("user.name"); + } + + @Override + public String getZookeeperConnectString() { + return getProperty("odf.zookeeper.connect"); + } + + @Override + public ConfigContainer getDefaultConfiguration() { + return Utils.readConfigurationFromClasspath("org/apache/atlas/odf/core/internal/odf-initial-configuration.json"); + } + + @Override + public Map<String, String> getPropertiesWithPrefix(String prefix) { + Map<String, String> foundProps = new HashMap<>(); + Properties props = System.getProperties(); + for (String key : props.stringPropertyNames()) { + if (key.startsWith(prefix)) { + foundProps.put(key, props.getProperty(key)); + } + } + return foundProps; + } + + @Override + public List<String> getActiveRuntimeNames() { + String p = getProperty("odf.active.runtimes"); + if (p == null || p.equals("ALL")) { + return null; + } + if (p.equals("NONE")) { + return new ArrayList<>(); + } + return Arrays.asList(p.split(",")); + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java new file mode 100755 index 0000000..060f9fb --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java @@ -0,0 +1,314 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.StringTokenizer; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.settings.KafkaConsumerConfig; +import org.apache.atlas.odf.api.settings.ODFSettings; +import org.apache.atlas.odf.api.settings.SettingsManager; +import org.apache.atlas.odf.core.configuration.ConfigContainer; +import org.apache.atlas.odf.json.JSONUtils; +import org.apache.wink.json4j.JSONObject; + +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; + +public class Utils { + + static Logger logger = Logger.getLogger(Utils.class.getName()); + + private static final List<Class<? 
extends Object>> MERGABLE_CLASSES = Arrays.asList(ConfigContainer.class, KafkaConsumerConfig.class, ODFSettings.class, DiscoveryServiceProperties.class); + + public static void mergeODFPOJOs(Object source, Object update) { + if (!source.getClass().isAssignableFrom(update.getClass())) { + return; + } + + Method[] sourceMethods = source.getClass().getDeclaredMethods(); + + for (Method getterMethod : sourceMethods) { + if (getterMethod.getName().startsWith("get") || getterMethod.getName().startsWith("is")) { + String setterMethodName = getterMethod.getName().replaceFirst("get", "set"); + if (getterMethod.getName().startsWith("is")) { + setterMethodName = setterMethodName.replaceFirst("is", "set"); + } + try { + Method setterMethod = source.getClass().getDeclaredMethod(setterMethodName, getterMethod.getReturnType()); + Object updateValue = getterMethod.invoke(update); + if (updateValue != null) { + Object sourceValue = getterMethod.invoke(source); + + if (sourceValue != null && MERGABLE_CLASSES.contains(updateValue.getClass())) { + //Value is another POJO, must also try merging these instead of overwriting + mergeODFPOJOs(sourceValue, updateValue); + setterMethod.invoke(source, sourceValue); + } else if (sourceValue instanceof Map && updateValue instanceof Map) { + Map updateJSON = (Map) updateValue; + Map sourceJSON = (Map) sourceValue; + for (Object key : updateJSON.keySet()) { + sourceJSON.put(key, updateJSON.get(key)); + } + setterMethod.invoke(source, sourceJSON); + } else { + setterMethod.invoke(source, updateValue); + } + } + + } catch (NoSuchMethodException e) { + throw new RuntimeException(MessageFormat.format("Objects of type {0} and {1} could not be merged, no matching method found for {2}!", source.getClass().getName(), update + .getClass().getName(), getterMethod.getName()), e); + } catch (SecurityException e) { + throw new RuntimeException(MessageFormat.format("Objects of type {0} and {1} could not be merged, method {2} could not be accessed 
(SecurityException)!", source.getClass() + .getName(), update.getClass().getName(), setterMethodName), e); + } catch (IllegalAccessException e) { + throw new RuntimeException(MessageFormat.format("Objects of type {0} and {1} could not be merged, method {2} could not be accessed! (IllegalAccessException)", source.getClass() + .getName(), update.getClass().getName(), getterMethod.getName()), e); + } catch (IllegalArgumentException e) { + throw new RuntimeException(MessageFormat.format("Objects of type {0} and {1} could not be merged, method {2} does not accept the right parameters!", source.getClass().getName(), + update.getClass().getName(), setterMethodName), e); + } catch (InvocationTargetException e) { + e.printStackTrace(); + throw new RuntimeException(MessageFormat.format("Objects of type {0} and {1} could not be merged, method {2} or {3} could not be invoked!", source.getClass().getName(), update + .getClass().getName(), getterMethod.getName(), setterMethodName), e); + } + + } + } + } + + public static Properties readConfigProperties(String path) { + // TODO cache this in static variables, it doesn't change at runtime + InputStream is = Utils.class.getClassLoader().getResourceAsStream(path); + if (is == null) { + return null; + } + Properties props = new Properties(); + try { + props.load(is); + } catch (IOException e) { + throw new RuntimeException(e); + } + return props; + } + + public static void setCurrentTimeAsLastModified(AnalysisRequestTracker tracker) { + tracker.setLastModified(System.currentTimeMillis()); + } + + public static String getExceptionAsString(Throwable exc) { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + exc.printStackTrace(pw); + String st = sw.toString(); + return st; + } + + public static String collectionToString(Collection<?> coll, String separator) { + StringBuffer buf = null; + for (Object o : coll) { + if (buf == null) { + buf = new StringBuffer("[ "); + } else { + buf.append(separator); + } + 
buf.append(o.toString()); + } + buf.append(" ]"); + return buf.toString(); + } + + public static <T> boolean containsOnly(List<T> l, T[] elements) { + for (T t : l) { + boolean containsOnlyElements = false; + for (T el : elements) { + if (t.equals(el)) { + containsOnlyElements = true; + break; + } + } + if (!containsOnlyElements) { + return false; + } + } + return true; + } + + public static <T> boolean containsNone(List<T> l, T[] elements) { + for (T t : l) { + boolean containsAnyElement = false; + for (T el : elements) { + if (t.equals(el)) { + containsAnyElement = true; + break; + } + } + if (containsAnyElement) { + return true; + } + } + return false; + } + + public static List<String> splitString(String s, char separator) { + List<String> l = new ArrayList<String>(); + if (s != null) { + StringTokenizer tok = new StringTokenizer(s, String.valueOf(separator)); + while (tok.hasMoreTokens()) { + l.add(tok.nextToken()); + } + } + return l; + } + + public static String getInputStreamAsString(InputStream is, String encoding) { + try { + final int n = 2048; + byte[] b = new byte[0]; + byte[] temp = new byte[n]; + int bytesRead; + while ((bytesRead = is.read(temp)) != -1) { + byte[] newB = new byte[b.length + bytesRead]; + System.arraycopy(b, 0, newB, 0, b.length); + System.arraycopy(temp, 0, newB, b.length, bytesRead); + b = newB; + } + String s = new String(b, encoding); + return s; + } catch (IOException exc) { + return getExceptionAsString(exc); + } + } + + public static void mergeJSONObjects(JSONObject source, JSONObject target) { + if (source != null && target != null) { + target.putAll(source); + } + } + + public static <T> T getValue(T value, T defaultValue) { + if (value == null) { + return defaultValue; + } + return value; + } + + public static String getSystemPropertyExceptionIfMissing(String propertyName) { + Environment env = new ODFInternalFactory().create(Environment.class); + String value = env.getProperty(propertyName); + if (value == null) { + String 
msg = MessageFormat.format("System property ''{0}'' is not set", propertyName); + logger.log(Level.SEVERE, msg); + throw new RuntimeException(msg); + } + return value; + } + + public static int getIntEnvironmentProperty(String propertyName, int defaultValue) { + Environment env = new ODFInternalFactory().create(Environment.class); + String value = env.getProperty(propertyName); + if (value == null) { + return defaultValue; + } + try { + return Integer.parseInt(value); + } catch(NumberFormatException exc) { + return defaultValue; + } + } + + + public static void runSystemCommand(String command) { + logger.log(Level.INFO, "Running system command: " + command); + try { + Runtime r = Runtime.getRuntime(); + Process p = r.exec(command); + p.waitFor(); + BufferedReader b = new BufferedReader(new InputStreamReader(p.getInputStream())); + String line = ""; + while ((line = b.readLine()) != null) { + logger.log(Level.INFO, "System command out: " + line); + } + b.close(); + } catch(IOException | InterruptedException e) { + logger.log(Level.INFO, "Error executing system command.", e); + } + } + + public static ConfigContainer readConfigurationFromClasspath(String jsonFileInClasspath) { + InputStream is = SettingsManager.class.getClassLoader().getResourceAsStream(jsonFileInClasspath); + try { + JSONObject configJSON = new JSONObject(is); + ConfigContainer config = JSONUtils.fromJSON(configJSON.write(), ConfigContainer.class); + return config; + } catch (Exception exc) { + throw new RuntimeException(exc); + } + } + + public static String joinStrings(List<String> l, char separator) { + String result = null; + if ((l != null) && !l.isEmpty()) { + StringBuilder buf = null; + for (String s : l) { + if (buf == null) { + buf = new StringBuilder(); + } else { + buf.append(separator); + } + buf.append(s); + } + result = buf.toString(); + } + return result; + } + + public static String getEnvironmentProperty(String name, String defaultValue) { + Environment env = new 
ODFInternalFactory().create(Environment.class); + String s = env.getProperty(name); + return s != null ? s : defaultValue; + } + + public static long getEnvironmentProperty(String name, long defaultValue) { + Environment env = new ODFInternalFactory().create(Environment.class); + String s = env.getProperty(name); + if (s == null) { + return defaultValue; + } + try { + return Long.parseLong(s); + } catch(NumberFormatException exc) { + String msg = MessageFormat.format("Property ''{0}'' could not be converted to an integer", new Object[]{name}); + logger.log(Level.WARNING, msg); + return defaultValue; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java new file mode 100755 index 0000000..8f7fab2 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java @@ -0,0 +1,177 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.analysis; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.analysis.AnalysisCancelResult; +import org.apache.atlas.odf.api.analysis.AnalysisManager; +import org.apache.atlas.odf.api.analysis.AnalysisRequest; +import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus; +import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State; +import org.apache.atlas.odf.api.analysis.AnalysisRequestSummary; +import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackers; +import org.apache.atlas.odf.api.analysis.AnalysisResponse; +import org.apache.atlas.odf.api.metadata.MetaDataObjectReference; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.core.ODFUtils; +import org.apache.atlas.odf.core.Utils; +import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore; +import org.apache.atlas.odf.core.controlcenter.ControlCenter; +import org.apache.atlas.odf.json.JSONUtils; + +/** + * + * External Java API for creating and managing analysis requests + * + */ +public class AnalysisManagerImpl implements AnalysisManager { + + public final static char COMPOUND_REQUEST_SEPARATOR = ','; + private Logger logger = Logger.getLogger(AnalysisManagerImpl.class.getName()); + private ControlCenter controlCenter; + + public AnalysisManagerImpl() { + controlCenter = new ODFInternalFactory().create(ControlCenter.class); + } + + /** + * Issues a new ODF analysis request + * + * @param request Analysis request + * @return Response containing the request id and status information + */ + public AnalysisResponse runAnalysis(AnalysisRequest request) { + if (((request.getDiscoveryServiceSequence() == null) || request.getDiscoveryServiceSequence().isEmpty()) + && ((request.getAnnotationTypes() == null) || request.getAnnotationTypes().isEmpty())) { + AnalysisResponse 
response = new AnalysisResponse(); + response.setId(request.getId()); + response.setDetails("Either a sequence of discovery service ids or a list of annotation types must be specified to initiate an analysis request."); + response.setInvalidRequest(true); + return response; + } + + if ((request.getDataSets().size() == 1) || request.isProcessDataSetsSequentially()) { + logger.log(Level.INFO, "Using sequential request processing (maybe because there is only a single data set)"); + AnalysisResponse response = controlCenter.startRequest(request); + logger.log(Level.INFO, "Request with ID ''{0}'' started on data sets ''{1}''. Complete request: {2}.", + new Object[] { response.getId(), request.getDataSets(), JSONUtils.lazyJSONSerializer(request) }); + return response; + } + + List<String> requestIDs = new ArrayList<String>(); + List<String> detailsMessages = new ArrayList<String>(); + boolean invalidRequest = true; + logger.log(Level.INFO, "Running requests for ''{0}'' data sets in parallel", request.getDataSets().size()); + logger.log(Level.FINE, "Splitting request into multiple request for each data set. 
Data Sets: {0}", request.getDataSets()); + for (MetaDataObjectReference dataSet : request.getDataSets()) { + AnalysisRequest partRequest = new AnalysisRequest(); + partRequest.setDiscoveryServiceSequence(request.getDiscoveryServiceSequence()); + partRequest.setAdditionalProperties(request.getAdditionalProperties()); + partRequest.setDataSets(Collections.singletonList(dataSet)); + AnalysisResponse partResponse = controlCenter.startRequest(partRequest); + if (!partResponse.isInvalidRequest()) { + String partRequestID = partResponse.getId(); + requestIDs.add(partRequestID); + detailsMessages.add(partResponse.getDetails()); + // as soon as one request is valid, we make the compound request valid + invalidRequest = false; + } + } + AnalysisResponse response = new AnalysisResponse(); + response.setId(Utils.joinStrings(requestIDs, COMPOUND_REQUEST_SEPARATOR)); + response.setDetails(Utils.joinStrings(detailsMessages, COMPOUND_REQUEST_SEPARATOR)); + response.setInvalidRequest(invalidRequest); + return response; + } + + /** + * Retrieve status of an ODF analysis request + * + * @param requestId Unique id of the analysis request + * @return Status of the analysis request + */ + public AnalysisRequestStatus getAnalysisRequestStatus(String requestId) { + List<String> singleRequestIds = Utils.splitString(requestId, COMPOUND_REQUEST_SEPARATOR); + if (singleRequestIds.size() == 1) { + AnalysisRequestStatus status = controlCenter.getRequestStatus(requestId); + return status; + } + AnalysisRequestStatus compoundStatus = new AnalysisRequestStatus(); + compoundStatus.setState(State.QUEUED); + AnalysisRequest compoundRequest = new AnalysisRequest(); // assemble a compound request + compoundRequest.setId(requestId); + List<String> allMessages = new ArrayList<String>(); + List<MetaDataObjectReference> allDataSets = new ArrayList<>(); + List<State> allStates = new ArrayList<>(); + for (String singleRequestId : singleRequestIds) { + AnalysisRequestStatus singleStatus = 
controlCenter.getRequestStatus(singleRequestId); + if (compoundRequest.getDiscoveryServiceSequence() == null) { + // assume all fields of the single requests are the same + // since they were created through runAnalysis() + compoundRequest.setDiscoveryServiceSequence(singleStatus.getRequest().getDiscoveryServiceSequence()); + compoundRequest.setAdditionalProperties(singleStatus.getRequest().getAdditionalProperties()); + } + if (singleStatus.getRequest().getDataSets() != null) { + allDataSets.addAll(singleStatus.getRequest().getDataSets()); + } + allStates.add(singleStatus.getState()); + allMessages.add(singleStatus.getDetails()); + } + compoundRequest.setDataSets(allDataSets); + + compoundStatus.setState(ODFUtils.combineStates(allStates)); + compoundStatus.setRequest(compoundRequest); + compoundStatus.setDetails(Utils.joinStrings(allMessages, COMPOUND_REQUEST_SEPARATOR)); + return compoundStatus; + } + + /** + * Retrieve statistics about all previous ODF analysis requests + * + * @return Request summary + */ + public AnalysisRequestSummary getAnalysisStats() { + AnalysisRequestTrackerStore store = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class); + return store.getRequestSummary(); + } + + /** + * Retrieve status details of recent ODF analysis requests + * + * @param offset Starting offset (use 0 to start with the latest request) + * @param limit Maximum number of analysis requests to be returned (use -1 to retrieve all requests) + * @return Status details for each discovery request + */ + public AnalysisRequestTrackers getAnalysisRequests(int offset, int limit) { + AnalysisRequestTrackerStore store = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class); + AnalysisRequestTrackers analysisrequestTrackers = new AnalysisRequestTrackers(); + analysisrequestTrackers.setAnalysisRequestTrackers(store.getRecentTrackers(offset, limit)); + return analysisrequestTrackers; + } + + /** + * Request a specific ODF discovery request to be canceled + 
* + * @param requestId Unique id of the analysis request + * @return Status of the cancellation attempt + */ + public AnalysisCancelResult cancelAnalysisRequest(String requestId) { + return controlCenter.cancelRequest(requestId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java new file mode 100755 index 0000000..798b2d3 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java @@ -0,0 +1,48 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.annotation; + +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.metadata.models.Annotation; +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.analysis.AnalysisRequest; +import org.apache.atlas.odf.api.annotation.AnnotationStore; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResult; + +public class InternalAnnotationStoreUtils { + + public static void storeDiscoveryServiceResult(DiscoveryServiceResult result, AnalysisRequest req) { + Logger logger = Logger.getLogger(InternalAnnotationStoreUtils.class.getName()); + AnnotationStore mds = new ODFFactory().create().getAnnotationStore(); + mds.setAnalysisRun(req.getId()); + if (result != null) { + logger.log(Level.FINE, "Persisting annotations returned by discovery service"); + List<Annotation> annotations = result.getAnnotations(); + if (annotations != null) { + for (Annotation annot : annotations) { + // only persist if reference was not set + if (annot.getReference() == null) { + mds.store(annot); + } else { + logger.log(Level.WARNING, "Returned annotation object has a non-null reference set and will not be persisted (reference: {0})", annot.getReference().toString()); + } + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java new file mode 100755 index 0000000..f779155 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java @@ -0,0 +1,68 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not 
use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.configuration; + + +import java.util.List; + +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.settings.ODFSettings; +import org.apache.atlas.odf.api.settings.validation.ValidationException; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +/** + * { + * "odf" : {...}, + * "userDefined" : {...} + * } + * + * + * This class is final, because reflection is used to access getters / setters in order to merge. 
This doesn't work with inherited methods + */ +@ApiModel(description="All ODF configuration options.") +public final class ConfigContainer { + + @ApiModelProperty(value="General ODF configuration options along with details about available discovery services", required=true) + private ODFSettings odf; + + @ApiModelProperty(value="Details about available discovery services") + private List<DiscoveryServiceProperties> registeredServices = null; + + public List<DiscoveryServiceProperties> getRegisteredServices() { + return registeredServices; + } + + public void setRegisteredServices(List<DiscoveryServiceProperties> registeredServices) { + this.registeredServices = registeredServices; + } + + public ODFSettings getOdf() { + return odf; + } + + public void setOdf(ODFSettings odfSettings) { + this.odf = odfSettings; + } + + public void validate() throws ValidationException { + if (this.odf != null) { + odf.validate(); + } + if (this.registeredServices != null) { + new ServiceValidator().validate("ODFConfig.registeredServices", this.registeredServices); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java new file mode 100755 index 0000000..7ad90e6 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java @@ -0,0 +1,235 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.configuration; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.List; +import java.util.UUID; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.analysis.AnalysisRequest; +import org.apache.atlas.odf.api.metadata.MetadataStore; +import org.apache.atlas.odf.api.settings.SparkConfig; +import org.apache.atlas.odf.api.settings.validation.ValidationException; +import org.apache.atlas.odf.core.Encryption; +import org.apache.atlas.odf.core.Environment; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.json.JSONUtils; +import org.apache.wink.json4j.JSONException; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.core.Utils; +import org.apache.atlas.odf.core.controlcenter.ControlCenter; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.core.store.ODFConfigurationStorage; + +public class ConfigManager { + private Logger logger = Logger.getLogger(ConfigManager.class.getName()); + public static final String HIDDEN_PASSWORD_IDENTIFIER = "***hidden***"; + public static final long CONFIG_UPDATE_SLEEP_BETWEEN_POLLS = 20; + public static final int CONFIG_UPDATE_MAX_POLLS = 1500; + private static final String DEFAULT_ENCRYPTED_SPARK_CONFIGS = 
"spark.authenticate.secret,spark.ssl.keyPassword,spark.ssl.keyStorePassword,spark.ssl.trustStorePassword"; + + protected ODFConfigurationStorage configurationStore; + protected ODFConfigNotificationPublisher notificationManager; + + public ConfigManager() { + ODFInternalFactory f = new ODFInternalFactory(); + this.configurationStore = f.create(ODFConfigurationStorage.class); + this.notificationManager = f.create(ODFConfigNotificationPublisher.class); + } + + public ConfigContainer getConfigContainer() { + ConfigContainer config = configurationStore.getConfig(getDefaultConfigContainer()); + return config; + } + + public ConfigContainer getConfigContainerHidePasswords() { + ConfigContainer config = configurationStore.getConfig(getDefaultConfigContainer()); + hidePasswords(config); + return config; + } + + public void updateConfigContainer(ConfigContainer update) throws ValidationException { + try { + update = JSONUtils.cloneJSONObject(update); + } catch (JSONException e) { + throw new RuntimeException(e); + } + update.validate(); + ConfigContainer source = getConfigContainer(); + unhideAndEncryptPasswords(update, source); + + List<DiscoveryServiceProperties> newServicesToRun = new ArrayList<DiscoveryServiceProperties>(); + if (update.getRegisteredServices() != null + && source.getRegisteredServices().size() < update.getRegisteredServices().size()) { + // store added services if update registers new ones + List<DiscoveryServiceProperties> newRegisteredServices = new ArrayList<DiscoveryServiceProperties>(); + newRegisteredServices.addAll(update.getRegisteredServices()); + for (DiscoveryServiceProperties oldService : source.getRegisteredServices()) { + for (int no = 0; no < newRegisteredServices.size(); no++) { + if (newRegisteredServices.get(no).getId().equals(oldService.getId())) { + newRegisteredServices.remove(no); + break; + } + } + } + + newServicesToRun.addAll(newRegisteredServices); + } + + Utils.mergeODFPOJOs(source, update); + 
configurationStore.storeConfig(source); + + if (source.getOdf().getRunNewServicesOnRegistration() && !newServicesToRun.isEmpty()) { + runNewServices(newServicesToRun); + } + + String changeId = UUID.randomUUID().toString(); + configurationStore.addPendingConfigChange(changeId); + this.notificationManager.publishConfigChange(source, changeId); + for (int i=0; i < CONFIG_UPDATE_MAX_POLLS; i++) { + if (!configurationStore.isConfigChangePending(changeId)) { + logger.log(Level.INFO, MessageFormat.format("Config change id ''{0}'' successfully completed after {1} msec.", new Object[] { changeId, i * CONFIG_UPDATE_SLEEP_BETWEEN_POLLS } )); + return; + } + try { + Thread.sleep(CONFIG_UPDATE_SLEEP_BETWEEN_POLLS); + } catch (InterruptedException e) { + // Ignore interrupt + logger.log(Level.WARNING, "Sleep period was interrupted", e); + } + } + logger.log(Level.WARNING, MessageFormat.format("Config change did not complete after {0} msec.", CONFIG_UPDATE_SLEEP_BETWEEN_POLLS * CONFIG_UPDATE_MAX_POLLS)); + } + + public void resetConfigContainer() { + logger.warning("resetting ODF configuration!"); + configurationStore.storeConfig(getDefaultConfigContainer()); + } + + private static String defaultConfig = null; + + List<DiscoveryServiceProperties> getServicesFoundOnClassPath() throws IOException, JSONException { + ClassLoader cl = this.getClass().getClassLoader(); + Enumeration<URL> services = cl.getResources("META-INF/odf/odf-services.json"); + List<DiscoveryServiceProperties> result = new ArrayList<>(); + while (services.hasMoreElements()) { + URL url = services.nextElement(); + InputStream is = url.openStream(); + String json = Utils.getInputStreamAsString(is, "UTF-8"); + logger.log(Level.INFO, "Service found on the classpath at {0}: {1}", new Object[] { url, json }); + result.addAll(JSONUtils.fromJSONList(json, DiscoveryServiceProperties.class)); + } + logger.log(Level.INFO, "Number of classpath services found: {0}", result.size()); + return result; + } + + private 
ConfigContainer getDefaultConfigContainer() { + if (defaultConfig == null) { + try { + ConfigContainer config = new ODFInternalFactory().create(Environment.class).getDefaultConfiguration(); + // now look for services found on the classpath + config.getRegisteredServices().addAll(getServicesFoundOnClassPath()); + defaultConfig = JSONUtils.toJSON(config); + } catch (IOException | JSONException e) { + String msg = "Default config could not be loaded or parsed!"; + logger.severe(msg); + throw new RuntimeException(msg, e); + } + } + try { + return JSONUtils.fromJSON(defaultConfig, ConfigContainer.class); + } catch (JSONException e) { + throw new RuntimeException(e); + } + } + + private void runNewServices(List<DiscoveryServiceProperties> newServices) { + ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class); + List<String> servicesToRun = new ArrayList<String>(); + for (DiscoveryServiceProperties info : newServices) { + servicesToRun.add(info.getId()); + } + + AnalysisRequest req = new AnalysisRequest(); + MetadataStore mds = new ODFFactory().create().getMetadataStore(); + req.setDiscoveryServiceSequence(servicesToRun); + req.setDataSets(mds.search(mds.newQueryBuilder().objectType("DataSet").build())); + req.setIgnoreDataSetCheck(true); + cc.startRequest(req); + } + + private void unhideAndEncryptPasswords(ConfigContainer updatedConfigContainer, + ConfigContainer originalConfiguration) { + if (updatedConfigContainer.getOdf() != null) { + String odfPassword = updatedConfigContainer.getOdf().getOdfPassword(); + if (odfPassword != null) { + if (odfPassword.equals(HIDDEN_PASSWORD_IDENTIFIER)) { + // Password was not changed, therefore keep original + // encrypted password + updatedConfigContainer.getOdf().setOdfPassword(originalConfiguration.getOdf().getOdfPassword()); + } else if (!Encryption.isEncrypted(odfPassword)) { + updatedConfigContainer.getOdf().setOdfPassword(Encryption.encryptText(odfPassword)); + } + } + if 
(updatedConfigContainer.getOdf().getSparkConfig() != null) { + SparkConfig updatedSparkConfig = updatedConfigContainer.getOdf().getSparkConfig(); + if (updatedSparkConfig.getConfigs() != null) { + List<String> encryptedSparkConfigs = Arrays.asList(DEFAULT_ENCRYPTED_SPARK_CONFIGS.split(",")); + for (String configName : updatedSparkConfig.getConfigs().keySet()) { + if (encryptedSparkConfigs.contains(configName)) { + String updatedConfigValue = (String) updatedSparkConfig.getConfigs().get(configName); + if (updatedConfigValue.equals(HIDDEN_PASSWORD_IDENTIFIER)) { + // Encrypted value was not changed, therefore keep original + // Encrypted value + SparkConfig originalSparkConfig = originalConfiguration.getOdf().getSparkConfig(); + updatedSparkConfig.setConfig(configName, originalSparkConfig.getConfigs().get(configName)); + } else if (!Encryption.isEncrypted(updatedConfigValue)) { + updatedSparkConfig.setConfig(configName, Encryption.encryptText(updatedConfigValue)); + } + } + } + } + } + } + } + + private void hidePasswords(ConfigContainer configContainer) { + if (configContainer.getOdf() != null) { + if (configContainer.getOdf().getOdfPassword() != null) { + configContainer.getOdf().setOdfPassword(HIDDEN_PASSWORD_IDENTIFIER); + } + if ((configContainer.getOdf().getSparkConfig() != null)){ + SparkConfig sparkConfig = configContainer.getOdf().getSparkConfig(); + if (sparkConfig.getConfigs() != null) { + List<String> encryptedSparkConfigs = Arrays.asList(DEFAULT_ENCRYPTED_SPARK_CONFIGS.split(",")); + for (String configName : sparkConfig.getConfigs().keySet()) { + if (((encryptedSparkConfigs.contains(configName)) && (sparkConfig.getConfigs().get(configName)) != null)) { + sparkConfig.setConfig(configName, HIDDEN_PASSWORD_IDENTIFIER); + } + } + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java 
---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java new file mode 100755 index 0000000..a7f822f --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java @@ -0,0 +1,45 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.configuration; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.core.controlcenter.AdminMessage; +import org.apache.atlas.odf.core.controlcenter.AdminMessage.Type; +import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager; +import org.apache.atlas.odf.json.JSONUtils; + +public class ODFConfigNotificationPublisher { + + Logger logger = Logger.getLogger(ODFConfigNotificationPublisher.class.getName()); + + public void publishConfigChange(ConfigContainer update, String changeId) { + try { + logger.log(Level.FINE, "publishing config change: {0}", JSONUtils.toJSON(update)); + ConfigContainer clone = JSONUtils.fromJSON(JSONUtils.toJSON(update), ConfigContainer.class); + AdminMessage amsg = new AdminMessage(); + amsg.setId(changeId); + amsg.setAdminMessageType(Type.CONFIGCHANGE); + amsg.setConfigUpdateDetails(clone); + amsg.setDetails("Configuration update"); + DiscoveryServiceQueueManager qm = new ODFInternalFactory().create(DiscoveryServiceQueueManager.class); + qm.enqueueInAdminQueue(amsg); + } catch (Exception exc) { + logger.log(Level.WARNING, "An unexpected exception occurres when writing to admin queue. 
Ignoring it", exc); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java new file mode 100755 index 0000000..011d728 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java @@ -0,0 +1,75 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.configuration; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.List; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.settings.validation.PropertyValidator; +import org.apache.atlas.odf.api.settings.validation.ValidationException; +import org.apache.atlas.odf.core.controlcenter.ServiceRuntime; +import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes; + +public class ServiceValidator implements PropertyValidator { + + public void validate(String property, Object value) throws ValidationException { + validate(property, value, true); + } + + private void validate(String property, Object value, boolean topLevel) throws ValidationException { + if (value == null) { + throw new ValidationException("Null values are not allowed for this property"); + } + + if (value instanceof List) { + List<DiscoveryServiceProperties> newServices = (List<DiscoveryServiceProperties>) value; + List<String> ids = new ArrayList<String>(); + for (int no = 0; no < newServices.size(); no++) { + DiscoveryServiceProperties service = (DiscoveryServiceProperties) newServices.get(no); + validate(property, service, false); + String serviceId = service.getId(); + if (ids.contains(serviceId)) { + throw new ValidationException(property, MessageFormat.format("you cannot register multiple services with the same id {0}!", serviceId)); + } else { + ids.add(serviceId); + } + } + } else if (value instanceof DiscoveryServiceProperties) { + DiscoveryServiceProperties service = (DiscoveryServiceProperties) value; + if (service.getId() == null || service.getId().trim().isEmpty() || service.getName() == null || service.getName().trim().isEmpty() || service.getEndpoint() == null) { + throw new ValidationException(property, MessageFormat.format("A service requires {0}", "id, name and an endpoint")); + } + + if (topLevel) { + 
List<String> regServices = new ArrayList<String>(); + List<DiscoveryServiceProperties> services = new ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties(); + for (DiscoveryServiceProperties regService : services) { + regServices.add(regService.getId()); + } + + if (regServices.contains(service.getId())) { + throw new ValidationException(property, MessageFormat.format("a service with id {0} already exists!", service.getId())); + } + } + + ServiceRuntime runtime = ServiceRuntimes.getRuntimeForDiscoveryService(service); + runtime.validate(service); + } else { + throw new ValidationException(property, "only DiscoveryServiceRegistrationInfo objects or list of such objects are allowed for this property"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java new file mode 100755 index 0000000..fffff6f --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java @@ -0,0 +1,60 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.controlcenter; + +import org.apache.atlas.odf.core.configuration.ConfigContainer; + +// JSON +public class AdminMessage { + public static enum Type { + SHUTDOWN, RESTART, CONFIGCHANGE + } + + private Type adminMessageType; + private String details; + private ConfigContainer configUpdateDetails; + private String messageId; + + public Type getAdminMessageType() { + return adminMessageType; + } + + public void setAdminMessageType(Type adminMessageType) { + this.adminMessageType = adminMessageType; + } + + public String getDetails() { + return details; + } + + public void setDetails(String details) { + this.details = details; + } + + public ConfigContainer getConfigUpdateDetails() { + return configUpdateDetails; + } + + public void setConfigUpdateDetails(ConfigContainer configUpdateDetails) { + this.configUpdateDetails = configUpdateDetails; + } + + public String getId() { + return this.messageId; + } + + public void setId(String messageId) { + this.messageId = messageId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java new file mode 100755 index 0000000..874e061 --- /dev/null +++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java @@ -0,0 +1,92 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
package org.apache.atlas.odf.core.controlcenter;

import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.wink.json4j.JSONException;

import org.apache.atlas.odf.core.ODFInitializer;
import org.apache.atlas.odf.json.JSONUtils;

/**
 * Queue processor that reacts to {@link AdminMessage}s requesting a shutdown
 * or restart of ODF. The actual shutdown is performed asynchronously on the
 * provided executor so that {@link #process} can return before the
 * infrastructure it runs on is torn down.
 */
public class AdminQueueProcessor implements QueueMessageProcessor {

	private Logger logger = Logger.getLogger(AdminQueueProcessor.class.getName());

	/**
	 * Deserialize the admin message and dispatch on its type.
	 * SHUTDOWN and RESTART both trigger an asynchronous shutdown; RESTART
	 * additionally starts ODF again afterwards. Other types are ignored.
	 *
	 * @param executorService executor on which the (re)start work is scheduled
	 * @param msg             JSON-serialized {@link AdminMessage}
	 * @param partition       queue partition the message was read from (unused)
	 * @param offset          queue offset of the message (unused)
	 * @throws RuntimeException if the message payload is not valid JSON
	 */
	@Override
	public void process(ExecutorService executorService, String msg, int partition, long offset) {
		AdminMessage adminMessage;
		try {
			adminMessage = JSONUtils.fromJSON(msg, AdminMessage.class);
		} catch (JSONException e) {
			// A malformed admin message is a programming error on the producer side.
			throw new RuntimeException(e);
		}
		switch (adminMessage.getAdminMessageType()) {
		case SHUTDOWN:
			initiateShutdown(executorService, false);
			break;
		case RESTART:
			initiateShutdown(executorService, true);
			break;
		default:
			// do nothing
		}
	}

	// Guards the stop/start sequence so that concurrent shutdown requests are serialized.
	static Object restartLockObject = new Object();

	/**
	 * Schedule an asynchronous shutdown (and optional restart) of ODF.
	 *
	 * @param executorService executor used to run the shutdown task
	 * @param restart         if true, start ODF again after it has been stopped
	 */
	private void initiateShutdown(ExecutorService executorService, final boolean restart) {
		logger.log(Level.INFO, "Shutdown of ODF was requested...");
		Runnable shutDownRunnable = new Runnable() {

			@Override
			public void run() {
				logger.log(Level.INFO, "Initiating shutdown");

				// sleep some time before initiating the actual shutdown to give the process() a chance to return
				// before it is itself shut down
				long sleepTimeBeforeShutdown = 1000;
				try {
					Thread.sleep(sleepTimeBeforeShutdown);
				} catch (InterruptedException e) {
					// Restore the interrupt flag instead of swallowing it so the
					// executor can still observe the interruption; proceed with
					// the shutdown regardless.
					Thread.currentThread().interrupt();
					logger.log(Level.FINE, "Sleep before shutdown was interrupted", e);
				}

				synchronized (restartLockObject) {
					logger.log(Level.INFO, "Shutting down ODF...");
					try {
						ODFInitializer.stop();
						logger.log(Level.INFO, "ODF was shutdown");

						if (restart) {
							logger.log(Level.INFO, "Restarting ODF");
							ODFInitializer.start();
							logger.log(Level.INFO, "ODF restarted");
						}
					} catch (Exception e) {
						logger.log(Level.SEVERE, "An unexpected error occurred when shutting down ODF", e);
					}
				}

			}

		};

		executorService.submit(shutDownRunnable);
	}

}
package org.apache.atlas.odf.core.controlcenter;

import java.util.List;

import org.apache.atlas.odf.api.analysis.AnalysisRequest;
import org.apache.atlas.odf.api.analysis.AnalysisRequestSummary;
import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;


/**
 * Store for {@link AnalysisRequestTracker} objects that records the state of
 * analysis requests and allows them to be queried and aged out.
 */
public interface AnalysisRequestTrackerStore {

	/**
	 * Set the status of old requests which were last modified before the cutOffTimestamp,
	 * with an optional details message.
	 *
	 * @param cutOffTimestamp requests last modified before this timestamp are affected
	 * @param status          status to set on those requests
	 * @param detailsMessage  optional message explaining the status change (may be null)
	 */
	void setStatusOfOldRequest(long cutOffTimestamp, STATUS status, String detailsMessage);

	// store / update the passed tracker
	void store(AnalysisRequestTracker tracker);

	/**
	 * Look up the tracker for a single analysis request.
	 *
	 * @param analysisRequestId id of the analysis request
	 * @return the matching tracker, or null if none is found (presumably — TODO confirm with implementations)
	 */
	AnalysisRequestTracker query(String analysisRequestId);

	/**
	 * Find a queued request that is similar to the passed request
	 * (e.g. so duplicate work can be avoided — NOTE(review): similarity
	 * criteria are defined by the implementation; confirm there).
	 */
	AnalysisRequestTracker findSimilarQueuedRequest(AnalysisRequest request);

	/**
	 * Retrieve recent trackers, page-wise.
	 *
	 * @param offset index of the first tracker to retrieve
	 * @param limit  maximum number of trackers to retrieve, -1 for all
	 * @return the recent trackers in the requested range
	 */
	List<AnalysisRequestTracker> getRecentTrackers(int offset, int limit);

	/**
	 * Clear any internal caches, if any.
	 */
	void clearCache();

	// Total number of trackers in the store.
	int getSize();

	// Aggregated summary statistics over all stored requests.
	AnalysisRequestSummary getRequestSummary();
}
package org.apache.atlas.odf.core.controlcenter;

import java.util.concurrent.ExecutorService;

import org.apache.atlas.odf.api.annotation.AnnotationStore;
import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse.ResponseCode;
import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncRunStatus;
import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse;
import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
import org.apache.atlas.odf.api.metadata.MetadataStore;
import org.apache.atlas.odf.core.Utils;

/**
 * Adapter that makes an {@link AsyncDiscoveryService} usable through the
 * synchronous {@link SyncDiscoveryService} interface by starting the async
 * analysis and polling its status until it finishes, fails, or a configurable
 * timeout elapses.
 */
public class AsyncDiscoveryServiceWrapper implements SyncDiscoveryService {

	AsyncDiscoveryService wrappedService = null;

	/**
	 * @param wrappedService the asynchronous service to expose synchronously
	 */
	public AsyncDiscoveryServiceWrapper(AsyncDiscoveryService wrappedService) {
		this.wrappedService = wrappedService;
	}

	/**
	 * Start the wrapped async analysis and block, polling its status, until it
	 * reaches a terminal state or the maximum wait time is exceeded.
	 * Polling interval and maximum wait are read from the environment
	 * properties "odf.async.poll.interval.ms" (default 1000 ms) and
	 * "odf.async.max.wait.secs" (default 10 minutes).
	 *
	 * @param request the analysis request to run
	 * @return a synchronous response carrying the final state; never null and
	 *         never throws — all errors are mapped to UNKNOWN_ERROR responses
	 */
	@Override
	public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
		try {
			DiscoveryServiceAsyncStartResponse asyncResponse = wrappedService.startAnalysis(request);
			ResponseCode code = asyncResponse.getCode();
			if (code != ResponseCode.OK) {
				// Start itself failed: propagate code and details unchanged.
				DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
				response.setCode(code);
				response.setDetails(asyncResponse.getDetails());
				return response;
			}
			// poll the async service
			final long maxWaitTimeSecs = Utils.getEnvironmentProperty("odf.async.max.wait.secs", 10 * 60); // default: 10 minutes
			final long pollingIntervalMS = Utils.getEnvironmentProperty("odf.async.poll.interval.ms", 1000);
			long maxPolls = (maxWaitTimeSecs * 1000) / pollingIntervalMS;
			int pollCounter = 0;

			DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
			String runId = asyncResponse.getRunId();
			while (pollCounter < maxPolls) {
				Thread.sleep(pollingIntervalMS);
				DiscoveryServiceAsyncRunStatus status = wrappedService.getStatus(runId);
				switch (status.getState()) {
				case NOT_FOUND:
					// should not happen
					response.setCode(ResponseCode.UNKNOWN_ERROR);
					response.setDetails("Run ID " + runId + " was not found. This should not have happened.");
					return response;
				case ERROR:
					response.setCode(ResponseCode.UNKNOWN_ERROR);
					response.setDetails(status.getDetails());
					return response;
				case FINISHED:
					response.setCode(ResponseCode.OK);
					response.setDetails(status.getDetails());
					response.setResult(status.getResult());
					return response;
				default:
					// continue polling
					pollCounter++;
				}
			}
			// Timed out without reaching a terminal state.
			response.setCode(ResponseCode.UNKNOWN_ERROR);
			response.setDetails("Polled Async service for " + maxWaitTimeSecs + " seconds without positive result");
			return response;
		} catch (InterruptedException exc) {
			// Restore the interrupt flag (it must not be swallowed by the
			// generic handler below) and report the aborted wait.
			Thread.currentThread().interrupt();
			DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
			response.setCode(ResponseCode.UNKNOWN_ERROR);
			response.setDetails("An unknown error occurred: " + Utils.getExceptionAsString(exc));
			return response;
		} catch (Exception exc) {
			// Never propagate exceptions to the caller; map everything to an error response.
			DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
			response.setCode(ResponseCode.UNKNOWN_ERROR);
			response.setDetails("An unknown error occurred: " + Utils.getExceptionAsString(exc));
			return response;
		}
	}

	/** Forward the executor to the wrapped async service. */
	public void setExecutorService(ExecutorService executorService) {
		wrappedService.setExecutorService(executorService);
	}

	/** Forward the metadata store to the wrapped async service. */
	public void setMetadataStore(MetadataStore metadataStore) {
		wrappedService.setMetadataStore(metadataStore);
	}

	/** Forward the annotation store to the wrapped async service. */
	public void setAnnotationStore(AnnotationStore annotationStore) {
		wrappedService.setAnnotationStore(annotationStore);
	}

	/** Delegate data set access checking to the wrapped async service. */
	public DataSetCheckResult checkDataSet(DataSetContainer dataSetContainer) {
		return wrappedService.checkDataSet(dataSetContainer);
	}

}
package org.apache.atlas.odf.core.controlcenter;

import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.atlas.odf.core.ODFInternalFactory;
import org.apache.atlas.odf.core.controlcenter.AdminMessage.Type;
import org.apache.atlas.odf.core.store.ODFConfigurationStorage;
import org.apache.atlas.odf.json.JSONUtils;

/**
 * Queue processor that applies CONFIGCHANGE admin messages: it notifies the
 * configuration storage of the new configuration and removes the message's
 * pending-change marker. All other message types are ignored; any failure is
 * logged and swallowed (best-effort processing).
 */
public class ConfigChangeQueueProcessor implements QueueMessageProcessor {

	Logger logger = Logger.getLogger(ConfigChangeQueueProcessor.class.getName());

	@Override
	public void process(ExecutorService executorService, String msg, int partition, long offset) {
		try {
			AdminMessage adminMessage = JSONUtils.fromJSON(msg, AdminMessage.class);
			if (!Type.CONFIGCHANGE.equals(adminMessage.getAdminMessageType())) {
				// Not a configuration change; nothing to do for this processor.
				return;
			}
			logger.info("Received config change: " + JSONUtils.toJSON(adminMessage));
			ODFConfigurationStorage configStorage = new ODFInternalFactory().create(ODFConfigurationStorage.class);
			configStorage.onConfigChange(adminMessage.getConfigUpdateDetails());
			configStorage.removePendingConfigChange(adminMessage.getId());
		} catch (Exception exc) {
			// Best-effort: a broken message must not kill the queue consumer.
			logger.log(Level.WARNING, "An exception occurred while processing admin message", exc);
		}
	}

}
