Repository: knox Updated Branches: refs/heads/master a970502aa -> 2adc917e4
KNOX-402: New GatewayService - TopologyService Project: http://git-wip-us.apache.org/repos/asf/knox/repo Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/2adc917e Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/2adc917e Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/2adc917e Branch: refs/heads/master Commit: 2adc917e486860cf0d009bd50786d5c5bb87c374 Parents: a970502 Author: Kevin Minder <[email protected]> Authored: Fri Jul 18 09:23:54 2014 -0400 Committer: Kevin Minder <[email protected]> Committed: Fri Jul 18 09:23:54 2014 -0400 ---------------------------------------------------------------------- .../apache/hadoop/gateway/GatewayServer.java | 81 ++--- .../gateway/services/CLIGatewayServices.java | 11 + .../services/DefaultGatewayServices.java | 11 + .../topology/impl/DefaultTopologyService.java | 321 +++++++++++++++++++ .../topology/file/FileTopologyProvider.java | 249 -------------- .../org/apache/hadoop/gateway/util/KnoxCLI.java | 12 +- .../topology/DefaultTopologyServiceTest.java | 208 ++++++++++++ .../topology/file/FileTopologyProviderTest.java | 205 ------------ .../gateway/services/GatewayServices.java | 3 +- .../services/topology/TopologyService.java | 41 +++ .../hadoop/gateway/GatewayDeployFuncTest.java | 4 +- 11 files changed, 625 insertions(+), 521 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java index 9fc0dc2..92faa17 100644 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java +++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java @@ -34,12 +34,12 @@ import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; import org.apache.hadoop.gateway.i18n.resources.ResourcesFactory; import org.apache.hadoop.gateway.services.GatewayServices; import org.apache.hadoop.gateway.services.ServiceLifecycleException; +import org.apache.hadoop.gateway.services.topology.TopologyService; import org.apache.hadoop.gateway.services.registry.ServiceRegistry; import org.apache.hadoop.gateway.services.security.SSLService; import org.apache.hadoop.gateway.topology.Topology; import org.apache.hadoop.gateway.topology.TopologyEvent; import org.apache.hadoop.gateway.topology.TopologyListener; -import org.apache.hadoop.gateway.topology.file.FileTopologyProvider; import org.apache.log4j.PropertyConfigurator; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; @@ -48,7 +48,6 @@ import org.eclipse.jetty.server.handler.ErrorHandler; import org.eclipse.jetty.webapp.WebAppContext; import org.jboss.shrinkwrap.api.exporter.ExplodedExporter; import org.jboss.shrinkwrap.api.spec.WebArchive; -import org.xml.sax.SAXException; import java.io.File; import java.io.FileOutputStream; @@ -68,20 +67,20 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; public class GatewayServer { - private static GatewayResources res = ResourcesFactory.get( GatewayResources.class ); - private static GatewayMessages log = MessagesFactory.get( GatewayMessages.class ); - private static Auditor auditor = AuditServiceFactory.getAuditService().getAuditor( AuditConstants.DEFAULT_AUDITOR_NAME, - AuditConstants.KNOX_SERVICE_NAME, AuditConstants.KNOX_COMPONENT_NAME ); + private static GatewayResources res = ResourcesFactory.get(GatewayResources.class); + private static GatewayMessages log = MessagesFactory.get(GatewayMessages.class); + private static Auditor auditor = AuditServiceFactory.getAuditService().getAuditor(AuditConstants.DEFAULT_AUDITOR_NAME, + AuditConstants.KNOX_SERVICE_NAME, AuditConstants.KNOX_COMPONENT_NAME); private static GatewayServer server; private static GatewayServices services; - + private static Properties buildProperties; private Server jetty; private ErrorHandler errorHandler; private GatewayConfig config; private ContextHandlerCollection contexts; - private FileTopologyProvider monitor; + private TopologyService monitor; private TopologyListener listener; private Map<String, WebAppContext> deployments; @@ -94,8 +93,7 @@ public class GatewayServer { } else if( cmd.hasOption( GatewayCommandLine.VERSION_LONG ) ) { printVersion(); } else if( cmd.hasOption( GatewayCommandLine.REDEPLOY_LONG ) ) { - GatewayConfig config = new GatewayConfigImpl(); - redeployTopologies( config, cmd.getOptionValue( GatewayCommandLine.REDEPLOY_LONG ) ); + redeployTopologies( cmd.getOptionValue( GatewayCommandLine.REDEPLOY_LONG ) ); } else { buildProperties = loadBuildProperties(); services = instantiateGatewayServices(); @@ -177,12 +175,12 @@ public class GatewayServer { private static void configureKerberosSecurity( GatewayConfig config ) { System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "true"); System.setProperty(GatewayConfig.KRB5_CONFIG, config.getKerberosConfig()); - System.setProperty(GatewayConfig.KRB5_DEBUG, + System.setProperty(GatewayConfig.KRB5_DEBUG, Boolean.toString(config.isKerberosDebugEnabled())); System.setProperty(GatewayConfig.KRB5_LOGIN_CONFIG, config.getKerberosLoginConfig()); System.setProperty(GatewayConfig.KRB5_USE_SUBJECT_CREDS_ONLY, "false"); } - + private static Properties loadBuildProperties() { Properties properties = new Properties(); InputStream inputStream = GatewayServer.class.getClassLoader().getResourceAsStream( "build.properties" ); @@ -205,50 +203,11 @@ public class GatewayServer { input.close(); } - private static void redeployTopology( Topology topology ) { - File topologyFile = new File( topology.getUri() ); - long start = System.currentTimeMillis(); - long limit = 1000L; // One second. - long elapsed = 1; - while( elapsed <= limit ) { - try { - long origTimestamp = topologyFile.lastModified(); - long setTimestamp = Math.max( System.currentTimeMillis(), topologyFile.lastModified() + elapsed ); - if( topologyFile.setLastModified( setTimestamp ) ) { - long newTimstamp = topologyFile.lastModified(); - if( newTimstamp > origTimestamp ) { - break; - } else { - Thread.sleep( 10 ); - elapsed = System.currentTimeMillis() - start; - continue; - } - } else { - log.failedToRedeployTopology( topology.getName() ); - break; - } - } catch( InterruptedException e ) { - log.failedToRedeployTopology( topology.getName(), e ); - e.printStackTrace(); - } - } - } - public static void redeployTopologies( GatewayConfig config, String topologyName ) { - try { - File topologiesDir = calculateAbsoluteTopologiesDir( config ); - FileTopologyProvider provider = new FileTopologyProvider( topologiesDir ); - provider.reloadTopologies(); - for( Topology topology : provider.getTopologies() ) { - if( topologyName == null || topologyName.equals( topology.getName() ) ) { - redeployTopology( topology ); - } - } - } catch( SAXException e ) { - log.failedToRedeployTopologies( e ); - } catch( IOException e ) { - log.failedToRedeployTopologies( e ); - } + public static void redeployTopologies( String topologyName ) { + TopologyService ts = getGatewayServices().getService(GatewayServices.TOPOLOGY_SERVICE); + ts.reloadTopologies(); + ts.redeployTopologies(topologyName); } public static GatewayServer startGateway( GatewayConfig config, GatewayServices svcs ) { @@ -307,7 +266,7 @@ public class GatewayServer { // jetty.start(); // } - + private synchronized void start() throws Exception { // Create the global context handler. @@ -329,7 +288,7 @@ public class GatewayServer { if (config.isSSLEnabled()) { SSLService ssl = services.getService("SSLService"); String keystoreFileName = config.getGatewaySecurityDir() + File.separatorChar + "keystores" + File.separatorChar + "gateway.jks"; - Connector connector = (Connector) ssl.buildSSlConnector( keystoreFileName ); + Connector connector = (Connector) ssl.buildSSlConnector(keystoreFileName); connector.setHost(address.getHostName()); connector.setPort(address.getPort()); jetty.addConnector(connector); @@ -345,15 +304,15 @@ public class GatewayServer { // Create a dir/file based cluster topology provider. File topologiesDir = calculateAbsoluteTopologiesDir(); - monitor = new FileTopologyProvider( topologiesDir ); - monitor.addTopologyChangeListener( listener ); + monitor = services.getService(GatewayServices.TOPOLOGY_SERVICE); + monitor.addTopologyChangeListener(listener); // Load the current topologies. - log.loadingTopologiesFromDirectory( topologiesDir.getAbsolutePath() ); + log.loadingTopologiesFromDirectory(topologiesDir.getAbsolutePath()); monitor.reloadTopologies(); // Start the topology monitor. - log.monitoringTopologyChangesInDirectory( topologiesDir.getAbsolutePath() ); + log.monitoringTopologyChangesInDirectory(topologiesDir.getAbsolutePath()); monitor.startMonitor(); } http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/main/java/org/apache/hadoop/gateway/services/CLIGatewayServices.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/CLIGatewayServices.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/CLIGatewayServices.java index 66f22b8..6fc10a2 100644 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/CLIGatewayServices.java +++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/CLIGatewayServices.java @@ -23,6 +23,7 @@ import org.apache.hadoop.gateway.deploy.DeploymentContext; import org.apache.hadoop.gateway.descriptor.FilterParamDescriptor; import org.apache.hadoop.gateway.descriptor.ResourceDescriptor; import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; +import org.apache.hadoop.gateway.services.topology.impl.DefaultTopologyService; import org.apache.hadoop.gateway.services.security.impl.DefaultAliasService; import org.apache.hadoop.gateway.services.security.impl.DefaultCryptoService; import org.apache.hadoop.gateway.services.security.impl.DefaultKeystoreService; @@ -66,6 +67,10 @@ public class CLIGatewayServices implements GatewayServices { crypto.setAliasService(alias); crypto.init(config, options); services.put(CRYPTO_SERVICE, crypto); + + DefaultTopologyService tops = new DefaultTopologyService(); + tops.init( config, options ); + services.put(TOPOLOGY_SERVICE, tops); } public void start() throws ServiceLifecycleException { @@ -75,6 +80,9 @@ public class CLIGatewayServices implements GatewayServices { DefaultAliasService alias = (DefaultAliasService) services.get(ALIAS_SERVICE); alias.start(); + + DefaultTopologyService tops = (DefaultTopologyService)services.get(TOPOLOGY_SERVICE); + tops.start(); } public void stop() throws ServiceLifecycleException { @@ -84,6 +92,9 @@ public class CLIGatewayServices implements GatewayServices { DefaultAliasService alias = (DefaultAliasService) services.get(ALIAS_SERVICE); alias.stop(); + + DefaultTopologyService tops = (DefaultTopologyService)services.get(TOPOLOGY_SERVICE); + tops.stop(); } /* (non-Javadoc) http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java index eacd124..367901d 100644 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java +++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java @@ -23,6 +23,7 @@ import org.apache.hadoop.gateway.deploy.DeploymentContext; import org.apache.hadoop.gateway.descriptor.FilterParamDescriptor; import org.apache.hadoop.gateway.descriptor.ResourceDescriptor; import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; +import org.apache.hadoop.gateway.services.topology.impl.DefaultTopologyService; import org.apache.hadoop.gateway.services.hostmap.impl.DefaultHostMapperService; import org.apache.hadoop.gateway.services.registry.impl.DefaultServiceRegistryService; import org.apache.hadoop.gateway.services.security.KeystoreServiceException; @@ -96,6 +97,10 @@ public class DefaultGatewayServices implements GatewayServices { DefaultServerInfoService sis = new DefaultServerInfoService(); sis.init( config, options ); services.put( SERVER_INFO_SERVICE, sis ); + + DefaultTopologyService tops = new DefaultTopologyService(); + tops.init( config, options ); + services.put( TOPOLOGY_SERVICE, tops ); } public void start() throws ServiceLifecycleException { @@ -111,6 +116,9 @@ public class DefaultGatewayServices implements GatewayServices { ServerInfoService sis = (ServerInfoService) services.get(SERVER_INFO_SERVICE); sis.start(); + + DefaultTopologyService tops = (DefaultTopologyService)services.get(TOPOLOGY_SERVICE); + tops.start(); } public void stop() throws ServiceLifecycleException { @@ -126,6 +134,9 @@ public class DefaultGatewayServices implements GatewayServices { ServerInfoService sis = (ServerInfoService) services.get(SERVER_INFO_SERVICE); sis.stop(); + + DefaultTopologyService tops = (DefaultTopologyService)services.get(TOPOLOGY_SERVICE); + tops.stop(); } /* (non-Javadoc) http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java new file mode 100644 index 0000000..516cd24 --- /dev/null +++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.gateway.services.topology.impl; + + +import org.apache.commons.digester3.Digester; +import org.apache.commons.digester3.binder.DigesterLoader; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.io.monitor.FileAlterationListener; +import org.apache.commons.io.monitor.FileAlterationListenerAdaptor; +import org.apache.commons.io.monitor.FileAlterationMonitor; +import org.apache.commons.io.monitor.FileAlterationObserver; +import org.apache.hadoop.gateway.GatewayMessages; +import org.apache.hadoop.gateway.config.GatewayConfig; +import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; +import org.apache.hadoop.gateway.services.ServiceLifecycleException; +import org.apache.hadoop.gateway.services.topology.TopologyService; +import org.apache.hadoop.gateway.topology.TopologyListener; +import org.apache.hadoop.gateway.topology.TopologyMonitor; +import org.apache.hadoop.gateway.topology.builder.TopologyBuilder; +import org.apache.hadoop.gateway.topology.Topology; +import org.apache.hadoop.gateway.topology.TopologyProvider; +import org.apache.hadoop.gateway.topology.TopologyEvent; +import org.apache.hadoop.gateway.topology.xml.AmbariFormatXmlTopologyRules; +import org.apache.hadoop.gateway.topology.xml.KnoxFormatXmlTopologyRules; +import org.xml.sax.SAXException; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Collection; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.Map; +import java.util.HashSet; +import java.util.HashMap; +import java.util.Collections; + + +import static org.apache.commons.digester3.binder.DigesterLoader.newLoader; + + +public class DefaultTopologyService + extends FileAlterationListenerAdaptor + implements TopologyService, TopologyMonitor, TopologyProvider, FileFilter, FileAlterationListener { + + private static final List<String> SUPPORTED_TOPOLOGY_FILE_EXTENSIONS = new ArrayList<String>(); + static { + SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.add("xml"); + SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.add("conf"); + } + private static GatewayMessages log = MessagesFactory.get(GatewayMessages.class); + private static DigesterLoader digesterLoader = newLoader(new KnoxFormatXmlTopologyRules(), new AmbariFormatXmlTopologyRules()); + private FileAlterationMonitor monitor; + private File directory; + private Set<TopologyListener> listeners; + private volatile Map<File, Topology> topologies; + + private Topology loadTopology(File file) throws IOException, SAXException, URISyntaxException, InterruptedException { + final long TIMEOUT = 250; //ms + final long DELAY = 50; //ms + log.loadingTopologyFile(file.getAbsolutePath()); + Topology topology; + long start = System.currentTimeMillis(); + while (true) { + try { + topology = loadTopologyAttempt(file); + break; + } catch (IOException e) { + if (System.currentTimeMillis() - start < TIMEOUT) { + log.failedToLoadTopologyRetrying(file.getAbsolutePath(), Long.toString(DELAY), e); + Thread.sleep(DELAY); + } else { + throw e; + } + } catch (SAXException e) { + if (System.currentTimeMillis() - start < TIMEOUT) { + log.failedToLoadTopologyRetrying(file.getAbsolutePath(), Long.toString(DELAY), e); + Thread.sleep(DELAY); + } else { + throw e; + } + } + } + return topology; + } + + private Topology loadTopologyAttempt(File file) throws IOException, SAXException, URISyntaxException { + Topology topology; + Digester digester = digesterLoader.newDigester(); + TopologyBuilder topologyBuilder = digester.parse(FileUtils.openInputStream(file)); + topology = topologyBuilder.build(); + topology.setUri(file.toURI()); + topology.setName(FilenameUtils.removeExtension(file.getName())); + topology.setTimestamp(file.lastModified()); + return topology; + } + + private void redeployTopology(Topology topology) { + File topologyFile = new File(topology.getUri()); + long start = System.currentTimeMillis(); + long limit = 1000L; // One second. + long elapsed = 1; + while (elapsed <= limit) { + try { + long origTimestamp = topologyFile.lastModified(); + long setTimestamp = Math.max(System.currentTimeMillis(), topologyFile.lastModified() + elapsed); + if (topologyFile.setLastModified(setTimestamp)) { + long newTimstamp = topologyFile.lastModified(); + if (newTimstamp > origTimestamp) { + break; + } else { + Thread.sleep(10); + elapsed = System.currentTimeMillis() - start; + continue; + } + } else { + log.failedToRedeployTopology(topology.getName()); + break; + } + } catch (InterruptedException e) { + log.failedToRedeployTopology(topology.getName(), e); + e.printStackTrace(); + } + } + } + + private List<TopologyEvent> createChangeEvents( + Map<File, Topology> oldTopologies, + Map<File, Topology> newTopologies) { + ArrayList<TopologyEvent> events = new ArrayList<TopologyEvent>(); + // Go through the old topologies and find anything that was deleted. + for (File file : oldTopologies.keySet()) { + if (!newTopologies.containsKey(file)) { + events.add(new TopologyEvent(TopologyEvent.Type.DELETED, oldTopologies.get(file))); + } + } + // Go through the new topologies and figure out what was updated vs added. + for (File file : newTopologies.keySet()) { + if (oldTopologies.containsKey(file)) { + Topology oldTopology = oldTopologies.get(file); + Topology newTopology = newTopologies.get(file); + if (newTopology.getTimestamp() > oldTopology.getTimestamp()) { + events.add(new TopologyEvent(TopologyEvent.Type.UPDATED, newTopologies.get(file))); + } + } else { + events.add(new TopologyEvent(TopologyEvent.Type.CREATED, newTopologies.get(file))); + } + } + return events; + } + + private File calculateAbsoluteTopologiesDir(GatewayConfig config) { + + File topoDir = new File(config.getGatewayTopologyDir()); + topoDir = topoDir.getAbsoluteFile(); + return topoDir; + } + + private void initListener(FileAlterationMonitor monitor, File directory) { + this.directory = directory; + this.monitor = monitor; + + + FileAlterationObserver observer = new FileAlterationObserver(this.directory, this); + observer.addListener(this); + monitor.addObserver(observer); + + this.listeners = new HashSet<TopologyListener>(); + this.topologies = new HashMap<File, Topology>(); //loadTopologies( this.directory ); + } + + private void initListener(File directory) throws IOException, SAXException { + initListener(new FileAlterationMonitor(1000L), directory); + } + + private Map<File, Topology> loadTopologies(File directory) { + Map<File, Topology> map = new HashMap<File, Topology>(); + if (directory.exists() && directory.canRead()) { + for (File file : directory.listFiles(this)) { + try { + map.put(file, loadTopology(file)); + } catch (IOException e) { + // Maybe it makes sense to throw exception + log.failedToLoadTopology(file.getAbsolutePath(), e); + } catch (SAXException e) { + // Maybe it makes sense to throw exception + log.failedToLoadTopology(file.getAbsolutePath(), e); + } catch (Exception e) { + // Maybe it makes sense to throw exception + log.failedToLoadTopology(file.getAbsolutePath(), e); + } + } + } + return map; + } + + public void redeployTopologies(String topologyName) { + + for (Topology topology : getTopologies()) { + if (topologyName == null || topologyName.equals(topology.getName())) { + redeployTopology(topology); + } + } + + } + + public void reloadTopologies() { + try { + synchronized (this) { + Map<File, Topology> oldTopologies = topologies; + Map<File, Topology> newTopologies = loadTopologies(directory); + List<TopologyEvent> events = createChangeEvents(oldTopologies, newTopologies); + topologies = newTopologies; + notifyChangeListeners(events); + } + } catch (Exception e) { + // Maybe it makes sense to throw exception + log.failedToReloadTopologies(e); + } + } + + private void notifyChangeListeners(List<TopologyEvent> events) { + for (TopologyListener listener : listeners) { + try { + listener.handleTopologyEvent(events); + } catch (RuntimeException e) { + log.failedToHandleTopologyEvents(e); + } + } + } + + public Collection<Topology> getTopologies() { + Map<File, Topology> map = topologies; + return Collections.unmodifiableCollection(map.values()); + } + + @Override + public void addTopologyChangeListener(TopologyListener listener) { + listeners.add(listener); + } + + @Override + public void startMonitor() throws Exception { + monitor.start(); + } + + @Override + public void stopMonitor() throws Exception { + monitor.stop(); + } + + @Override + public boolean accept(File file) { + boolean accept = false; + if (!file.isDirectory() && file.canRead()) { + String extension = FilenameUtils.getExtension(file.getName()); + if (SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.contains(extension)) { + accept = true; + } + } + return accept; + } + + @Override + public void onFileCreate(File file) { + onFileChange(file); + } + + @Override + public void onFileDelete(java.io.File file) { + onFileChange(file); + } + + @Override + public void onFileChange(File file) { + reloadTopologies(); + } + + @Override + public void stop() { + + } + + @Override + public void start() { + + } + + @Override + public void init(GatewayConfig config, Map<String, String> options) throws ServiceLifecycleException { + + try { + initListener(calculateAbsoluteTopologiesDir(config)); + } catch (IOException io) { + throw new ServiceLifecycleException(io.getMessage()); + } catch (SAXException sax) { + throw new ServiceLifecycleException(sax.getMessage()); + } + + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/file/FileTopologyProvider.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/file/FileTopologyProvider.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/file/FileTopologyProvider.java deleted file mode 100644 index cd0567d..0000000 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/file/FileTopologyProvider.java +++ /dev/null @@ -1,249 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.gateway.topology.file; - -import org.apache.commons.digester3.Digester; -import org.apache.commons.digester3.binder.DigesterLoader; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.FilenameUtils; -import org.apache.commons.io.monitor.FileAlterationListener; -import org.apache.commons.io.monitor.FileAlterationListenerAdaptor; -import org.apache.commons.io.monitor.FileAlterationMonitor; -import org.apache.commons.io.monitor.FileAlterationObserver; -import org.apache.hadoop.gateway.GatewayMessages; -import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; -import org.apache.hadoop.gateway.topology.Topology; -import org.apache.hadoop.gateway.topology.TopologyEvent; -import org.apache.hadoop.gateway.topology.TopologyListener; -import org.apache.hadoop.gateway.topology.TopologyMonitor; -import org.apache.hadoop.gateway.topology.TopologyProvider; -import org.apache.hadoop.gateway.topology.builder.TopologyBuilder; -import org.apache.hadoop.gateway.topology.xml.AmbariFormatXmlTopologyRules; -import org.apache.hadoop.gateway.topology.xml.KnoxFormatXmlTopologyRules; -import org.xml.sax.SAXException; - -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.apache.commons.digester3.binder.DigesterLoader.newLoader; - -//import org.codehaus.plexus.util.FileUtils; - -public class FileTopologyProvider - extends FileAlterationListenerAdaptor - implements TopologyProvider, TopologyMonitor, FileFilter, FileAlterationListener { - - private static GatewayMessages log = MessagesFactory.get( GatewayMessages.class ); - private static DigesterLoader digesterLoader = newLoader( new KnoxFormatXmlTopologyRules(), new AmbariFormatXmlTopologyRules() ); - private static final List<String> SUPPORTED_TOPOLOGY_FILE_EXTENSIONS = new ArrayList<String>(); - static { - SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.add("xml"); - SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.add("conf"); - } - - private FileAlterationMonitor monitor; - private File directory; - private Set<TopologyListener> listeners; - private volatile Map<File,Topology> topologies; - - FileTopologyProvider( FileAlterationMonitor monitor, File directory ) { - this.directory = directory; - this.monitor = monitor; - - FileAlterationObserver observer = new FileAlterationObserver( this.directory, this ); - observer.addListener( this ); - monitor.addObserver( observer ); - - this.listeners = new HashSet<TopologyListener>(); - this.topologies = new HashMap<File,Topology>(); //loadTopologies( this.directory ); - } - - public FileTopologyProvider( File directory ) throws IOException, SAXException { - this( new FileAlterationMonitor( 1000L ), directory ); - } - - private static Topology loadTopology( File file ) throws IOException, SAXException, URISyntaxException, InterruptedException { - final long TIMEOUT = 250; //ms - final long DELAY = 50; //ms - log.loadingTopologyFile( file.getAbsolutePath() ); - Topology topology; - long start = System.currentTimeMillis(); - while( true ) { - try { - topology = loadTopologyAttempt( file ); - break; - } catch ( IOException e ) { - if( System.currentTimeMillis() - start < TIMEOUT ) { - log.failedToLoadTopologyRetrying( file.getAbsolutePath(), Long.toString( DELAY ), e ); - Thread.sleep( DELAY ); - } else { - throw e; - } - } catch ( SAXException e ) { - if( System.currentTimeMillis() - start < TIMEOUT ) { - log.failedToLoadTopologyRetrying( file.getAbsolutePath(), Long.toString( DELAY ), e ); - Thread.sleep( DELAY ); - } else { - throw e; - } - } - } - return topology; - } - - private static Topology loadTopologyAttempt( File file ) throws IOException, SAXException, URISyntaxException { - Topology topology;Digester digester = digesterLoader.newDigester(); - TopologyBuilder topologyBuilder = digester.parse( FileUtils.openInputStream( file ) ); - topology = topologyBuilder.build(); - topology.setUri( file.toURI() ); - topology.setName( FilenameUtils.removeExtension( file.getName() ) ); - topology.setTimestamp( file.lastModified() ); - return topology; - } - - private Map<File, Topology> loadTopologies( File directory ) { - Map<File, Topology> map = new HashMap<File, Topology>(); - if( directory.exists() && directory.canRead() ) { - for( File file : directory.listFiles( this ) ) { - try { - map.put( file, loadTopology( file ) ); - } catch( IOException e ) { - // Maybe it makes sense to throw exception - log.failedToLoadTopology( file.getAbsolutePath(), e ); - } catch( SAXException e ) { - // Maybe it makes sense to throw exception - log.failedToLoadTopology( file.getAbsolutePath(), e ); - } catch ( Exception e ) { - // Maybe it makes sense to throw exception - log.failedToLoadTopology( file.getAbsolutePath(), e ); - } - } - } - return map; - } - - public void reloadTopologies() { - try { - synchronized ( this ) { - Map<File,Topology> oldTopologies = topologies; - Map<File,Topology> newTopologies = loadTopologies( directory ); - List<TopologyEvent> events = createChangeEvents( oldTopologies, newTopologies ); - topologies = newTopologies; - notifyChangeListeners( events ); - } - } - catch( Exception e ) { - // Maybe it makes sense to throw exception - log.failedToReloadTopologies( e ); - } - } - - private static List<TopologyEvent> createChangeEvents( - Map<File,Topology> oldTopologies, - Map<File,Topology> newTopologies ) { - ArrayList<TopologyEvent> events = new ArrayList<TopologyEvent>(); - // Go through the old topologies and find anything that was deleted. - for( File file : oldTopologies.keySet() ) { - if( !newTopologies.containsKey( file ) ) { - events.add( new TopologyEvent( TopologyEvent.Type.DELETED, oldTopologies.get( file ) ) ); - } - } - // Go through the new topologies and figure out what was updated vs added. - for( File file : newTopologies.keySet() ) { - if( oldTopologies.containsKey( file ) ) { - Topology oldTopology = oldTopologies.get( file ); - Topology newTopology = newTopologies.get( file ); - if( newTopology.getTimestamp() > oldTopology.getTimestamp() ) { - events.add( new TopologyEvent( TopologyEvent.Type.UPDATED, newTopologies.get( file ) ) ); - } - } else { - events.add( new TopologyEvent( TopologyEvent.Type.CREATED, newTopologies.get( file ) ) ); - } - } - return events ; - } - - private void notifyChangeListeners( List<TopologyEvent> events ) { - for( TopologyListener listener : listeners ) { - try { - listener.handleTopologyEvent( events ); - } catch( RuntimeException e ) { - log.failedToHandleTopologyEvents( e ); - } - } - } - - @Override - public Collection<Topology> getTopologies() { - Map<File,Topology> map = topologies; - return Collections.unmodifiableCollection( map.values() ); - } - - @Override - public void addTopologyChangeListener( TopologyListener listener ) { - listeners.add( listener ); - } - - @Override - public void startMonitor() throws Exception { - monitor.start(); - } - - @Override - public void stopMonitor() throws Exception { - monitor.stop(); - } - - @Override - public boolean accept( File file ) { - boolean accept = false; - if( !file.isDirectory() && file.canRead() ) { - String extension = FilenameUtils.getExtension( file.getName() ); - if( SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.contains( extension ) ) { - accept = true; - } - } - return accept; - } - - @Override - public void onFileCreate( File file ) { - onFileChange( file ); - } - - @Override - public void onFileDelete(java.io.File file) { - onFileChange( file ); - } - - @Override - public void onFileChange( File file ) { - reloadTopologies(); - } - -} http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/main/java/org/apache/hadoop/gateway/util/KnoxCLI.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/util/KnoxCLI.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/util/KnoxCLI.java index c754fd7..499e001 100644 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/util/KnoxCLI.java +++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/util/KnoxCLI.java @@ -20,13 +20,13 @@ package org.apache.hadoop.gateway.util; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.gateway.GatewayCommandLine; -import org.apache.hadoop.gateway.GatewayServer; import org.apache.hadoop.gateway.config.GatewayConfig; import org.apache.hadoop.gateway.config.impl.GatewayConfigImpl; import org.apache.hadoop.gateway.services.CLIGatewayServices; import org.apache.hadoop.gateway.services.GatewayServices; import org.apache.hadoop.gateway.services.Service; import org.apache.hadoop.gateway.services.ServiceLifecycleException; +import org.apache.hadoop.gateway.services.topology.TopologyService; import org.apache.hadoop.gateway.services.security.AliasService; import org.apache.hadoop.gateway.services.security.KeystoreService; import org.apache.hadoop.gateway.services.security.KeystoreServiceException; @@ -274,6 +274,11 @@ public class KnoxCLI extends Configured implements Tool { KeystoreService ks = services.getService(GatewayServices.KEYSTORE_SERVICE); return ks; } + + protected TopologyService getTopologyService() { + TopologyService ts = services.getService(GatewayServices.TOPOLOGY_SERVICE); + return ts; + } } private class AliasListCommand extends Command { @@ -586,8 +591,9 @@ public class KnoxCLI extends Configured implements Tool { @Override public void execute() throws Exception { - GatewayConfig config = new GatewayConfigImpl(); - GatewayServer.redeployTopologies( config, cluster ); + TopologyService ts = getTopologyService(); + ts.reloadTopologies(); + ts.redeployTopologies(cluster); } @Override http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/test/java/org/apache/hadoop/gateway/services/topology/DefaultTopologyServiceTest.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/services/topology/DefaultTopologyServiceTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/services/topology/DefaultTopologyServiceTest.java new file mode 100644 index 0000000..d381aa8 --- /dev/null +++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/services/topology/DefaultTopologyServiceTest.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.services.topology; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.monitor.FileAlterationMonitor; +import org.apache.commons.io.monitor.FileAlterationObserver; +import org.apache.hadoop.gateway.config.GatewayConfig; +import org.apache.hadoop.gateway.services.topology.impl.DefaultTopologyService; +import org.apache.hadoop.gateway.topology.*; +import org.apache.hadoop.test.TestUtils; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.*; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.core.IsNull.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public class DefaultTopologyServiceTest { + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + private File createDir() throws IOException { + return TestUtils.createTempDir(this.getClass().getSimpleName() + "-"); + } + + private File createFile(File parent, String name, String resource, long timestamp) throws IOException { + File file = new File(parent, name); + if (!file.exists()) { + FileUtils.touch(file); + } + InputStream input = ClassLoader.getSystemResourceAsStream(resource); + OutputStream output = FileUtils.openOutputStream(file); + IOUtils.copy(input, output); + output.flush(); + input.close(); + output.close(); + file.setLastModified(timestamp); + assertTrue("Failed to create test file " + file.getAbsolutePath(), file.exists()); + assertTrue("Failed to populate test file " + file.getAbsolutePath(), file.length() > 0); + + return file; + } + + @Test + public void testGetTopologies() throws Exception { + + File dir = createDir(); + long time = dir.lastModified(); + try { + createFile(dir, "one.xml", "org/apache/hadoop/gateway/topology/file/topology-one.xml", time); + + TestTopologyListener topoListener = new TestTopologyListener(); + FileAlterationMonitor monitor = new FileAlterationMonitor(Long.MAX_VALUE); + + TopologyService provider = new DefaultTopologyService(); + Map<String, String> c = new HashMap<String, String>(); + + GatewayConfig config = EasyMock.createNiceMock(GatewayConfig.class); + EasyMock.expect(config.getGatewayTopologyDir()).andReturn(dir.toString()).anyTimes(); + EasyMock.replay(config); + + provider.init(config, c); + + + provider.addTopologyChangeListener(topoListener); + + provider.reloadTopologies(); + + + Collection<Topology> topologies = provider.getTopologies(); + assertThat(topologies, notNullValue()); + assertThat(topologies.size(), is(1)); + Topology topology = topologies.iterator().next(); + assertThat(topology.getName(), is("one")); + assertThat(topology.getTimestamp(), is(time)); + assertThat(topoListener.events.size(), is(1)); + topoListener.events.clear(); + + // Add a file to the directory. + File two = createFile(dir, "two.xml", "org/apache/hadoop/gateway/topology/file/topology-two.xml", 1L); + provider.reloadTopologies(); + topologies = provider.getTopologies(); + assertThat(topologies.size(), is(2)); + Set<String> names = new HashSet<String>(Arrays.asList("one", "two")); + Iterator<Topology> iterator = topologies.iterator(); + topology = iterator.next(); + assertThat(names, hasItem(topology.getName())); + names.remove(topology.getName()); + topology = iterator.next(); + assertThat(names, hasItem(topology.getName())); + names.remove(topology.getName()); + assertThat(names.size(), is(0)); + assertThat(topoListener.events.size(), is(1)); + List<TopologyEvent> events = topoListener.events.get(0); + assertThat(events.size(), is(1)); + TopologyEvent event = events.get(0); + assertThat(event.getType(), is(TopologyEvent.Type.CREATED)); + assertThat(event.getTopology(), notNullValue()); + + // Update a file in the directory. + two = createFile(dir, "two.xml", "org/apache/hadoop/gateway/topology/file/topology-three.xml", 2L); + provider.reloadTopologies(); + topologies = provider.getTopologies(); + assertThat(topologies.size(), is(2)); + names = new HashSet<String>(Arrays.asList("one", "two")); + iterator = topologies.iterator(); + topology = iterator.next(); + assertThat(names, hasItem(topology.getName())); + names.remove(topology.getName()); + topology = iterator.next(); + assertThat(names, hasItem(topology.getName())); + names.remove(topology.getName()); + assertThat(names.size(), is(0)); + + // Remove a file from the directory. + two.delete(); + provider.reloadTopologies(); + topologies = provider.getTopologies(); + assertThat(topologies.size(), is(1)); + topology = topologies.iterator().next(); + assertThat(topology.getName(), is("one")); + assertThat(topology.getTimestamp(), is(time)); + } finally { + FileUtils.deleteQuietly(dir); + } + } + + private void kickMonitor(FileAlterationMonitor monitor) { + for (FileAlterationObserver observer : monitor.getObservers()) { + observer.checkAndNotify(); + } + } + + @Test + public void testProviderParamsOrderIsPreserved() { + + Provider provider = new Provider(); + String names[] = {"ldapRealm=", + "ldapContextFactory", + "ldapRealm.contextFactory", + "ldapGroupRealm", + "ldapGroupRealm.contextFactory", + "ldapGroupRealm.contextFactory.systemAuthenticationMechanism" + }; + + Param param = null; + for (String name : names) { + param = new Param(); + param.setName(name); + param.setValue(name); + provider.addParam(param); + + } + Map<String, String> params = provider.getParams(); + Set<String> keySet = params.keySet(); + Iterator<String> iter = keySet.iterator(); + int i = 0; + while (iter.hasNext()) { + assertTrue(iter.next().equals(names[i++])); + } + + } + + private class TestTopologyListener implements TopologyListener { + + public ArrayList<List<TopologyEvent>> events = new ArrayList<List<TopologyEvent>>(); + + @Override + public void handleTopologyEvent(List<TopologyEvent> events) { + this.events.add(events); + } + + } + +} http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/test/java/org/apache/hadoop/gateway/topology/file/FileTopologyProviderTest.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/topology/file/FileTopologyProviderTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/topology/file/FileTopologyProviderTest.java deleted file mode 100644 index 2e20376..0000000 --- a/gateway-server/src/test/java/org/apache/hadoop/gateway/topology/file/FileTopologyProviderTest.java +++ /dev/null @@ -1,205 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.gateway.topology.file; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.core.IsNull.notNullValue; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.apache.commons.io.monitor.FileAlterationMonitor; -import org.apache.commons.io.monitor.FileAlterationObserver; -import org.apache.hadoop.gateway.topology.Param; -import org.apache.hadoop.gateway.topology.Provider; -import org.apache.hadoop.gateway.topology.Topology; -import org.apache.hadoop.gateway.topology.TopologyEvent; -import org.apache.hadoop.gateway.topology.TopologyListener; -import org.apache.hadoop.test.TestUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class FileTopologyProviderTest { - - @Before - public void setUp() throws Exception { - } - - @After - public void tearDown() throws Exception { - } - - private File createDir() throws IOException { - return TestUtils.createTempDir( this.getClass().getSimpleName() + "-" ); - } - - private File createFile( File parent, String name, String resource, long timestamp ) throws IOException { - File file = new File( parent, name ); - if( !file.exists() ) { - FileUtils.touch( file ); - } - InputStream input = ClassLoader.getSystemResourceAsStream( resource ); - OutputStream output = FileUtils.openOutputStream( file ); - IOUtils.copy( input, output ); - output.flush(); - input.close(); - output.close(); - file.setLastModified( timestamp ); - assertTrue( "Failed to create test file " + file.getAbsolutePath(), file.exists() ); - assertTrue( "Failed to populate test file " + file.getAbsolutePath(), file.length() > 0 ); - - return file; - } - - @Test - public void testGetTopologies() throws Exception { - - File dir = createDir(); - long time = dir.lastModified(); - try { - createFile( dir, "one.xml", "org/apache/hadoop/gateway/topology/file/topology-one.xml", time ); - - TestTopologyListener topoListener = new TestTopologyListener(); - FileAlterationMonitor monitor = new FileAlterationMonitor( Long.MAX_VALUE ); - FileTopologyProvider provider = new FileTopologyProvider( monitor, dir ); - provider.addTopologyChangeListener( topoListener ); - - kickMonitor( monitor ); - - Collection<Topology> topologies = provider.getTopologies(); - assertThat( topologies, notNullValue() ); - assertThat( topologies.size(), is( 1 ) ); - Topology topology = topologies.iterator().next(); - assertThat( topology.getName(), is( "one" ) ); - assertThat( topology.getTimestamp(), is( time ) ); - assertThat( topoListener.events.size(), is( 1 ) ); - topoListener.events.clear(); - - // Add a file to the directory. - File two = createFile( dir, "two.xml", "org/apache/hadoop/gateway/topology/file/topology-two.xml", 1L ); - kickMonitor( monitor ); - topologies = provider.getTopologies(); - assertThat( topologies.size(), is( 2 ) ); - Set<String> names = new HashSet<String>( Arrays.asList( "one", "two" ) ); - Iterator<Topology> iterator = topologies.iterator(); - topology = iterator.next(); - assertThat( names, hasItem( topology.getName() ) ); - names.remove( topology.getName() ); - topology = iterator.next(); - assertThat( names, hasItem( topology.getName() ) ); - names.remove( topology.getName() ); - assertThat( names.size(), is( 0 ) ); - assertThat( topoListener.events.size(), is( 1 ) ); - List<TopologyEvent> events = topoListener.events.get( 0 ); - assertThat( events.size(), is( 1 ) ); - TopologyEvent event = events.get( 0 ); - assertThat( event.getType(), is( TopologyEvent.Type.CREATED ) ); - assertThat( event.getTopology(), notNullValue() ); - - // Update a file in the directory. - two = createFile( dir, "two.xml", "org/apache/hadoop/gateway/topology/file/topology-three.xml", 2L ); - kickMonitor( monitor ); - topologies = provider.getTopologies(); - assertThat( topologies.size(), is( 2 ) ); - names = new HashSet<String>( Arrays.asList( "one", "two" ) ); - iterator = topologies.iterator(); - topology = iterator.next(); - assertThat( names, hasItem( topology.getName() ) ); - names.remove( topology.getName() ); - topology = iterator.next(); - assertThat( names, hasItem( topology.getName() ) ); - names.remove( topology.getName() ); - assertThat( names.size(), is( 0 ) ); - - // Remove a file from the directory. - two.delete(); - kickMonitor( monitor ); - topologies = provider.getTopologies(); - assertThat( topologies.size(), is( 1 ) ); - topology = topologies.iterator().next(); - assertThat( topology.getName(), is( "one" ) ); - assertThat( topology.getTimestamp(), is( time ) ); - } finally { - FileUtils.deleteQuietly( dir ); - } - } - - private void kickMonitor( FileAlterationMonitor monitor ) { - for( FileAlterationObserver observer : monitor.getObservers() ) { - observer.checkAndNotify(); - } - } - - @Test - public void testProviderParamsOrderIsPreserved() { - - Provider provider = new Provider(); - String names[] = {"ldapRealm=", - "ldapContextFactory", - "ldapRealm.contextFactory", - "ldapGroupRealm", - "ldapGroupRealm.contextFactory", - "ldapGroupRealm.contextFactory.systemAuthenticationMechanism" - }; - - Param param = null; - for (String name : names) { - param = new Param(); - param.setName(name); - param.setValue(name); - provider.addParam(param); - - } - Map<String, String> params = provider.getParams(); - Set<String> keySet = params.keySet(); - Iterator<String> iter = keySet.iterator(); - int i = 0; - while (iter.hasNext()) { - assertTrue(iter.next().equals(names[i++])); - } - - } - - private class TestTopologyListener implements TopologyListener { - - public ArrayList<List<TopologyEvent>> events = new ArrayList<List<TopologyEvent>>(); - - @Override - public void handleTopologyEvent( List<TopologyEvent> events ) { - this.events.add( events ); - } - - } - -} http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java ---------------------------------------------------------------------- diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java index 9af6b96..33c844d 100644 --- a/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java +++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java @@ -35,9 +35,10 @@ public interface GatewayServices extends Service, ProviderDeploymentContributor public static final String SERVICE_REGISTRY_SERVICE = "ServiceRegistryService"; public static final String HOST_MAPPING_SERVICE = "HostMappingService"; public static final String SERVER_INFO_SERVICE = "ServerInfoService"; + public static final String TOPOLOGY_SERVICE = "TopologyService"; public abstract Collection<String> getServiceNames(); public abstract <T> T getService( String serviceName ); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/topology/TopologyService.java ---------------------------------------------------------------------- diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/topology/TopologyService.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/topology/TopologyService.java new file mode 100644 index 0000000..ab3bb70 --- /dev/null +++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/topology/TopologyService.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.services.topology; + +import org.apache.hadoop.gateway.services.Service; +import org.apache.hadoop.gateway.topology.Topology; +import org.apache.hadoop.gateway.topology.TopologyListener; + +import java.util.Collection; + + +public interface TopologyService extends Service { + + public void reloadTopologies(); + + public void redeployTopologies(String topologyName); + + public void addTopologyChangeListener(TopologyListener listener); + + public void startMonitor() throws Exception; + + public void stopMonitor() throws Exception; + + public Collection<Topology> getTopologies(); + +} http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-test/src/test/java/org/apache/hadoop/gateway/GatewayDeployFuncTest.java ---------------------------------------------------------------------- diff --git a/gateway-test/src/test/java/org/apache/hadoop/gateway/GatewayDeployFuncTest.java b/gateway-test/src/test/java/org/apache/hadoop/gateway/GatewayDeployFuncTest.java index a990e74..25f624c 100644 --- a/gateway-test/src/test/java/org/apache/hadoop/gateway/GatewayDeployFuncTest.java +++ b/gateway-test/src/test/java/org/apache/hadoop/gateway/GatewayDeployFuncTest.java @@ -231,7 +231,7 @@ public class GatewayDeployFuncTest { // Redeploy and make sure the timestamp is updated. topoTimestampBefore = descriptor.lastModified(); - GatewayServer.redeployTopologies( config, null ); + GatewayServer.redeployTopologies( null ); writeTime = System.currentTimeMillis(); topoTimestampAfter = descriptor.lastModified(); assertThat( topoTimestampAfter, greaterThan( topoTimestampBefore ) ); @@ -248,7 +248,7 @@ public class GatewayDeployFuncTest { // Redeploy and make sure the timestamp is updated. topoTimestampBefore = descriptor.lastModified(); - GatewayServer.redeployTopologies( config, "test-cluster" ); + GatewayServer.redeployTopologies( "test-cluster" ); writeTime = System.currentTimeMillis(); topoTimestampAfter = descriptor.lastModified(); assertThat( topoTimestampAfter, greaterThan( topoTimestampBefore ) );
