Repository: incubator-atlas Updated Branches: refs/heads/master bca454e16 -> 8bde666ba
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/webapp/src/main/java/org/apache/atlas/Atlas.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/Atlas.java b/webapp/src/main/java/org/apache/atlas/Atlas.java index 2d2d619..58c386d 100755 --- a/webapp/src/main/java/org/apache/atlas/Atlas.java +++ b/webapp/src/main/java/org/apache/atlas/Atlas.java @@ -18,6 +18,7 @@ package org.apache.atlas; +import org.apache.atlas.security.SecurityProperties; import org.apache.atlas.web.service.EmbeddedServer; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; @@ -96,10 +97,11 @@ public final class Atlas { setApplicationHome(); Configuration configuration = ApplicationProperties.get(); - final String enableTLSFlag = configuration.getString("atlas.enableTLS"); + final String enableTLSFlag = configuration.getString(SecurityProperties.TLS_ENABLED); final int appPort = getApplicationPort(cmd, enableTLSFlag, configuration); + System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, String.valueOf(appPort)); final boolean enableTLS = isTLSEnabled(enableTLSFlag, appPort); - configuration.setProperty("atlas.enableTLS", String.valueOf(enableTLS)); + configuration.setProperty(SecurityProperties.TLS_ENABLED, String.valueOf(enableTLS)); showStartupInfo(buildConfiguration, enableTLS, appPort); @@ -147,7 +149,7 @@ public final class Atlas { private static boolean isTLSEnabled(String enableTLSFlag, int appPort) { return Boolean.valueOf(StringUtils.isEmpty(enableTLSFlag) ? - System.getProperty("atlas.enableTLS", (appPort % 1000) == 443 ? "true" : "false") : enableTLSFlag); + System.getProperty(SecurityProperties.TLS_ENABLED, (appPort % 1000) == 443 ? "true" : "false") : enableTLSFlag); } private static void showStartupInfo(PropertiesConfiguration buildConfiguration, boolean enableTLS, int appPort) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java new file mode 100644 index 0000000..49ab1ba --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.filters; + +import org.apache.atlas.web.service.ActiveInstanceState; +import org.apache.atlas.web.service.ServiceState; +import org.apache.hadoop.http.HtmlQuoting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.HttpHeaders; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A servlet {@link Filter} that redirects web requests from a passive Atlas server instance to an active one. + * + * All requests to an active instance pass through. Requests received by a passive instance are redirected + * by identifying the currently active server. Requests to servers which are in transition are returned with + * an error SERVICE_UNAVAILABLE. Identification of this state is carried out using + * {@link ServiceState} and {@link ActiveInstanceState}. + */ +@Singleton +public class ActiveServerFilter implements Filter { + + private static final Logger LOG = LoggerFactory.getLogger(ActiveServerFilter.class); + private final ActiveInstanceState activeInstanceState; + private ServiceState serviceState; + + @Inject + public ActiveServerFilter(ActiveInstanceState activeInstanceState, ServiceState serviceState) { + this.activeInstanceState = activeInstanceState; + this.serviceState = serviceState; + } + + @Override + public void init(FilterConfig filterConfig) throws ServletException { + LOG.info("ActiveServerFilter initialized"); + } + + /** + * Determines if this Atlas server instance is passive and redirects to active if so. + * + * @param servletRequest Request object from which the URL and other parameters are determined. + * @param servletResponse Response object to handle the redirect. + * @param filterChain Chain to pass through requests if the instance is Active. + * @throws IOException + * @throws ServletException + */ + @Override + public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, + FilterChain filterChain) throws IOException, ServletException { + if (isInstanceActive()) { + LOG.debug("Active. Passing request downstream"); + filterChain.doFilter(servletRequest, servletResponse); + } else if (serviceState.isInstanceInTransition()) { + HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse; + LOG.error("Instance in transition. Service may not be ready to return a result"); + httpServletResponse.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + } else { + HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse; + String activeServerAddress = activeInstanceState.getActiveServerAddress(); + if (activeServerAddress == null) { + LOG.error("Could not retrieve active server address as it is null. Cannot redirect request {}", + ((HttpServletRequest)servletRequest).getRequestURI()); + httpServletResponse.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + } else { + handleRedirect((HttpServletRequest) servletRequest, httpServletResponse, activeServerAddress); + } + } + } + + boolean isInstanceActive() { + return serviceState.getState() == ServiceState.ServiceStateValue.ACTIVE; + } + + private void handleRedirect(HttpServletRequest servletRequest, HttpServletResponse httpServletResponse, + String activeServerAddress) throws IOException { + HttpServletRequest httpServletRequest = servletRequest; + String requestURI = httpServletRequest.getRequestURI(); + String queryString = httpServletRequest.getQueryString(); + if ((queryString != null) && (!queryString.isEmpty())) { + requestURI += "?" + queryString; + } + String quotedUri = HtmlQuoting.quoteHtmlChars(requestURI); + if (quotedUri == null) { + quotedUri = "/"; + } + String redirectLocation = activeServerAddress + quotedUri; + LOG.info("Not active. Redirecting to {}", redirectLocation); + // A POST/PUT/DELETE require special handling by sending HTTP 307 instead of the regular 301/302. + // Reference: http://stackoverflow.com/questions/2068418/whats-the-difference-between-a-302-and-a-307-redirect + if (isUnsafeHttpMethod(httpServletRequest)) { + httpServletResponse.setHeader(HttpHeaders.LOCATION, redirectLocation); + httpServletResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); + } else { + httpServletResponse.sendRedirect(redirectLocation); + } + } + + private boolean isUnsafeHttpMethod(HttpServletRequest httpServletRequest) { + String method = httpServletRequest.getMethod(); + return (method.equals(HttpMethod.POST)) || + (method.equals(HttpMethod.PUT)) || + (method.equals(HttpMethod.DELETE)); + } + + @Override + public void destroy() { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java index dac89d7..6bfd780 100755 --- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java +++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java @@ -34,11 +34,14 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.RepositoryMetadataModule; +import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.notification.NotificationModule; import org.apache.atlas.repository.graph.GraphProvider; import org.apache.atlas.service.Services; +import org.apache.atlas.web.filters.ActiveServerFilter; import org.apache.atlas.web.filters.AtlasAuthenticationFilter; import org.apache.atlas.web.filters.AuditFilter; +import org.apache.atlas.web.service.ActiveInstanceElectorModule; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.slf4j.Logger; @@ -72,11 +75,26 @@ public class GuiceServletConfig extends GuiceServletContextListener { LoginProcessor loginProcessor = new LoginProcessor(); loginProcessor.login(); - injector = Guice.createInjector(getRepositoryModule(), new NotificationModule(), - new JerseyServletModule() { + injector = Guice.createInjector(getRepositoryModule(), new ActiveInstanceElectorModule(), + new NotificationModule(), new JerseyServletModule() { + + private Configuration appConfiguration = null; + + private Configuration getConfiguration() { + if (appConfiguration == null) { + try { + appConfiguration = ApplicationProperties.get(); + } catch (AtlasException e) { + LOG.warn("Could not load application configuration", e); + } + } + return appConfiguration; + } + @Override protected void configureServlets() { filter("/*").through(AuditFilter.class); + configureActiveServerFilterIfNecessary(); try { configureAuthenticationFilter(); } catch (ConfigurationException e) { @@ -92,15 +110,24 @@ public class GuiceServletConfig extends GuiceServletContextListener { serve("/" + AtlasClient.BASE_URI + "*").with(GuiceContainer.class, params); } + private void configureActiveServerFilterIfNecessary() { + Configuration configuration = getConfiguration(); + if ((configuration == null) || + !HAConfiguration.isHAEnabled(configuration)) { + LOG.info("HA configuration is disabled, not activating ActiveServerFilter"); + } else { + filter("/*").through(ActiveServerFilter.class); + } + } + private void configureAuthenticationFilter() throws ConfigurationException { - try { - Configuration configuration = ApplicationProperties.get(); - if (Boolean.valueOf(configuration.getString(HTTP_AUTHENTICATION_ENABLED))) { - LOG.info("Enabling AuthenticationFilter"); - filter("/*").through(AtlasAuthenticationFilter.class); - } - } catch (AtlasException e) { - LOG.warn("Error loading configuration and initializing authentication filter", e); + Configuration configuration = getConfiguration(); + if (configuration == null) { + throw new ConfigurationException("Could not load application configuration"); + } + if (Boolean.valueOf(configuration.getString(HTTP_AUTHENTICATION_ENABLED))) { + LOG.info("Enabling AuthenticationFilter"); + filter("/*").through(AtlasAuthenticationFilter.class); } } }); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorModule.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorModule.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorModule.java new file mode 100644 index 0000000..d662683 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorModule.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import com.google.inject.AbstractModule; +import com.google.inject.multibindings.Multibinder; +import org.apache.atlas.notification.NotificationHookConsumer; +import org.apache.atlas.repository.audit.HBaseBasedAuditRepository; +import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; +import org.apache.atlas.service.Service; +import org.apache.atlas.listener.ActiveStateChangeHandler; +import org.apache.atlas.services.DefaultMetadataService; +import org.apache.atlas.web.filters.ActiveServerFilter; + +/** + * A Guice module that registers the handlers of High Availability state change handlers and other services. + * + * Any new handler that should react to HA state change should be registered here. + */ +public class ActiveInstanceElectorModule extends AbstractModule { + @Override + protected void configure() { + Multibinder<ActiveStateChangeHandler> activeStateChangeHandlerBinder = + Multibinder.newSetBinder(binder(), ActiveStateChangeHandler.class); + activeStateChangeHandlerBinder.addBinding().to(GraphBackedSearchIndexer.class); + activeStateChangeHandlerBinder.addBinding().to(DefaultMetadataService.class); + activeStateChangeHandlerBinder.addBinding().to(NotificationHookConsumer.class); + activeStateChangeHandlerBinder.addBinding().to(HBaseBasedAuditRepository.class); + + Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class); + serviceBinder.addBinding().to(ActiveInstanceElectorService.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java new file mode 100644 index 0000000..9d7db6d --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.Singleton; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.listener.ActiveStateChangeHandler; +import org.apache.atlas.service.Service; +import org.apache.commons.configuration.Configuration; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; + +/** + * A service that implements leader election to determine whether this Atlas server is Active. + * + * The service implements leader election through <a href="http://curator.apache.org/">Curator</a>'s + * {@link LeaderLatch} recipe. The service also implements {@link LeaderLatchListener} to get + * notified of changes to leadership state. Upon becoming leader, this instance is treated as the + * active Atlas instance and calls {@link ActiveStateChangeHandler}s to activate them. Conversely, + * on being removed from leadership, this instance is treated as a passive instance and calls + * {@link ActiveStateChangeHandler}s to deactivate them. + */ +@Singleton +public class ActiveInstanceElectorService implements Service, LeaderLatchListener { + + private static final Logger LOG = LoggerFactory.getLogger(ActiveInstanceElectorService.class); + + private final Configuration configuration; + private final ServiceState serviceState; + private final ActiveInstanceState activeInstanceState; + private Collection<Provider<ActiveStateChangeHandler>> activeStateChangeHandlerProviders; + private Collection<ActiveStateChangeHandler> activeStateChangeHandlers; + private CuratorFactory curatorFactory; + private LeaderLatch leaderLatch; + private String serverId; + + /** + * Create a new instance of {@link ActiveInstanceElectorService} + * @param activeStateChangeHandlerProviders The list of registered {@link ActiveStateChangeHandler}s that + * must be called back on state changes. + * @throws AtlasException + */ + @Inject + public ActiveInstanceElectorService( + Collection<Provider<ActiveStateChangeHandler>> activeStateChangeHandlerProviders, + CuratorFactory curatorFactory, ActiveInstanceState activeInstanceState, + ServiceState serviceState) + throws AtlasException { + this(ApplicationProperties.get(), activeStateChangeHandlerProviders, + curatorFactory, activeInstanceState, serviceState); + } + + ActiveInstanceElectorService(Configuration configuration, + Collection<Provider<ActiveStateChangeHandler>> activeStateChangeHandlerProviders, + CuratorFactory curatorFactory, ActiveInstanceState activeInstanceState, + ServiceState serviceState) { + this.configuration = configuration; + this.activeStateChangeHandlerProviders = activeStateChangeHandlerProviders; + this.activeStateChangeHandlers = new ArrayList<>(); + this.curatorFactory = curatorFactory; + this.activeInstanceState = activeInstanceState; + this.serviceState = serviceState; + } + + /** + * Join leader election on starting up. + * + * If Atlas High Availability configuration is disabled, this operation is a no-op. + * @throws AtlasException + */ + @Override + public void start() throws AtlasException { + if (!HAConfiguration.isHAEnabled(configuration)) { + LOG.info("HA is not enabled, no need to start leader election service"); + return; + } + cacheActiveStateChangeHandlers(); + serverId = HAConfiguration.getAtlasServerId(configuration); + joinElection(); + } + + private void joinElection() { + LOG.info("Starting leader election for {}", serverId); + leaderLatch = curatorFactory.leaderLatchInstance(serverId); + leaderLatch.addListener(this); + try { + leaderLatch.start(); + LOG.info("Leader latch started for {}.", serverId); + } catch (Exception e) { + LOG.info("Exception while starting leader latch for {}.", serverId, e); + } + } + + /** + * Leave leader election process and clean up resources on shutting down. + * + * If Atlas High Availability configuration is disabled, this operation is a no-op. + * @throws AtlasException + */ + @Override + public void stop() { + if (!HAConfiguration.isHAEnabled(configuration)) { + LOG.info("HA is not enabled, no need to stop leader election service"); + return; + } + try { + leaderLatch.close(); + curatorFactory.close(); + } catch (IOException e) { + LOG.error("Error closing leader latch", e); + } + } + + /** + * Call all registered {@link ActiveStateChangeHandler}s on being elected active. + * + * In addition, shared state information about this instance becoming active is updated + * using {@link ActiveInstanceState}. + */ + @Override + public void isLeader() { + LOG.warn("Server instance with server id {} is elected as leader", serverId); + serviceState.becomingActive(); + try { + for (ActiveStateChangeHandler handler : activeStateChangeHandlers) { + handler.instanceIsActive(); + } + activeInstanceState.update(serverId); + serviceState.setActive(); + } catch (Exception e) { + LOG.error("Got exception while activating", e); + notLeader(); + rejoinElection(); + } + } + + private void cacheActiveStateChangeHandlers() { + if (activeStateChangeHandlers.size()==0) { + for (Provider<ActiveStateChangeHandler> provider : activeStateChangeHandlerProviders) { + ActiveStateChangeHandler handler = provider.get(); + activeStateChangeHandlers.add(handler); + } + } + } + + private void rejoinElection() { + try { + leaderLatch.close(); + joinElection(); + } catch (IOException e) { + LOG.error("Error rejoining election", e); + } + } + + /** + * Call all registered {@link ActiveStateChangeHandler}s on becoming passive instance. + */ + @Override + public void notLeader() { + LOG.warn("Server instance with server id {} is removed as leader", serverId); + serviceState.becomingPassive(); + for (ActiveStateChangeHandler handler: activeStateChangeHandlers) { + try { + handler.instanceIsPassive(); + } catch (AtlasException e) { + LOG.error("Error while reacting to passive state.", e); + } + } + serviceState.setPassive(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java new file mode 100644 index 0000000..88c3adb --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import com.google.inject.Inject; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; + +/** + * An object that encapsulates storing and retrieving state related to an Active Atlas server. + * + * The current implementation uses Zookeeper to store and read this state from. It does this + * under a read-write lock implemented using Curator's {@link InterProcessReadWriteLock} to + * provide for safety across multiple processes. + */ +public class ActiveInstanceState { + + private final Configuration configuration; + private final CuratorFactory curatorFactory; + + public static final String APACHE_ATLAS_ACTIVE_SERVER_INFO = "/apache_atlas_active_server_info"; + private static final Logger LOG = LoggerFactory.getLogger(ActiveInstanceState.class); + + /** + * Create a new instance of {@link ActiveInstanceState}. + * @param curatorFactory an instance of {@link CuratorFactory} to get the {@link InterProcessReadWriteLock} + * @throws AtlasException + */ + @Inject + public ActiveInstanceState(CuratorFactory curatorFactory) throws AtlasException { + this(ApplicationProperties.get(), curatorFactory); + } + + /** + * Create a new instance of {@link ActiveInstanceState}. + * @param configuration an instance of {@link Configuration} created from Atlas configuration + * @param curatorFactory an instance of {@link CuratorFactory} to get the {@link InterProcessReadWriteLock} + * @throws AtlasException + */ + public ActiveInstanceState(Configuration configuration, CuratorFactory curatorFactory) { + this.configuration = configuration; + this.curatorFactory = curatorFactory; + } + + /** + * Update state of the active server instance. + * + * This method writes this instance's Server Address to a shared node in Zookeeper. + * This information is used by other passive instances to locate the current active server. + * @throws Exception + * @param serverId ID of this server instance + */ + public void update(String serverId) throws Exception { + CuratorFramework client = curatorFactory.clientInstance(); + String atlasServerAddress = HAConfiguration.getBoundAddressForId(configuration, serverId); + Stat serverInfo = client.checkExists().forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO); + if (serverInfo == null) { + client.create().withMode(CreateMode.EPHEMERAL).forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO); + } + client.setData().forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO, + atlasServerAddress.getBytes(Charset.forName("UTF-8"))); + } + + /** + * Retrieve state of the active server instance. + * + * This method reads the active server location from the shared node in Zookeeper. + * @return the active server's address and port of form http://host-or-ip:port + */ + public String getActiveServerAddress() { + CuratorFramework client = curatorFactory.clientInstance(); + String serverAddress = null; + try { + byte[] bytes = client.getData().forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO); + serverAddress = new String(bytes, Charset.forName("UTF-8")); + } catch (Exception e) { + LOG.error("Error getting active server address", e); + } + return serverAddress; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/webapp/src/main/java/org/apache/atlas/web/service/CuratorFactory.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/service/CuratorFactory.java b/webapp/src/main/java/org/apache/atlas/web/service/CuratorFactory.java new file mode 100644 index 0000000..0bee340 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/service/CuratorFactory.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import com.google.inject.Singleton; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; +import org.apache.curator.retry.ExponentialBackoffRetry; + +/** + * A factory to create objects related to Curator. + * + * Allows for stubbing in tests. + */ +@Singleton +public class CuratorFactory { + public static final String APACHE_ATLAS_LEADER_ELECTOR_PATH = "/apache_atlas_leader_elector_path"; + + private final Configuration configuration; + private CuratorFramework curatorFramework; + + /** + * Initializes the {@link CuratorFramework} that is used for all interaction with Zookeeper. + * @throws AtlasException + */ + public CuratorFactory() throws AtlasException { + configuration = ApplicationProperties.get(); + initializeCuratorFramework(); + } + + private void initializeCuratorFramework() { + HAConfiguration.ZookeeperProperties zookeeperProperties = + HAConfiguration.getZookeeperProperties(configuration); + curatorFramework = CuratorFrameworkFactory.builder(). + connectString(zookeeperProperties.getConnectString()). + sessionTimeoutMs(zookeeperProperties.getSessionTimeout()). + retryPolicy(new ExponentialBackoffRetry( + zookeeperProperties.getRetriesSleepTimeMillis(), zookeeperProperties.getNumRetries())).build(); + curatorFramework.start(); + } + + /** + * Cleanup resources related to {@link CuratorFramework}. + * + * After this call, no further calls to any curator objects should be done. + */ + public void close() { + curatorFramework.close(); + } + + /** + * Returns a pre-created instance of {@link CuratorFramework}. + * + * This method can be called any number of times to access the {@link CuratorFramework} used in the + * application. + * @return + */ + public CuratorFramework clientInstance() { + return curatorFramework; + } + + /** + * Create a new instance {@link LeaderLatch} + * @param serverId the ID used to register this instance with curator. + * This ID should typically be obtained using + * {@link HAConfiguration#getAtlasServerId(Configuration)} + * @return + */ + public LeaderLatch leaderLatchInstance(String serverId) { + return new LeaderLatch(curatorFramework, APACHE_ATLAS_LEADER_ELECTOR_PATH, serverId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/webapp/src/main/java/org/apache/atlas/web/service/ServiceState.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ServiceState.java b/webapp/src/main/java/org/apache/atlas/web/service/ServiceState.java new file mode 100644 index 0000000..2d9e00a --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/service/ServiceState.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import com.google.common.base.Preconditions; +import com.google.inject.Singleton; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A class that maintains the state of this instance. + * + * The states are maintained at a granular level, including in-transition states. The transitions are + * directed by {@link ActiveInstanceElectorService}. + */ +@Singleton +public class ServiceState { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceState.class); + + public enum ServiceStateValue { + ACTIVE, + PASSIVE, + BECOMING_ACTIVE, + BECOMING_PASSIVE + } + + private Configuration configuration; + private volatile ServiceStateValue state; + + public ServiceState() throws AtlasException { + this(ApplicationProperties.get()); + } + + public ServiceState(Configuration configuration) { + this.configuration = configuration; + state = !HAConfiguration.isHAEnabled(configuration) ? + ServiceStateValue.ACTIVE : ServiceStateValue.PASSIVE; + } + + public ServiceStateValue getState() { + return state; + } + + public void becomingActive() { + LOG.warn("Instance becoming active from {}", state); + setState(ServiceStateValue.BECOMING_ACTIVE); + } + + private void setState(ServiceStateValue newState) { + Preconditions.checkState(HAConfiguration.isHAEnabled(configuration), + "Cannot change state as requested, as HA is not enabled for this instance."); + state = newState; + } + + public void setActive() { + LOG.warn("Instance is active from {}", state); + setState(ServiceStateValue.ACTIVE); + } + + public void becomingPassive() { + LOG.warn("Instance becoming passive from {}", state); + setState(ServiceStateValue.BECOMING_PASSIVE); + } + + public void setPassive() { + LOG.warn("Instance is passive from {}", state); + setState(ServiceStateValue.PASSIVE); + } + + public boolean isInstanceInTransition() { + ServiceStateValue state = getState(); + return state == ServiceStateValue.BECOMING_ACTIVE + || state == ServiceStateValue.BECOMING_PASSIVE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/webapp/src/test/java/org/apache/atlas/web/filters/ActiveServerFilterTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/filters/ActiveServerFilterTest.java b/webapp/src/test/java/org/apache/atlas/web/filters/ActiveServerFilterTest.java new file mode 100644 index 0000000..c6962fa --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/web/filters/ActiveServerFilterTest.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.filters; + +import org.apache.atlas.web.service.ActiveInstanceState; +import org.apache.atlas.web.service.ServiceState; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import javax.servlet.FilterChain; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.HttpMethod; + +import java.io.IOException; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class ActiveServerFilterTest { + + public static final String ACTIVE_SERVER_ADDRESS = "http://localhost:21000/"; + @Mock + private ActiveInstanceState activeInstanceState; + + @Mock + private HttpServletRequest servletRequest; + + @Mock + private HttpServletResponse servletResponse; + + @Mock + private FilterChain filterChain; + + @Mock + private ServiceState serviceState; + + @BeforeMethod + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testShouldPassThroughRequestsIfActive() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(filterChain).doFilter(servletRequest, servletResponse); + } + + @Test + public void testShouldFailIfCannotRetrieveActiveServerAddress() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + when(activeInstanceState.getActiveServerAddress()).thenReturn(null); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(servletResponse).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + } + + @Test + public void testShouldRedirectRequestToActiveServerAddress() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS); + when(servletRequest.getRequestURI()).thenReturn("types"); + when(servletRequest.getMethod()).thenReturn(HttpMethod.GET); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(servletResponse).sendRedirect(ACTIVE_SERVER_ADDRESS+"types"); + } + + @Test + public void testRedirectedRequestShouldContainQueryParameters() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS); + when(servletRequest.getMethod()).thenReturn(HttpMethod.GET); + when(servletRequest.getRequestURI()).thenReturn("types"); + when(servletRequest.getQueryString()).thenReturn("query=TRAIT"); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(servletResponse).sendRedirect(ACTIVE_SERVER_ADDRESS+"types?query=TRAIT"); + + } + + @Test + public void testShouldRedirectPOSTRequest() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS); + when(servletRequest.getMethod()).thenReturn(HttpMethod.POST); + when(servletRequest.getRequestURI()).thenReturn("types"); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(servletResponse).setHeader("Location", ACTIVE_SERVER_ADDRESS+"types"); + verify(servletResponse).setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); + } + + @Test + public void testShouldRedirectPUTRequest() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS); + when(servletRequest.getMethod()).thenReturn(HttpMethod.PUT); + when(servletRequest.getRequestURI()).thenReturn("types"); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(servletResponse).setHeader("Location", ACTIVE_SERVER_ADDRESS+"types"); + verify(servletResponse).setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); + } + + @Test + public void testShouldRedirectDELETERequest() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS); + when(servletRequest.getMethod()).thenReturn(HttpMethod.DELETE); + when(servletRequest.getRequestURI()). + thenReturn("api/atlas/entities/6ebb039f-eaa5-4b9c-ae44-799c7910545d/traits/test_tag_ha3"); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(servletResponse).setHeader("Location", ACTIVE_SERVER_ADDRESS + + "api/atlas/entities/6ebb039f-eaa5-4b9c-ae44-799c7910545d/traits/test_tag_ha3"); + verify(servletResponse).setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); + } + + @Test + public void testShouldReturnServiceUnavailableIfStateBecomingActive() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.BECOMING_ACTIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(servletResponse).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java new file mode 100644 index 0000000..e6a46f7 --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java @@ -0,0 +1,364 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import com.google.inject.Provider; +import org.apache.atlas.AtlasConstants; +import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.listener.ActiveStateChangeHandler; +import org.apache.commons.configuration.Configuration; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class ActiveInstanceElectorServiceTest { + + @Mock + private Configuration configuration; + + @Mock + private CuratorFactory curatorFactory; + + @Mock + private ActiveInstanceState activeInstanceState; + + @Mock + private ServiceState serviceState; + + @BeforeMethod + public void setup() { + System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, "21000"); + MockitoAnnotations.initMocks(this); + } + + @Test + public void testLeaderElectionIsJoinedOnStart() throws Exception { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + + verify(leaderLatch).start(); + } + + @Test + public void testListenerIsAddedForActiveInstanceCallbacks() throws Exception { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + + verify(leaderLatch).addListener(activeInstanceElectorService); + } + + @Test + public void testLeaderElectionIsNotStartedIfNotInHAMode() throws AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + + verifyZeroInteractions(curatorFactory); + } + + @Test + public void testLeaderElectionIsLeftOnStop() throws IOException, AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + activeInstanceElectorService.stop(); + + verify(leaderLatch).close(); + } + + @Test + public void testCuratorFactoryIsClosedOnStop() throws AtlasException { + + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + activeInstanceElectorService.stop(); + + verify(curatorFactory).close(); + } + + @Test + public void testNoActionOnStopIfHAModeIsDisabled() { + + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.stop(); + + verifyZeroInteractions(curatorFactory); + } + + @Test + public void testRegisteredHandlersAreNotifiedWhenInstanceIsActive() throws AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + Collection<Provider<ActiveStateChangeHandler>> changeHandlers = new ArrayList(); + final ActiveStateChangeHandler handler1 = mock(ActiveStateChangeHandler.class); + final ActiveStateChangeHandler handler2 = mock(ActiveStateChangeHandler.class); + + changeHandlers.add(new Provider<ActiveStateChangeHandler>() { + @Override + public ActiveStateChangeHandler get() { + return handler1; + } + }); + + changeHandlers.add(new Provider<ActiveStateChangeHandler>() { + @Override + public ActiveStateChangeHandler get() { + return handler2; + } + }); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + activeInstanceElectorService.isLeader(); + + verify(handler1).instanceIsActive(); + verify(handler2).instanceIsActive(); + } + + @Test + public void testSharedStateIsUpdatedWhenInstanceIsActive() throws Exception { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + + activeInstanceElectorService.start(); + activeInstanceElectorService.isLeader(); + + verify(activeInstanceState).update("id1"); + } + + @Test + public void testRegisteredHandlersAreNotifiedOfPassiveWhenStateUpdateFails() throws Exception { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + Collection<Provider<ActiveStateChangeHandler>> changeHandlers = new ArrayList(); + final ActiveStateChangeHandler handler1 = mock(ActiveStateChangeHandler.class); + final ActiveStateChangeHandler handler2 = mock(ActiveStateChangeHandler.class); + + changeHandlers.add(new Provider<ActiveStateChangeHandler>() { + @Override + public ActiveStateChangeHandler get() { + return handler1; + } + }); + + changeHandlers.add(new Provider<ActiveStateChangeHandler>() { + @Override + public ActiveStateChangeHandler get() { + return handler2; + } + }); + + doThrow(new Exception()).when(activeInstanceState).update("id1"); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + activeInstanceElectorService.isLeader(); + + verify(handler1).instanceIsPassive(); + verify(handler2).instanceIsPassive(); + } + + @Test + public void testElectionIsRejoinedWhenStateUpdateFails() throws Exception { + + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + doThrow(new Exception()).when(activeInstanceState).update("id1"); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + + activeInstanceElectorService.start(); + activeInstanceElectorService.isLeader(); + + InOrder inOrder = inOrder(leaderLatch, curatorFactory); + inOrder.verify(leaderLatch).close(); + inOrder.verify(curatorFactory).leaderLatchInstance("id1"); + inOrder.verify(leaderLatch).addListener(activeInstanceElectorService); + inOrder.verify(leaderLatch).start(); + } + + @Test + public void testRegisteredHandlersAreNotifiedOfPassiveWhenInstanceIsPassive() throws AtlasException { + + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + Collection<Provider<ActiveStateChangeHandler>> changeHandlers = new ArrayList(); + final ActiveStateChangeHandler handler1 = mock(ActiveStateChangeHandler.class); + final ActiveStateChangeHandler handler2 = mock(ActiveStateChangeHandler.class); + + changeHandlers.add(new Provider<ActiveStateChangeHandler>() { + @Override + public ActiveStateChangeHandler get() { + return handler1; + } + }); + + changeHandlers.add(new Provider<ActiveStateChangeHandler>() { + @Override + public ActiveStateChangeHandler get() { + return handler2; + } + }); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + activeInstanceElectorService.notLeader(); + + verify(handler1).instanceIsPassive(); + verify(handler2).instanceIsPassive(); + } + + @Test + public void testActiveStateSetOnBecomingLeader() { + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), + curatorFactory, activeInstanceState, serviceState); + + activeInstanceElectorService.isLeader(); + + InOrder inOrder = inOrder(serviceState); + inOrder.verify(serviceState).becomingActive(); + inOrder.verify(serviceState).setActive(); + } + + @Test + public void testPassiveStateSetOnLoosingLeadership() { + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), + curatorFactory, activeInstanceState, serviceState); + + activeInstanceElectorService.notLeader(); + + InOrder inOrder = inOrder(serviceState); + inOrder.verify(serviceState).becomingPassive(); + inOrder.verify(serviceState).setPassive(); + } + + @Test + public void testPassiveStateSetIfActivationFails() throws Exception { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + doThrow(new Exception()).when(activeInstanceState).update("id1"); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), + curatorFactory, activeInstanceState, serviceState); + activeInstanceElectorService.start(); + activeInstanceElectorService.isLeader(); + + InOrder inOrder = inOrder(serviceState); + inOrder.verify(serviceState).becomingActive(); + inOrder.verify(serviceState).becomingPassive(); + inOrder.verify(serviceState).setPassive(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceStateTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceStateTest.java b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceStateTest.java new file mode 100644 index 0000000..939d0ca --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceStateTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import org.apache.atlas.ha.HAConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.CreateBuilder; +import org.apache.curator.framework.api.ExistsBuilder; +import org.apache.curator.framework.api.GetDataBuilder; +import org.apache.curator.framework.api.SetDataBuilder; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.Stat; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import java.nio.charset.Charset; + +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; +import static org.testng.Assert.assertNull; + +public class ActiveInstanceStateTest { + + private static final String HOST_PORT = "127.0.0.1:21000"; + public static final String SERVER_ADDRESS = "http://" + HOST_PORT; + @Mock + private Configuration configuration; + + @Mock + private CuratorFactory curatorFactory; + + @Mock + private CuratorFramework curatorFramework; + + @BeforeTest + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testSharedPathIsCreatedIfNotExists() throws Exception { + + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn(HOST_PORT); + + when(curatorFactory.clientInstance()).thenReturn(curatorFramework); + + ExistsBuilder existsBuilder = mock(ExistsBuilder.class); + when(curatorFramework.checkExists()).thenReturn(existsBuilder); + when(existsBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)).thenReturn(null); + + CreateBuilder createBuilder = mock(CreateBuilder.class); + when(curatorFramework.create()).thenReturn(createBuilder); + when(createBuilder.withMode(CreateMode.EPHEMERAL)).thenReturn(createBuilder); + + SetDataBuilder setDataBuilder = mock(SetDataBuilder.class); + when(curatorFramework.setData()).thenReturn(setDataBuilder); + + ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory); + activeInstanceState.update("id1"); + + verify(createBuilder).forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO); + } + + @Test + public void testDataIsUpdatedWithAtlasServerAddress() throws Exception { + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn(HOST_PORT); + + when(curatorFactory.clientInstance()).thenReturn(curatorFramework); + ExistsBuilder existsBuilder = mock(ExistsBuilder.class); + when(curatorFramework.checkExists()).thenReturn(existsBuilder); + when(existsBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)).thenReturn(new Stat()); + + SetDataBuilder setDataBuilder = mock(SetDataBuilder.class); + when(curatorFramework.setData()).thenReturn(setDataBuilder); + + ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory); + activeInstanceState.update("id1"); + + verify(setDataBuilder).forPath( + ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO, + SERVER_ADDRESS.getBytes(Charset.forName("UTF-8"))); + } + + @Test + public void testShouldReturnActiveServerAddress() throws Exception { + when(curatorFactory.clientInstance()).thenReturn(curatorFramework); + + GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); + when(curatorFramework.getData()).thenReturn(getDataBuilder); + when(getDataBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)). + thenReturn(SERVER_ADDRESS.getBytes(Charset.forName("UTF-8"))); + + ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory); + String actualServerAddress = activeInstanceState.getActiveServerAddress(); + + assertEquals(SERVER_ADDRESS, actualServerAddress); + } + + @Test + public void testShouldHandleExceptionsInFetchingServerAddress() throws Exception { + when(curatorFactory.clientInstance()).thenReturn(curatorFramework); + + GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); + when(curatorFramework.getData()).thenReturn(getDataBuilder); + when(getDataBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)). + thenThrow(new Exception()); + + ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory); + assertNull(activeInstanceState.getActiveServerAddress()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/webapp/src/test/java/org/apache/atlas/web/service/ServiceStateTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/service/ServiceStateTest.java b/webapp/src/test/java/org/apache/atlas/web/service/ServiceStateTest.java new file mode 100644 index 0000000..77aba88 --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/web/service/ServiceStateTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import org.apache.atlas.ha.HAConfiguration; +import org.apache.commons.configuration.Configuration; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +public class ServiceStateTest { + + @Mock + private Configuration configuration; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testShouldBeActiveIfHAIsDisabled() { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); + + ServiceState serviceState = new ServiceState(configuration); + assertEquals(ServiceState.ServiceStateValue.ACTIVE, serviceState.getState()); + } + + @Test(expectedExceptions = IllegalStateException.class) + public void testShouldDisallowTransitionIfHAIsDisabled() { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); + + ServiceState serviceState = new ServiceState(configuration); + serviceState.becomingPassive(); + fail("Should not allow transition"); + } + + @Test + public void testShouldChangeStateIfHAIsEnabled() { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + + ServiceState serviceState = new ServiceState(configuration); + serviceState.becomingPassive(); + assertEquals(ServiceState.ServiceStateValue.BECOMING_PASSIVE, serviceState.getState()); + } +}
