[FLINK-3932] Added ZK ACL configuration for secure cluster setup This closes #2589.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5bd47012 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5bd47012 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5bd47012 Branch: refs/heads/master Commit: 5bd47012e50516d45a6b50d47f347e6802dfde4c Parents: 85b5344 Author: Vijay Srinivasaraghavan <vijayaraghavan.srinivasaragha...@emc.com> Authored: Thu Sep 22 10:10:01 2016 -0700 Committer: Maximilian Michels <m...@apache.org> Committed: Fri Oct 14 16:36:30 2016 +0200 ---------------------------------------------------------------------- docs/setup/config.md | 2 + .../flink/configuration/ConfigConstants.java | 7 ++ flink-dist/src/main/resources/flink-conf.yaml | 6 ++ .../flink/runtime/util/ZooKeeperUtils.java | 70 ++++++++++++++++++++ 4 files changed, 85 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5bd47012/docs/setup/config.md ---------------------------------------------------------------------- diff --git a/docs/setup/config.md b/docs/setup/config.md index 3f6b705..0c8f451 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -332,6 +332,8 @@ Previously this key was named `recovery.mode` and the default value was `standal - `high-availability.job.delay`: (Default `akka.ask.timeout`) Defines the delay before persisted jobs are recovered in case of a master recovery situation. Previously this key was named `recovery.job.delay`. +- `high-availability.zookeeper.client.acl`: (Default `open`) Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be set to "creator" if the ZooKeeper server configuration has the "authProvider" property mapped to use SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos). The ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes + ### ZooKeeper-Security - `zookeeper.sasl.disable`: (Default: `true`) Defines if SASL based authentication needs to be enabled or disabled. The configuration value can be set to "true" if ZooKeeper cluster is running in secure mode (Kerberos) http://git-wip-us.apache.org/repos/asf/flink/blob/5bd47012/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index a64b631..e608eb3 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -759,6 +759,9 @@ public final class ConfigConstants { public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS = "high-availability.zookeeper.client.max-retry-attempts"; @PublicEvolving + public static final String HA_ZOOKEEPER_CLIENT_ACL = "high-availability.zookeeper.client.acl"; + + @PublicEvolving public static final String ZOOKEEPER_SASL_DISABLE = "zookeeper.sasl.disable"; @PublicEvolving @@ -1268,6 +1271,10 @@ public final class ConfigConstants { /** Defaults for ZK client security **/ public static final boolean DEFAULT_ZOOKEEPER_SASL_DISABLE = true; + /** ACL options supported "creator" or "open" */ + public static final String DEFAULT_HA_ZOOKEEPER_CLIENT_ACL = "open"; + + // ------------------------- Queryable state ------------------------------ /** Port to bind KvState server to. */ http://git-wip-us.apache.org/repos/asf/flink/blob/5bd47012/flink-dist/src/main/resources/flink-conf.yaml ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml index c876922..ad916e8 100644 --- a/flink-dist/src/main/resources/flink-conf.yaml +++ b/flink-dist/src/main/resources/flink-conf.yaml @@ -144,6 +144,12 @@ jobmanager.web.port: 8081 # high-availability.zookeeper.quorum: localhost:2181 # high-availability.zookeeper.storageDir: hdfs:///flink/ha/ +# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes +# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE) +# The default value is "open" and it can be changed to "creator" if ZK security is enabled +# +# high-availability.zookeeper.client.acl: open + #============================================================================== # Flink Cluster Security Configuration (optional configuration) #============================================================================== http://git-wip-us.apache.org/repos/asf/flink/blob/5bd47012/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index 67fc397..7862f87 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.util; import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.imps.DefaultACLProvider; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; @@ -39,11 +41,14 @@ import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper; import org.apache.flink.util.ConfigurationUtil; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; +import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -79,6 +84,29 @@ public class ZooKeeperUtils { String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID); + boolean disableSaslClient = configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, + ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE); + + ACLProvider aclProvider; + + ZkClientACLMode aclMode = ZkClientACLMode.fromConfig(configuration); + + if(disableSaslClient && aclMode == ZkClientACLMode.CREATOR) { + String errorMessage = "Cannot set ACL role to " + aclMode +" since SASL authentication is " + + "disabled through the " + ConfigConstants.ZOOKEEPER_SASL_DISABLE + " property"; + LOG.warn(errorMessage); + throw new IllegalConfigurationException(errorMessage); + } + + if(aclMode == ZkClientACLMode.CREATOR) { + LOG.info("Enforcing creator for ZK connections"); + aclProvider = new SecureAclProvider(); + } else { + LOG.info("Enforcing default ACL for ZK connections"); + aclProvider = new DefaultACLProvider(); + } + + String rootWithNamespace = generateZookeeperPath(root, namespace); LOG.info("Using '{}' as Zookeeper namespace.", rootWithNamespace); @@ -91,6 +119,7 @@ public class ZooKeeperUtils { // Curator prepends a '/' manually and throws an Exception if the // namespace starts with a '/'. .namespace(rootWithNamespace.startsWith("/") ? rootWithNamespace.substring(1) : rootWithNamespace) + .aclProvider(aclProvider) .build(); cf.start(); @@ -306,6 +335,47 @@ public class ZooKeeperUtils { return root + namespace; } + + public static class SecureAclProvider implements ACLProvider + { + @Override + public List<ACL> getDefaultAcl() + { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + + @Override + public List<ACL> getAclForPath(String path) + { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + } + + public enum ZkClientACLMode { + CREATOR, + OPEN; + + /** + * Return the configured {@link ZkClientACLMode}. + * + * @param config The config to parse + * @return Configured ACL mode or {@link ConfigConstants#DEFAULT_HA_ZOOKEEPER_CLIENT_ACL} if not + * configured. + */ + public static ZkClientACLMode fromConfig(Configuration config) { + String aclMode = config.getString(ConfigConstants.HA_ZOOKEEPER_CLIENT_ACL, null); + if (aclMode == null || aclMode.equalsIgnoreCase(ZkClientACLMode.OPEN.name())) { + return ZkClientACLMode.OPEN; + } else if (aclMode.equalsIgnoreCase(ZkClientACLMode.CREATOR.name())) { + return ZkClientACLMode.CREATOR; + } else { + String message = "Unsupported ACL option: [" + aclMode + "] provided"; + LOG.error(message); + throw new IllegalConfigurationException(message); + } + } + } + /** * Private constructor to prevent instantiation. */