[
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15503259#comment-15503259
]
ASF GitHub Bot commented on FLINK-3929:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2275#discussion_r79355820
--- Diff:
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
---
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.apache.flink.test.util.TestingSecurityContext;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+
+/**
+ * Tests for running {@link RollingSink} in a secure (Kerberos) environment. This class is an
+ * extension of {@link RollingSinkITCase}.
+ */
+
+// The test is disabled since a secure MiniDFS run requires privileged (low-numbered) ports to be used.
+// We can enable the test when the fix is available (HDFS-9213).
+@Ignore
+public class RollingSinkSecuredITCase extends RollingSinkITCase {
+
+ protected static final Logger LOG =
LoggerFactory.getLogger(RollingSinkSecuredITCase.class);
+
+ /*
+ * Override (hide) the super class static methods to avoid creating MiniDFS and
+ * MiniFlink with wrong configurations and in the wrong order for a secure cluster.
+ */
+ @BeforeClass
+ public static void setup() throws Exception {}
+
+ @AfterClass
+ public static void teardown() throws Exception {}
+
+ @BeforeClass
+ public static void createHDFS() throws IOException {}
+
+ @AfterClass
+ public static void destroyHDFS() {}
+
+ /**
+  * Starts the secured test environment in the order a secure cluster needs:
+  * prepares {@link SecureTestEnvironment}, fills the Hadoop configuration with the
+  * secure settings, installs the testing security context, writes the Hadoop
+  * configuration to {@code hdfs-site.xml}, starts the MiniDFS cluster and finally
+  * starts the secured Flink cluster.
+  *
+  * @throws Exception if any of the setup steps fails
+  */
+ @BeforeClass
+ public static void startSecureCluster() throws Exception {
+
+     LOG.info("starting secure cluster environment for testing");
+
+     dataDir = tempFolder.newFolder();
+
+     conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+
+     SecureTestEnvironment.prepare(tempFolder);
+
+     populateSecureConfigurations();
+
+     Configuration flinkConfig = new Configuration();
+     flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+             SecureTestEnvironment.getTestKeytab());
+     flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+             SecureTestEnvironment.getHadoopServicePrincipal());
+
+     SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration();
+     ctx.setFlinkConfiguration(flinkConfig);
+     ctx.setHadoopConfiguration(conf);
+     try {
+         TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
+     } catch (Exception e) {
+         // The cause is attached to the exception; a "{}" placeholder has no meaning in an exception message.
+         throw new RuntimeException("Exception occurred while setting up secure test context.", e);
+     }
+
+     File hdfsSiteXML = new File(dataDir.getAbsolutePath() + "/hdfs-site.xml");
+
+     // Close the writer even if writing the configuration fails.
+     FileWriter writer = new FileWriter(hdfsSiteXML);
+     try {
+         conf.writeXml(writer);
+         writer.flush();
+     } finally {
+         writer.close();
+     }
+
+     // Point HADOOP_CONF_DIR at the directory that holds the generated hdfs-site.xml.
+     Map<String, String> map = new HashMap<String, String>(System.getenv());
+     map.put("HADOOP_CONF_DIR", hdfsSiteXML.getParentFile().getAbsolutePath());
+     TestBaseUtils.setEnv(map);
+
+     MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+     builder.checkDataNodeAddrConfig(true);
+     builder.checkDataNodeHostConfig(true);
+     hdfsCluster = builder.build();
+
+     dfs = hdfsCluster.getFileSystem();
+
+     hdfsURI = "hdfs://"
+             + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+             + "/";
+
+     startSecureFlinkClusterWithRecoveryModeEnabled();
+ }
+
+ /**
+  * Tears down the secured test environment: unsets the test stream environment,
+  * stops the Flink cluster, shuts down the MiniDFS cluster and cleans up
+  * {@link SecureTestEnvironment}.
+  *
+  * <p>Later steps still run when an earlier one throws, and the HDFS shutdown is
+  * skipped when the cluster was never created (for example because the class setup
+  * failed before building it).
+  *
+  * @throws Exception if stopping any of the components fails
+  */
+ @AfterClass
+ public static void teardownSecureCluster() throws Exception {
+     LOG.info("tearing down secure cluster environment");
+
+     try {
+         TestStreamEnvironment.unsetAsContext();
+         stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+     } finally {
+         try {
+             // hdfsCluster stays null if setup failed before the MiniDFS cluster was built
+             if (hdfsCluster != null) {
+                 hdfsCluster.shutdown();
+             }
+         } finally {
+             SecureTestEnvironment.cleanup();
+         }
+     }
+ }
+
+ /**
+  * Fills the shared Hadoop configuration {@code conf} with the settings used for the
+  * secured MiniDFS cluster: Kerberos authentication, service principal and keytab for
+  * the name node and data node, block access tokens, data transfer protection, HTTP
+  * policy, and the fixed data node addresses.
+  */
+ private static void populateSecureConfigurations() {
+
+     // Kerberos authentication
+     SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
+
+     // principal and keytab for name node, data node and web authentication
+     conf.set(DFS_NAMENODE_USER_NAME_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
+     conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, SecureTestEnvironment.getTestKeytab());
+     conf.set(DFS_DATANODE_USER_NAME_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
+     conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, SecureTestEnvironment.getTestKeytab());
+     conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
+
+     // data transfer settings
+     conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+     conf.set("dfs.data.transfer.protection", "authentication");
+     conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTP_ONLY.name());
+     conf.set(DFS_ENCRYPT_DATA_TRANSFER_KEY, "false");
+     conf.setInt("dfs.datanode.socket.write.timeout", 0);
+
+     /*
+      * We are setting the port numbers to privileged ports - see HDFS-9213.
+      * This requires the user to have root privilege to bind to the port.
+      * Use the command below (ubuntu) to grant the privilege to the java process so
+      * that bind() works if the java process is not running as root:
+      * setcap 'cap_net_bind_service=+ep' /path/to/java
+      */
+     conf.set(DFS_DATANODE_ADDRESS_KEY, "localhost:1002");
+     conf.set(DFS_DATANODE_HOST_NAME_KEY, "localhost");
+     conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "localhost:1003");
+ }
+
+ private static void startSecureFlinkClusterWithRecoveryModeEnabled() {
+ try {
+ LOG.info("Starting Flink and ZK in secure mode");
+
+ dfs.mkdirs(new Path("/flink/checkpoints"));
+ dfs.mkdirs(new Path("/flink/recovery"));
+
+ org.apache.flink.configuration.Configuration config =
new org.apache.flink.configuration.Configuration();
+
+
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
DEFAULT_PARALLELISM);
+
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
+
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
+ config.setString(ConfigConstants.RECOVERY_MODE,
"zookeeper");
+ config.setString(ConfigConstants.STATE_BACKEND,
"filesystem");
+
config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI +
"/flink/checkpoints");
+
config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, hdfsURI +
"/flink/recovery");
+ config.setString("state.backend.fs.checkpointdir",
hdfsURI + "/flink/checkpoints");
+
+
SecureTestEnvironment.populateFlinkSecureConfigurations(config);
+
+ cluster = TestBaseUtils.startCluster(config, false);
+ TestStreamEnvironment.setAsContext(cluster,
DEFAULT_PARALLELISM);
+
+ } catch (Exception e) {
+ LOG.error("Exception occured while creating MiniFlink
cluster. Reason: {}", e);
--- End diff --
The signature is error(String msg, Throwable t), so the exception is logged as
the throwable and nothing is substituted into the "{}" placeholder. You can
remove the "Reason: {}".
> Support for Kerberos Authentication with Keytab Credential
> ----------------------------------------------------------
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
> Issue Type: New Feature
> Reporter: Eron Wright
> Assignee: Vijay Srinivasaraghavan
> Labels: kerberos, security
> Original Estimate: 672h
> Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
> design doc._
> Add support for a keytab credential to be associated with the Flink cluster,
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)