Repository: aurora Updated Branches: refs/heads/master ae051f3b9 -> f4a08459c
Plumb Curator discovery as an option. This adds a Guice module for the Curator discovery implementations and re-works the `ServiceDiscoveryModule` to optionally bind it when the new `-zk_use_curator` flag is set. Bugs closed: AURORA-1468, AURORA-1669 Reviewed at https://reviews.apache.org/r/46286/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f4a08459 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f4a08459 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f4a08459 Branch: refs/heads/master Commit: f4a08459c2e03ba4ab51b1b90812812f076a5472 Parents: ae051f3 Author: John Sirois <jsir...@apache.org> Authored: Mon Apr 18 12:26:39 2016 -0600 Committer: John Sirois <john.sir...@gmail.com> Committed: Mon Apr 18 12:26:39 2016 -0600 ---------------------------------------------------------------------- RELEASE-NOTES.md | 10 +- config/legacy_untested_classes.txt | 5 +- examples/vagrant/upstart/aurora-scheduler.conf | 1 + .../aurora/scheduler/app/SchedulerMain.java | 6 +- .../discovery/CommonsServerGroupMonitor.java | 59 ------- .../CommonsServiceDiscoveryModule.java | 102 +++++++++++ .../discovery/CommonsServiceGroupMonitor.java | 59 +++++++ .../CuratorServiceDiscoveryModule.java | 172 +++++++++++++++++++ .../discovery/FlaggedZooKeeperConfig.java | 6 + .../discovery/ServiceDiscoveryBindings.java | 59 +++++++ .../discovery/ServiceDiscoveryModule.java | 150 ++++++++++------ .../discovery/ZooKeeperClientModule.java | 144 ---------------- .../scheduler/discovery/ZooKeeperConfig.java | 44 ++++- .../log/mesos/MesosLogStreamModule.java | 14 +- .../aurora/scheduler/app/SchedulerIT.java | 13 +- .../discovery/AbstractDiscoveryModuleTest.java | 77 +++++++++ .../discovery/CommonsDiscoveryModuleTest.java | 29 ++++ .../CommonsServerGroupMonitorTest.java | 137 --------------- .../CommonsServiceGroupMonitorTest.java | 137 +++++++++++++++ .../discovery/CuratorDiscoveryModuleTest.java | 67 ++++++++ 
.../discovery/ZooKeeperConfigTest.java | 23 +-- 21 files changed, 886 insertions(+), 428 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index a0536ec..99d261b 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -1,4 +1,12 @@ -0.13.0 (Not yet released) +0.14.0 (Not yet released) +------ + +### New/updated: + +- Added a new optional [Apache Curator](https://curator.apache.org/) backend for performing + scheduler leader election. You can enable this with the new `-zk_use_curator` scheduler argument. + +0.13.0 ------ ### New/updated: http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/config/legacy_untested_classes.txt ---------------------------------------------------------------------- diff --git a/config/legacy_untested_classes.txt b/config/legacy_untested_classes.txt index 30875da..10e8b77 100644 --- a/config/legacy_untested_classes.txt +++ b/config/legacy_untested_classes.txt @@ -16,9 +16,8 @@ org/apache/aurora/scheduler/configuration/executor/ExecutorModule$1 org/apache/aurora/scheduler/cron/quartz/CronSchedulerImpl org/apache/aurora/scheduler/cron/quartz/CronSchedulerImpl$1 org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig -org/apache/aurora/scheduler/discovery/ZooKeeperClientModule$1 -org/apache/aurora/scheduler/discovery/ZooKeeperClientModule$LocalClientProvider -org/apache/aurora/scheduler/discovery/ZooKeeperClientModule$TestServerService +org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule$TestServerService +org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule$LocalZooKeeperClusterProvider org/apache/aurora/scheduler/http/AbortCallback org/apache/aurora/scheduler/http/api/security/Kerberos5Realm org/apache/aurora/scheduler/http/JerseyTemplateServlet 
http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/examples/vagrant/upstart/aurora-scheduler.conf ---------------------------------------------------------------------- diff --git a/examples/vagrant/upstart/aurora-scheduler.conf b/examples/vagrant/upstart/aurora-scheduler.conf index b9732d2..084016a 100644 --- a/examples/vagrant/upstart/aurora-scheduler.conf +++ b/examples/vagrant/upstart/aurora-scheduler.conf @@ -33,6 +33,7 @@ exec bin/aurora-scheduler \ -hostname=aurora.local \ -http_port=8081 \ -native_log_quorum_size=1 \ + -zk_use_curator \ -zk_endpoints=localhost:2181 \ -mesos_master_address=zk://localhost:2181/mesos \ -serverset_path=/aurora/scheduler \ http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java index 25e1312..9ebfe23 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java +++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java @@ -52,8 +52,6 @@ import org.apache.aurora.scheduler.configuration.executor.ExecutorModule; import org.apache.aurora.scheduler.cron.quartz.CronModule; import org.apache.aurora.scheduler.discovery.FlaggedZooKeeperConfig; import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule; -import org.apache.aurora.scheduler.discovery.ZooKeeperClientModule; -import org.apache.aurora.scheduler.discovery.ZooKeeperConfig; import org.apache.aurora.scheduler.http.HttpService; import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule; import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule; @@ -156,12 +154,10 @@ public class SchedulerMain { LOG.error("Uncaught exception from " + t + ":" + e, e); }); - ZooKeeperConfig zkClientConfig = FlaggedZooKeeperConfig.create(); Module module = 
Modules.combine( appEnvironmentModule, getUniversalModule(), - new ZooKeeperClientModule(zkClientConfig), - new ServiceDiscoveryModule(SERVERSET_PATH.get(), zkClientConfig.credentials), + new ServiceDiscoveryModule(FlaggedZooKeeperConfig.create(), SERVERSET_PATH.get()), new BackupModule(SnapshotStoreImpl.class), new ExecutorModule(), new AbstractModule() { http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitor.java deleted file mode 100644 index 3336c87..0000000 --- a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitor.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.discovery; - -import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; - -import javax.inject.Inject; - -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.scheduler.app.ServiceGroupMonitor; - -import static java.util.Objects.requireNonNull; - -class CommonsServerGroupMonitor implements ServiceGroupMonitor { - private Optional<Command> closeCommand = Optional.empty(); - private final DynamicHostSet<ServiceInstance> serverSet; - private final AtomicReference<ImmutableSet<ServiceInstance>> services = - new AtomicReference<>(ImmutableSet.of()); - - @Inject - CommonsServerGroupMonitor(DynamicHostSet<ServiceInstance> serverSet) { - this.serverSet = requireNonNull(serverSet); - } - - @Override - public void start() throws MonitorException { - try { - closeCommand = Optional.of(serverSet.watch(services::set)); - } catch (DynamicHostSet.MonitorException e) { - throw new MonitorException("Unable to watch scheduler host set.", e); - } - } - - @Override - public void close() { - closeCommand.ifPresent(Command::execute); - } - - @Override - public ImmutableSet<ServiceInstance> get() { - return services.get(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java new file mode 100644 index 0000000..339f63b --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java @@ -0,0 +1,102 @@ +/** + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.discovery; + +import java.net.InetSocketAddress; +import java.util.List; + +import javax.inject.Singleton; + +import com.google.inject.Exposed; +import com.google.inject.PrivateModule; +import com.google.inject.Provides; + +import org.apache.aurora.common.net.pool.DynamicHostSet; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.zookeeper.ServerSetImpl; +import org.apache.aurora.common.zookeeper.SingletonService; +import org.apache.aurora.common.zookeeper.SingletonServiceImpl; +import org.apache.aurora.common.zookeeper.ZooKeeperClient; +import org.apache.aurora.common.zookeeper.ZooKeeperUtils; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.apache.zookeeper.data.ACL; + +import static java.util.Objects.requireNonNull; + +/** + * Binding module for utilities to advertise the network presence of the scheduler. + * + * Uses a fork of Twitter commons/zookeeper. 
+ */ +class CommonsServiceDiscoveryModule extends PrivateModule { + + private final String discoveryPath; + private final ZooKeeperConfig zooKeeperConfig; + + CommonsServiceDiscoveryModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) { + this.discoveryPath = ZooKeeperUtils.normalizePath(discoveryPath); + this.zooKeeperConfig = requireNonNull(zooKeeperConfig); + } + + @Override + protected void configure() { + requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY); + requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY); + + bind(ServiceGroupMonitor.class).to(CommonsServiceGroupMonitor.class).in(Singleton.class); + expose(ServiceGroupMonitor.class); + } + + @Provides + @Singleton + ZooKeeperClient provideZooKeeperClient( + @ServiceDiscoveryBindings.ZooKeeper Iterable<InetSocketAddress> zooKeeperCluster) { + + return new ZooKeeperClient( + zooKeeperConfig.getSessionTimeout(), + zooKeeperConfig.getCredentials(), + zooKeeperConfig.getChrootPath(), + zooKeeperCluster); + } + + @Provides + @Singleton + ServerSetImpl provideServerSet( + ZooKeeperClient client, + @ServiceDiscoveryBindings.ZooKeeper List<ACL> zooKeeperAcls) { + + return new ServerSetImpl(client, zooKeeperAcls, discoveryPath); + } + + @Provides + @Singleton + DynamicHostSet<ServiceInstance> provideServerSet(ServerSetImpl serverSet) { + // Used for a type re-binding of the server set. + return serverSet; + } + + // NB: We only take a ServerSetImpl instead of a ServerSet here to simplify binding. 
+ @Provides + @Singleton + @Exposed + SingletonService provideSingletonService( + ZooKeeperClient client, + ServerSetImpl serverSet, + @ServiceDiscoveryBindings.ZooKeeper List<ACL> zookeeperAcls) { + + return new SingletonServiceImpl( + serverSet, + SingletonServiceImpl.createSingletonCandidate(client, discoveryPath, zookeeperAcls)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java new file mode 100644 index 0000000..9161455 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java @@ -0,0 +1,59 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.discovery; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import javax.inject.Inject; + +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.net.pool.DynamicHostSet; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; + +import static java.util.Objects.requireNonNull; + +class CommonsServiceGroupMonitor implements ServiceGroupMonitor { + private Optional<Command> closeCommand = Optional.empty(); + private final DynamicHostSet<ServiceInstance> serverSet; + private final AtomicReference<ImmutableSet<ServiceInstance>> services = + new AtomicReference<>(ImmutableSet.of()); + + @Inject + CommonsServiceGroupMonitor(DynamicHostSet<ServiceInstance> serverSet) { + this.serverSet = requireNonNull(serverSet); + } + + @Override + public void start() throws MonitorException { + try { + closeCommand = Optional.of(serverSet.watch(services::set)); + } catch (DynamicHostSet.MonitorException e) { + throw new MonitorException("Unable to watch scheduler host set.", e); + } + } + + @Override + public void close() { + closeCommand.ifPresent(Command::execute); + } + + @Override + public ImmutableSet<ServiceInstance> get() { + return services.get(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java new file mode 100644 index 0000000..2656662 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java @@ -0,0 +1,172 @@ +/** + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.discovery; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.function.Predicate; + +import javax.inject.Singleton; + +import com.google.common.base.Joiner; +import com.google.common.collect.FluentIterable; +import com.google.inject.Exposed; +import com.google.inject.PrivateModule; +import com.google.inject.Provides; +import com.google.inject.TypeLiteral; + +import org.apache.aurora.common.application.ShutdownRegistry; +import org.apache.aurora.common.base.MorePreconditions; +import org.apache.aurora.common.io.Codec; +import org.apache.aurora.common.net.InetSocketAddressHelper; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.zookeeper.Credentials; +import org.apache.aurora.common.zookeeper.ServerSet; +import org.apache.aurora.common.zookeeper.SingletonService; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.retry.BoundedExponentialBackoffRetry; +import org.apache.curator.utils.PathUtils; +import org.apache.zookeeper.data.ACL; + 
+import static java.util.Objects.requireNonNull; + +/** + * Binding module for utilities to advertise the network presence of the scheduler. + * + * Uses Apache Curator. + */ +class CuratorServiceDiscoveryModule extends PrivateModule { + + private final String discoveryPath; + private final ZooKeeperConfig zooKeeperConfig; + + CuratorServiceDiscoveryModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) { + this.discoveryPath = PathUtils.validatePath(discoveryPath); + this.zooKeeperConfig = requireNonNull(zooKeeperConfig); + } + + @Override + protected void configure() { + requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY); + requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY); + + bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(ServerSet.JSON_CODEC); + } + + @Provides + @Singleton + CuratorFramework provideCuratorFramework( + ShutdownRegistry shutdownRegistry, + @ServiceDiscoveryBindings.ZooKeeper Iterable<InetSocketAddress> zooKeeperCluster, + ACLProvider aclProvider) { + + String connectString = + FluentIterable.from(zooKeeperCluster) + .transform(InetSocketAddressHelper::toString) + .join(Joiner.on(',')); + + if (zooKeeperConfig.getChrootPath().isPresent()) { + connectString = connectString + zooKeeperConfig.getChrootPath().get(); + } + + // This emulates the default BackoffHelper configuration used by the legacy commons/zookeeper + // stack. BackoffHelper is unbounded, this dies after around 5 minutes using the 10 retries. + // NB: BoundedExponentialBackoffRetry caps max retries at 29 if you send it a larger value. + RetryPolicy retryPolicy = + new BoundedExponentialBackoffRetry( + Amount.of(1, Time.SECONDS).as(Time.MILLISECONDS), + Amount.of(1, Time.MINUTES).as(Time.MILLISECONDS), + 10); + + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() + .dontUseContainerParents() // Container nodes are only available in ZK 3.5+. 
+ .connectString(connectString) + .canBeReadOnly(false) // We must be able to write to perform leader election. + .sessionTimeoutMs(zooKeeperConfig.getSessionTimeout().as(Time.MILLISECONDS)) + .retryPolicy(retryPolicy) + .aclProvider(aclProvider); + + if (zooKeeperConfig.getCredentials().isPresent()) { + Credentials credentials = zooKeeperConfig.getCredentials().get(); + builder.authorization(credentials.scheme(), credentials.authToken()); + } + + CuratorFramework curatorFramework = builder.build(); + + // TODO(John Sirois): It would be nice to use a Service to control the lifecycle here, but other + // services (org.apache.aurora.scheduler.http.JettyServerModule.RedirectMonitor) rely on this + // service being started 1st which is not deterministic as things stand. Find a way to leverage + // the Service system for services with Service dependencies. + curatorFramework.start(); + shutdownRegistry.addAction(curatorFramework::close); + + return curatorFramework; + } + + static class SingleACLProvider implements ACLProvider { + private final List<ACL> acl; + + SingleACLProvider(List<ACL> acl) { + this.acl = MorePreconditions.checkNotBlank(acl); + } + + @Override + public List<ACL> getDefaultAcl() { + return acl; + } + + @Override + public List<ACL> getAclForPath(String path) { + return acl; + } + } + + @Provides + @Singleton + ACLProvider provideACLProvider(@ServiceDiscoveryBindings.ZooKeeper List<ACL> acl) { + return new SingleACLProvider(acl); + } + + // These values are compatible with the Java and Python common/zookeeper service discovery + // protocol. If GUID protection is enabled for Curator, the MEMBER_SELECTOR will need to be + // modified to handle GUID prefixes of MEMBER_TOKEN. 
+ private static final String MEMBER_TOKEN = "member_"; + private static final Predicate<String> MEMBER_SELECTOR = name -> name.startsWith(MEMBER_TOKEN); + + @Provides + @Singleton + @Exposed + ServiceGroupMonitor provideServiceGroupMonitor( + CuratorFramework client, + Codec<ServiceInstance> codec) { + + PathChildrenCache groupCache = + new PathChildrenCache(client, discoveryPath, true /* cacheData */); + return new CuratorServiceGroupMonitor(groupCache, MEMBER_SELECTOR, codec); + } + + @Provides + @Singleton + @Exposed + SingletonService provideSingletonService(CuratorFramework client, Codec<ServiceInstance> codec) { + return new CuratorSingletonService(client, discoveryPath, MEMBER_TOKEN, codec); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java index c3a524f..36ad18c 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java @@ -35,6 +35,11 @@ import org.apache.aurora.common.zookeeper.ZooKeeperUtils; * values. 
*/ public final class FlaggedZooKeeperConfig { + @CmdLine(name = "zk_use_curator", + help = "Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter " + + "commons/zookeeper (the legacy library) is used.") + private static final Arg<Boolean> USE_CURATOR = Arg.create(false); + @CmdLine(name = "zk_in_proc", help = "Launches an embedded zookeeper server for local testing causing -zk_endpoints " + "to be ignored if specified.") @@ -66,6 +71,7 @@ public final class FlaggedZooKeeperConfig { */ public static ZooKeeperConfig create() { return new ZooKeeperConfig( + USE_CURATOR.get(), ZK_ENDPOINTS.get(), Optional.fromNullable(CHROOT_PATH.get()), IN_PROCESS.get(), http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java new file mode 100644 index 0000000..28cdc4b --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java @@ -0,0 +1,59 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.discovery; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.net.InetSocketAddress; +import java.util.List; + +import javax.inject.Qualifier; + +import com.google.inject.Key; +import com.google.inject.TypeLiteral; + +import org.apache.zookeeper.data.ACL; + +/** + * Useful constants for Guice modules that provide or consume service discovery configuration + * bindings. + */ +final class ServiceDiscoveryBindings { + + /** + * Indicates a binding for ZooKeeper configuration data. + */ + @Qualifier + @Retention(RetentionPolicy.RUNTIME) + @Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER}) + @interface ZooKeeper { } + + /** + * A binding key for the ZooKeeper cluster endpoints. + */ + static final Key<Iterable<InetSocketAddress>> ZOO_KEEPER_CLUSTER_KEY = + Key.get(new TypeLiteral<Iterable<InetSocketAddress>>() { }, ZooKeeper.class); + + /** + * A binding key for the ZooKeeper ACL to use when creating nodes. + */ + static final Key<List<ACL>> ZOO_KEEPER_ACL_KEY = + Key.get(new TypeLiteral<List<ACL>>() { }, ZooKeeper.class); + + private ServiceDiscoveryBindings() { + // Utility. 
+ } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java index fa605cc..3d228da 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java @@ -13,24 +13,29 @@ */ package org.apache.aurora.scheduler.discovery; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; import java.util.List; import javax.inject.Singleton; -import com.google.common.base.Optional; -import com.google.inject.Exposed; -import com.google.inject.PrivateModule; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Files; +import com.google.common.util.concurrent.AbstractIdleService; +import com.google.inject.AbstractModule; +import com.google.inject.Inject; +import com.google.inject.Module; +import com.google.inject.Provider; import com.google.inject.Provides; +import com.google.inject.binder.LinkedBindingBuilder; -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.zookeeper.Credentials; -import org.apache.aurora.common.zookeeper.ServerSetImpl; -import org.apache.aurora.common.zookeeper.SingletonService; -import org.apache.aurora.common.zookeeper.SingletonServiceImpl; -import org.apache.aurora.common.zookeeper.ZooKeeperClient; +import org.apache.aurora.common.application.ShutdownRegistry; +import org.apache.aurora.common.base.MorePreconditions; import org.apache.aurora.common.zookeeper.ZooKeeperUtils; -import 
org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.apache.aurora.common.zookeeper.testing.ZooKeeperTestServer; +import org.apache.aurora.scheduler.SchedulerServicesModule; import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,32 +43,62 @@ import org.slf4j.LoggerFactory; import static java.util.Objects.requireNonNull; /** - * Binding module for utilities to advertise the network presence of the scheduler. + * Creates a Guice module that binds leader election and leader discovery components. */ -public class ServiceDiscoveryModule extends PrivateModule { - - private static final Logger LOG = LoggerFactory.getLogger(ServiceDiscoveryModule.class); - - private final String serverSetPath; - private final Optional<Credentials> zkCredentials; - - public ServiceDiscoveryModule(String serverSetPath, Optional<Credentials> zkCredentials) { - this.serverSetPath = requireNonNull(serverSetPath); - this.zkCredentials = requireNonNull(zkCredentials); +public class ServiceDiscoveryModule extends AbstractModule { + + private static final Logger LOG = LoggerFactory.getLogger(CommonsServiceDiscoveryModule.class); + + private final ZooKeeperConfig zooKeeperConfig; + private final String discoveryPath; + + /** + * Creates a Guice module that will bind a + * {@link org.apache.aurora.common.zookeeper.SingletonService} for scheduler leader election and a + * {@link org.apache.aurora.scheduler.app.ServiceGroupMonitor} that can be used to find the + * leading scheduler. + * + * @param zooKeeperConfig The ZooKeeper client configuration to use to interact with ZooKeeper. + * @param discoveryPath The ZooKeeper path to use to host leader election and service discovery + * nodes under. 
+ */ + public ServiceDiscoveryModule(ZooKeeperConfig zooKeeperConfig, String discoveryPath) { + this.zooKeeperConfig = requireNonNull(zooKeeperConfig); + this.discoveryPath = MorePreconditions.checkNotBlank(discoveryPath); } @Override protected void configure() { - requireBinding(ZooKeeperClient.class); + LinkedBindingBuilder<Iterable<InetSocketAddress>> clusterBinder = + bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY); + + if (zooKeeperConfig.isInProcess()) { + requireBinding(ShutdownRegistry.class); + File tempDir = Files.createTempDir(); + bind(ZooKeeperTestServer.class).toInstance(new ZooKeeperTestServer(tempDir, tempDir)); + SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(TestServerService.class); + + clusterBinder.toProvider(LocalZooKeeperClusterProvider.class); + } else { + clusterBinder.toInstance(zooKeeperConfig.getServers()); + } - bind(ServiceGroupMonitor.class).to(CommonsServerGroupMonitor.class).in(Singleton.class); - expose(ServiceGroupMonitor.class); + install(discoveryModule()); + } + + private Module discoveryModule() { + if (zooKeeperConfig.isUseCurator()) { + return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig); + } else { + return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig); + } } @Provides @Singleton + @ServiceDiscoveryBindings.ZooKeeper List<ACL> provideAcls() { - if (zkCredentials.isPresent()) { + if (zooKeeperConfig.getCredentials().isPresent()) { return ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL; } else { LOG.warn("Running without ZooKeeper digest credentials. ZooKeeper ACLs are disabled."); @@ -71,30 +106,49 @@ public class ServiceDiscoveryModule extends PrivateModule { } } - @Provides - @Singleton - ServerSetImpl provideServerSet(ZooKeeperClient client, List<ACL> zooKeeperAcls) { - return new ServerSetImpl(client, zooKeeperAcls, serverSetPath); - } + /** + * A service to wrap ZooKeeperTestServer. 
ZooKeeperTestServer is not a service itself because + * some tests depend on stop/start routines that do not no-op, like startAsync and stopAsync may. + */ + private static class TestServerService extends AbstractIdleService { + private final ZooKeeperTestServer testServer; - @Provides - @Singleton - DynamicHostSet<ServiceInstance> provideServerSet(ServerSetImpl serverSet) { - // Used for a type re-binding of the server set. - return serverSet; + @Inject + TestServerService(ZooKeeperTestServer testServer) { + this.testServer = requireNonNull(testServer); + } + + @Override + protected void startUp() { + // We actually start the test server on-demand rather than with the normal lifecycle. + // This is because a ZooKeeperClient binding is needed before scheduler services are started. + } + + @Override + protected void shutDown() { + testServer.stop(); + } } - // NB: We only take a ServerSetImpl instead of a ServerSet here to simplify binding. - @Provides - @Singleton - @Exposed - SingletonService provideSingletonService( - ZooKeeperClient client, - ServerSetImpl serverSet, - List<ACL> zookeeperAcls) { - - return new SingletonServiceImpl( - serverSet, - SingletonServiceImpl.createSingletonCandidate(client, serverSetPath, zookeeperAcls)); + private static class LocalZooKeeperClusterProvider + implements Provider<Iterable<InetSocketAddress>> { + + private final ZooKeeperTestServer testServer; + + @Inject + LocalZooKeeperClusterProvider(ZooKeeperTestServer testServer) { + this.testServer = requireNonNull(testServer); + } + + @Override + public Iterable<InetSocketAddress> get() { + try { + testServer.startNetwork(); + } catch (IOException | InterruptedException e) { + throw Throwables.propagate(e); + } + return ImmutableList.of( + InetSocketAddress.createUnresolved("localhost", testServer.getPort())); + } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperClientModule.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperClientModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperClientModule.java deleted file mode 100644 index c0f2061..0000000 --- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperClientModule.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.discovery; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; - -import com.google.common.base.Optional; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.io.Files; -import com.google.common.util.concurrent.AbstractIdleService; -import com.google.inject.AbstractModule; -import com.google.inject.Inject; -import com.google.inject.Key; -import com.google.inject.PrivateModule; -import com.google.inject.Provider; -import com.google.inject.Singleton; - -import org.apache.aurora.common.application.ShutdownRegistry; -import org.apache.aurora.common.inject.Bindings.KeyFactory; -import org.apache.aurora.common.zookeeper.ZooKeeperClient; -import org.apache.aurora.common.zookeeper.testing.ZooKeeperTestServer; -import org.apache.aurora.scheduler.SchedulerServicesModule; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * A guice binding module that 
configures and binds a {@link ZooKeeperClient} instance. - */ -public class ZooKeeperClientModule extends AbstractModule { - private final KeyFactory keyFactory; - private final ZooKeeperConfig config; - - /** - * Creates a new ZK client module from the provided configuration. - * - * @param config Configuration parameters for the client. - */ - public ZooKeeperClientModule(ZooKeeperConfig config) { - this(KeyFactory.PLAIN, config); - } - - /** - * Creates a new ZK client module from the provided configuration, using a key factory to - * qualify any bindings. - * - * @param keyFactory Factory to use when creating any exposed bindings. - * @param config Configuration parameters for the client. - */ - public ZooKeeperClientModule(KeyFactory keyFactory, ZooKeeperConfig config) { - this.keyFactory = checkNotNull(keyFactory); - this.config = checkNotNull(config); - } - - @Override - protected void configure() { - Key<ZooKeeperClient> clientKey = keyFactory.create(ZooKeeperClient.class); - if (config.inProcess) { - File tempDir = Files.createTempDir(); - bind(ZooKeeperTestServer.class).toInstance(new ZooKeeperTestServer(tempDir, tempDir)); - - install(new PrivateModule() { - @Override - protected void configure() { - requireBinding(ShutdownRegistry.class); - // Bound privately to give the local provider access to configuration settings. - bind(ZooKeeperConfig.class).toInstance(config); - bind(clientKey).toProvider(LocalClientProvider.class).in(Singleton.class); - expose(clientKey); - } - }); - SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(TestServerService.class); - } else { - bind(clientKey).toInstance(new ZooKeeperClient( - config.sessionTimeout, - config.credentials, - config.chrootPath, - config.servers)); - } - } - - /** - * A service to wrap ZooKeeperTestServer. ZooKeeperTestServer is not a service itself because - * some tests depend on stop/start routines that do not no-op, like startAsync and stopAsync may. 
- */ - private static class TestServerService extends AbstractIdleService { - private final ZooKeeperTestServer testServer; - - @Inject - TestServerService(ZooKeeperTestServer testServer) { - this.testServer = checkNotNull(testServer); - } - - @Override - protected void startUp() { - // We actually start the test server on-demand rather than with the normal lifecycle. - // This is because a ZooKeeperClient binding is needed before scheduler services are started. - } - - @Override - protected void shutDown() { - testServer.stop(); - } - } - - private static class LocalClientProvider implements Provider<ZooKeeperClient> { - private final ZooKeeperConfig config; - private final ZooKeeperTestServer testServer; - - @Inject - LocalClientProvider(ZooKeeperConfig config, ZooKeeperTestServer testServer) { - this.config = checkNotNull(config); - this.testServer = checkNotNull(testServer); - } - - @Override - public ZooKeeperClient get() { - try { - testServer.startNetwork(); - } catch (IOException | InterruptedException e) { - throw Throwables.propagate(e); - } - return new ZooKeeperClient( - config.sessionTimeout, - config.credentials, - Optional.absent(), // chrootPath - ImmutableList.of(InetSocketAddress.createUnresolved("localhost", testServer.getPort()))); - } - } - -} http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java index 80f4da4..3f32a62 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java @@ -35,11 +35,13 @@ public class ZooKeeperConfig { /** * Creates a new client configuration with defaults for the session timeout and credentials. 
* + * @param useCurator {@code true} to use Apache Curator; otherwise commons/zookeeper is used. * @param servers ZooKeeper server addresses. * @return A new configuration. */ - public static ZooKeeperConfig create(Iterable<InetSocketAddress> servers) { + public static ZooKeeperConfig create(boolean useCurator, Iterable<InetSocketAddress> servers) { return new ZooKeeperConfig( + useCurator, servers, Optional.absent(), // chrootPath false, @@ -47,11 +49,12 @@ public class ZooKeeperConfig { Optional.absent()); // credentials } - public final Iterable<InetSocketAddress> servers; - public final boolean inProcess; - public final Amount<Integer, Time> sessionTimeout; - public final Optional<String> chrootPath; - public final Optional<Credentials> credentials; + private final boolean useCurator; + private final Iterable<InetSocketAddress> servers; + private final boolean inProcess; + private final Amount<Integer, Time> sessionTimeout; + private final Optional<String> chrootPath; + private final Optional<Credentials> credentials; /** * Creates a new client configuration. @@ -62,13 +65,15 @@ public class ZooKeeperConfig { * @param sessionTimeout Timeout duration for established sessions. * @param credentials ZooKeeper authentication credentials. 
*/ - public ZooKeeperConfig( + ZooKeeperConfig( + boolean useCurator, Iterable<InetSocketAddress> servers, Optional<String> chrootPath, boolean inProcess, Amount<Integer, Time> sessionTimeout, Optional<Credentials> credentials) { + this.useCurator = useCurator; this.servers = MorePreconditions.checkNotBlank(servers); this.chrootPath = requireNonNull(chrootPath); this.inProcess = inProcess; @@ -85,10 +90,35 @@ public class ZooKeeperConfig { */ public ZooKeeperConfig withCredentials(Credentials newCredentials) { return new ZooKeeperConfig( + useCurator, servers, chrootPath, inProcess, sessionTimeout, Optional.of(newCredentials)); } + + boolean isUseCurator() { + return useCurator; + } + + public Iterable<InetSocketAddress> getServers() { + return servers; + } + + boolean isInProcess() { + return inProcess; + } + + public Amount<Integer, Time> getSessionTimeout() { + return sessionTimeout; + } + + Optional<String> getChrootPath() { + return chrootPath; + } + + public Optional<Credentials> getCredentials() { + return credentials; + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java index 2aa31ee..6704a32 100644 --- a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java +++ b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java @@ -145,16 +145,16 @@ public class MesosLogStreamModule extends PrivateModule { } String zkConnectString = Joiner.on(',').join( - Iterables.transform(zkClientConfig.servers, InetSocketAddressHelper::toString)); + Iterables.transform(zkClientConfig.getServers(), InetSocketAddressHelper::toString)); - if (zkClientConfig.credentials.isPresent()) { - Credentials 
zkCredentials = zkClientConfig.credentials.get(); + if (zkClientConfig.getCredentials().isPresent()) { + Credentials zkCredentials = zkClientConfig.getCredentials().get(); return new Log( QUORUM_SIZE.get(), logPath.getAbsolutePath(), zkConnectString, - zkClientConfig.sessionTimeout.getValue(), - zkClientConfig.sessionTimeout.getUnit().getTimeUnit(), + zkClientConfig.getSessionTimeout().getValue(), + zkClientConfig.getSessionTimeout().getUnit().getTimeUnit(), zkLogGroupPath, zkCredentials.scheme(), zkCredentials.authToken()); @@ -163,8 +163,8 @@ public class MesosLogStreamModule extends PrivateModule { QUORUM_SIZE.get(), logPath.getAbsolutePath(), zkConnectString, - zkClientConfig.sessionTimeout.getValue(), - zkClientConfig.sessionTimeout.getUnit().getTimeUnit(), + zkClientConfig.getSessionTimeout().getValue(), + zkClientConfig.getSessionTimeout().getUnit().getTimeUnit(), zkLogGroupPath); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java index b83815b..c310690 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java +++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java @@ -63,7 +63,6 @@ import org.apache.aurora.scheduler.TierModule; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule; -import org.apache.aurora.scheduler.discovery.ZooKeeperClientModule; import org.apache.aurora.scheduler.discovery.ZooKeeperConfig; import org.apache.aurora.scheduler.log.Log; import org.apache.aurora.scheduler.log.Log.Entry; @@ -193,18 +192,18 @@ public class SchedulerIT extends BaseZooKeeperClientTest { 
.setStatsUrlPrefix(STATS_URL_PREFIX))); } }; - Credentials credentials = Credentials.digestCredentials("mesos", "mesos"); - ZooKeeperConfig zkClientConfig = ZooKeeperConfig - .create(ImmutableList.of(InetSocketAddress.createUnresolved("localhost", getPort()))) - .withCredentials(credentials); + ZooKeeperConfig zkClientConfig = + ZooKeeperConfig.create( + true, // useCurator + ImmutableList.of(InetSocketAddress.createUnresolved("localhost", getPort()))) + .withCredentials(Credentials.digestCredentials("mesos", "mesos")); SchedulerMain main = SchedulerMain.class.newInstance(); Injector injector = Guice.createInjector( ImmutableList.<Module>builder() .add(SchedulerMain.getUniversalModule()) .add(new TierModule(TaskTestUtil.DEV_TIER_CONFIG)) .add(new LogStorageModule()) - .add(new ZooKeeperClientModule(zkClientConfig)) - .add(new ServiceDiscoveryModule(SERVERSET_PATH, Optional.of(credentials))) + .add(new ServiceDiscoveryModule(zkClientConfig, SERVERSET_PATH)) .add(testModule) .build() ); http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java new file mode 100644 index 0000000..d90192b --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java @@ -0,0 +1,77 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.discovery; + +import java.net.InetSocketAddress; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.inject.AbstractModule; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.testing.TearDownTestCase; +import org.apache.aurora.common.zookeeper.Credentials; +import org.apache.aurora.common.zookeeper.SingletonService; +import org.apache.aurora.common.zookeeper.ZooKeeperUtils; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; + +abstract class AbstractDiscoveryModuleTest extends TearDownTestCase { + + @Test + public void testBindingContract() { + ZooKeeperConfig zooKeeperConfig = + new ZooKeeperConfig( + isCurator(), + ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)), + Optional.of("/chroot"), + false, // inProcess + Amount.of(1, Time.DAYS), + Optional.of(Credentials.digestCredentials("test", "user"))); + + Injector injector = + Guice.createInjector( + new AbstractModule() { + @Override + protected void configure() { + bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY) + .toInstance( + ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42))); + bind(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY) + 
.toInstance(ZooKeeperUtils.OPEN_ACL_UNSAFE); + + bindExtraRequirements(binder()); + } + }, + createModule("/discovery/path", zooKeeperConfig)); + + assertNotNull(injector.getBinding(SingletonService.class).getProvider().get()); + assertNotNull(injector.getBinding(ServiceGroupMonitor.class).getProvider().get()); + } + + void bindExtraRequirements(Binder binder) { + // Noop. + } + + abstract Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig); + + abstract boolean isCurator(); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java new file mode 100644 index 0000000..7a4c4dd --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java @@ -0,0 +1,29 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.discovery; + +import com.google.inject.Module; + +public class CommonsDiscoveryModuleTest extends AbstractDiscoveryModuleTest { + + @Override + Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) { + return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig); + } + + @Override + boolean isCurator() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitorTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitorTest.java deleted file mode 100644 index b584780..0000000 --- a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitorTest.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.discovery; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor; -import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.common.thrift.Endpoint; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.thrift.Status; -import org.apache.aurora.scheduler.app.ServiceGroupMonitor; -import org.easymock.Capture; -import org.junit.Before; -import org.junit.Test; - -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public class CommonsServerGroupMonitorTest extends EasyMockTest { - - private DynamicHostSet<ServiceInstance> serverSet; - private Capture<HostChangeMonitor<ServiceInstance>> hostChangeMonitorCapture; - private Command stopCommand; - - @Before - public void setUp() throws Exception { - serverSet = createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { }); - hostChangeMonitorCapture = createCapture(); - stopCommand = createMock(Command.class); - } - - private void expectSuccessfulWatch() throws Exception { - expect(serverSet.watch(capture(hostChangeMonitorCapture))).andReturn(stopCommand); - } - - private void expectFailedWatch() throws Exception { - DynamicHostSet.MonitorException watchError = - new DynamicHostSet.MonitorException( - "Problem watching service group", - new RuntimeException()); - expect(serverSet.watch(capture(hostChangeMonitorCapture))).andThrow(watchError); - } - - @Test - public void testNominalLifecycle() throws Exception { - expectSuccessfulWatch(); - - stopCommand.execute(); - expectLastCall(); - - control.replay(); - - 
CommonsServerGroupMonitor groupMonitor = new CommonsServerGroupMonitor(serverSet); - groupMonitor.start(); - groupMonitor.close(); - } - - @Test - public void testExceptionalLifecycle() throws Exception { - expectFailedWatch(); - control.replay(); - - CommonsServerGroupMonitor groupMonitor = new CommonsServerGroupMonitor(serverSet); - try { - groupMonitor.start(); - fail(); - } catch (ServiceGroupMonitor.MonitorException e) { - // expected - } - - // Close on a non-started monitor should be allowed. - groupMonitor.close(); - } - - @Test - public void testNoHosts() throws Exception { - expectSuccessfulWatch(); - control.replay(); - - CommonsServerGroupMonitor groupMonitor = new CommonsServerGroupMonitor(serverSet); - assertEquals(ImmutableSet.of(), groupMonitor.get()); - - groupMonitor.start(); - assertEquals(ImmutableSet.of(), groupMonitor.get()); - - hostChangeMonitorCapture.getValue().onChange(ImmutableSet.of()); - assertEquals(ImmutableSet.of(), groupMonitor.get()); - } - - @Test - public void testHostUpdates() throws Exception { - expectSuccessfulWatch(); - control.replay(); - - CommonsServerGroupMonitor groupMonitor = new CommonsServerGroupMonitor(serverSet); - groupMonitor.start(); - - ImmutableSet<ServiceInstance> twoHosts = - ImmutableSet.of(serviceInstance("one"), serviceInstance("two")); - hostChangeMonitorCapture.getValue().onChange(twoHosts); - assertEquals(twoHosts, groupMonitor.get()); - - ImmutableSet<ServiceInstance> oneHost = ImmutableSet.of(serviceInstance("one")); - hostChangeMonitorCapture.getValue().onChange(oneHost); - assertEquals(oneHost, groupMonitor.get()); - - ImmutableSet<ServiceInstance> anotherHost = ImmutableSet.of(serviceInstance("three")); - hostChangeMonitorCapture.getValue().onChange(anotherHost); - assertEquals(anotherHost, groupMonitor.get()); - - ImmutableSet<ServiceInstance> noHosts = ImmutableSet.of(); - hostChangeMonitorCapture.getValue().onChange(noHosts); - assertEquals(noHosts, groupMonitor.get()); - } - - private 
ServiceInstance serviceInstance(String hostName) { - return new ServiceInstance(new Endpoint(hostName, 42), ImmutableMap.of(), Status.ALIVE); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java new file mode 100644 index 0000000..42a2224 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java @@ -0,0 +1,137 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.discovery; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.net.pool.DynamicHostSet; +import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor; +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class CommonsServiceGroupMonitorTest extends EasyMockTest { + + private DynamicHostSet<ServiceInstance> serverSet; + private Capture<HostChangeMonitor<ServiceInstance>> hostChangeMonitorCapture; + private Command stopCommand; + + @Before + public void setUp() throws Exception { + serverSet = createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { }); + hostChangeMonitorCapture = createCapture(); + stopCommand = createMock(Command.class); + } + + private void expectSuccessfulWatch() throws Exception { + expect(serverSet.watch(capture(hostChangeMonitorCapture))).andReturn(stopCommand); + } + + private void expectFailedWatch() throws Exception { + DynamicHostSet.MonitorException watchError = + new DynamicHostSet.MonitorException( + "Problem watching service group", + new RuntimeException()); + expect(serverSet.watch(capture(hostChangeMonitorCapture))).andThrow(watchError); + } + + @Test + public void testNominalLifecycle() throws Exception { + expectSuccessfulWatch(); + + stopCommand.execute(); + expectLastCall(); + + control.replay(); + + 
CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet); + groupMonitor.start(); + groupMonitor.close(); + } + + @Test + public void testExceptionalLifecycle() throws Exception { + expectFailedWatch(); + control.replay(); + + CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet); + try { + groupMonitor.start(); + fail(); + } catch (ServiceGroupMonitor.MonitorException e) { + // expected + } + + // Close on a non-started monitor should be allowed. + groupMonitor.close(); + } + + @Test + public void testNoHosts() throws Exception { + expectSuccessfulWatch(); + control.replay(); + + CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet); + assertEquals(ImmutableSet.of(), groupMonitor.get()); + + groupMonitor.start(); + assertEquals(ImmutableSet.of(), groupMonitor.get()); + + hostChangeMonitorCapture.getValue().onChange(ImmutableSet.of()); + assertEquals(ImmutableSet.of(), groupMonitor.get()); + } + + @Test + public void testHostUpdates() throws Exception { + expectSuccessfulWatch(); + control.replay(); + + CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet); + groupMonitor.start(); + + ImmutableSet<ServiceInstance> twoHosts = + ImmutableSet.of(serviceInstance("one"), serviceInstance("two")); + hostChangeMonitorCapture.getValue().onChange(twoHosts); + assertEquals(twoHosts, groupMonitor.get()); + + ImmutableSet<ServiceInstance> oneHost = ImmutableSet.of(serviceInstance("one")); + hostChangeMonitorCapture.getValue().onChange(oneHost); + assertEquals(oneHost, groupMonitor.get()); + + ImmutableSet<ServiceInstance> anotherHost = ImmutableSet.of(serviceInstance("three")); + hostChangeMonitorCapture.getValue().onChange(anotherHost); + assertEquals(anotherHost, groupMonitor.get()); + + ImmutableSet<ServiceInstance> noHosts = ImmutableSet.of(); + hostChangeMonitorCapture.getValue().onChange(noHosts); + assertEquals(noHosts, groupMonitor.get()); + } + + private 
ServiceInstance serviceInstance(String hostName) { + return new ServiceInstance(new Endpoint(hostName, 42), ImmutableMap.of(), Status.ALIVE); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java new file mode 100644 index 0000000..f1a02e4 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java @@ -0,0 +1,67 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.discovery; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Module; + +import org.apache.aurora.common.application.ShutdownRegistry; +import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl; +import org.apache.aurora.common.zookeeper.ZooKeeperUtils; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.zookeeper.data.ACL; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class CuratorDiscoveryModuleTest extends AbstractDiscoveryModuleTest { + + @Override + void bindExtraRequirements(Binder binder) { + ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl(); + binder.bind(ShutdownRegistry.class).toInstance(shutdownRegistry); + addTearDown(shutdownRegistry::execute); + } + + @Override + Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) { + return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig); + } + + @Override + boolean isCurator() { + return false; + } + + @Test + public void testSingleACLProvider() { + ImmutableList<ACL> acl = ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL; + ACLProvider provider = new CuratorServiceDiscoveryModule.SingleACLProvider(acl); + + assertEquals(acl, provider.getDefaultAcl()); + assertEquals(acl, provider.getAclForPath("/random/path/1")); + assertEquals(acl, provider.getAclForPath("/random/path/2")); + } + + @Test(expected = NullPointerException.class) + public void testSingleACLProviderNull() { + new CuratorServiceDiscoveryModule.SingleACLProvider(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testSingleACLProviderEmpty() { + new CuratorServiceDiscoveryModule.SingleACLProvider(ImmutableList.of()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/f4a08459/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java 
---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java index ac781ea..a065505 100644 --- a/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java +++ b/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java @@ -37,6 +37,7 @@ public class ZooKeeperConfigTest { @Test(expected = IllegalArgumentException.class) public void testEmptyServers() { new ZooKeeperConfig( + false, ImmutableList.of(), Optional.absent(), false, @@ -48,32 +49,34 @@ public class ZooKeeperConfigTest { public void testWithCredentials() { ZooKeeperConfig config = new ZooKeeperConfig( + false, SERVERS, Optional.absent(), false, Amount.of(1, Time.HOURS), Optional.absent()); // credentials - assertFalse(config.credentials.isPresent()); + assertFalse(config.getCredentials().isPresent()); Credentials joeCreds = Credentials.digestCredentials("Joe", "Schmoe"); ZooKeeperConfig joeConfig = config.withCredentials(joeCreds); // Should not mutate the original. 
assertNotSame(config, joeConfig); - assertFalse(config.credentials.isPresent()); + assertFalse(config.getCredentials().isPresent()); - assertTrue(joeConfig.credentials.isPresent()); - assertEquals(joeCreds, joeConfig.credentials.get()); + assertTrue(joeConfig.getCredentials().isPresent()); + assertEquals(joeCreds, joeConfig.getCredentials().get()); } @Test public void testCreateFactory() { - ZooKeeperConfig config = ZooKeeperConfig.create(SERVERS); + ZooKeeperConfig config = ZooKeeperConfig.create(true, SERVERS); - assertEquals(SERVERS, ImmutableList.copyOf(config.servers)); - assertFalse(config.chrootPath.isPresent()); - assertFalse(config.inProcess); - assertEquals(ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT, config.sessionTimeout); - assertFalse(config.credentials.isPresent()); + assertTrue(config.isUseCurator()); + assertEquals(SERVERS, ImmutableList.copyOf(config.getServers())); + assertFalse(config.getChrootPath().isPresent()); + assertFalse(config.isInProcess()); + assertEquals(ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT, config.getSessionTimeout()); + assertFalse(config.getCredentials().isPresent()); } }