FLUME-2631. End to End authentication in Flume (Johny Rufus via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/542b1695 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/542b1695 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/542b1695 Branch: refs/heads/trunk Commit: 542b1695033d330eb00ae81713fdc838b88332b6 Parents: 3d03053 Author: Hari Shreedharan <[email protected]> Authored: Thu Mar 5 23:19:13 2015 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Thu Mar 5 23:19:13 2015 -0800 ---------------------------------------------------------------------- flume-ng-auth/pom.xml | 88 +++++++ .../flume/api/SecureRpcClientFactory.java | 40 ++++ .../apache/flume/api/SecureThriftRpcClient.java | 113 +++++++++ .../flume/auth/FlumeAuthenticationUtil.java | 99 ++++++++ .../apache/flume/auth/FlumeAuthenticator.java | 45 ++++ .../flume/auth/KerberosAuthenticator.java | 233 +++++++++++++++++++ .../apache/flume/auth/PrivilegedExecutor.java | 52 +++++ .../apache/flume/auth/SecurityException.java | 40 ++++ .../apache/flume/auth/SimpleAuthenticator.java | 88 +++++++ .../java/org/apache/flume/auth/UGIExecutor.java | 80 +++++++ .../flume/auth/TestFlumeAuthenticator.java | 128 ++++++++++ flume-ng-core/pom.xml | 5 + .../java/org/apache/flume/sink/ThriftSink.java | 14 +- .../org/apache/flume/source/ThriftSource.java | 67 +++++- flume-ng-dist/pom.xml | 4 + flume-ng-dist/src/main/assembly/bin.xml | 1 + flume-ng-dist/src/main/assembly/src.xml | 1 + .../api/RpcClientConfigurationConstants.java | 2 + .../org/apache/flume/api/ThriftRpcClient.java | 30 ++- flume-ng-sinks/flume-dataset-sink/pom.xml | 7 - .../org/apache/flume/sink/kite/DatasetSink.java | 39 ++-- .../apache/flume/sink/kite/KerberosUtil.java | 187 --------------- .../flume/sink/kite/TestKerberosUtil.java | 121 ---------- .../apache/flume/sink/hdfs/BucketWriter.java | 37 +-- .../apache/flume/sink/hdfs/HDFSEventSink.java | 229 ++---------------- .../flume/sink/hdfs/TestBucketWriter.java | 28 ++- 
.../flume/sink/hdfs/TestHDFSEventSink.java | 2 +- .../org/apache/flume/sink/hbase/HBaseSink.java | 34 +-- .../sink/hbase/HBaseSinkSecurityManager.java | 134 ----------- pom.xml | 7 + 30 files changed, 1185 insertions(+), 770 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-auth/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-auth/pom.xml b/flume-ng-auth/pom.xml new file mode 100644 index 0000000..292731d --- /dev/null +++ b/flume-ng-auth/pom.xml @@ -0,0 +1,88 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flume-parent</artifactId> + <groupId>org.apache.flume</groupId> + <version>1.6.0-SNAPSHOT</version> + </parent> + + <artifactId>flume-ng-auth</artifactId> + <name>Flume Auth</name> + <description>Flume Authentication</description> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <version>2.3.7</version> + <inherited>true</inherited> + <extensions>true</extensions> + </plugin> + </plugins> + </build> + + <dependencies> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>${hadoop.common.artifact.id}</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sdk</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minikdc</artifactId> + <version>${hadoop2.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java ---------------------------------------------------------------------- diff --git 
a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java new file mode 100644 index 0000000..c976458 --- /dev/null +++ b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.api; + +import java.util.Properties; + +/** + * Factory class to construct Flume {@link RpcClient} implementations. + */ +public class SecureRpcClientFactory { + + /** + * Return a secure {@linkplain org.apache.flume.api.RpcClient} that uses Thrift for communicating with + * the next hop. + * @param props + * @return - An {@linkplain org.apache.flume.api.RpcClient} which uses thrift configured with the + * given parameters. 
+ */ + public static RpcClient getThriftInstance(Properties props) { + ThriftRpcClient client = new SecureThriftRpcClient(); + client.configure(props); + return client; + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java new file mode 100644 index 0000000..7316e1b --- /dev/null +++ b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.flume.api; + +import org.apache.flume.FlumeException; +import org.apache.flume.auth.FlumeAuthenticationUtil; +import org.apache.flume.auth.FlumeAuthenticator; +import org.apache.flume.auth.PrivilegedExecutor; +import org.apache.thrift.transport.*; + +import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.Sasl; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +public class SecureThriftRpcClient extends ThriftRpcClient { + + private static final String CLIENT_PRINCIPAL = "client-principal"; + private static final String CLIENT_KEYTAB = "client-keytab"; + private static final String SERVER_PRINCIPAL = "server-principal"; + + private String serverPrincipal; + private FlumeAuthenticator privilegedExecutor; + + @Override + protected void configure(Properties properties) throws FlumeException { + super.configure(properties); + serverPrincipal = properties.getProperty(SERVER_PRINCIPAL); + if (serverPrincipal == null || serverPrincipal.isEmpty()) { + throw new IllegalArgumentException("Flume in secure mode, but Flume config doesn't " + + "specify a server principal to use for Kerberos auth."); + } + String clientPrincipal = properties.getProperty(CLIENT_PRINCIPAL); + String keytab = properties.getProperty(CLIENT_KEYTAB); + this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(clientPrincipal, keytab); + if(!privilegedExecutor.isAuthenticated()) { + throw new FlumeException("Authentication failed in Kerberos mode for " + + "principal " + clientPrincipal + " keytab " + keytab); + } + } + + @Override + protected TTransport getTransport(TSocket tsocket) throws Exception { + Map<String, String> saslProperties = new HashMap<String, String>(); + saslProperties.put(Sasl.QOP, "auth"); + String[] names; + try { + names = FlumeAuthenticationUtil.splitKerberosName(serverPrincipal); + } catch (IOException e) { + 
throw new FlumeException( + "Error while trying to resolve Principal name - " + serverPrincipal, e); + } + return new UgiSaslClientTransport( + "GSSAPI", null, names[0], names[1], saslProperties, null, tsocket, privilegedExecutor); + } + + /** + * This transport wraps the Sasl transports to set up the right UGI context for open(). + */ + public static class UgiSaslClientTransport extends TSaslClientTransport { + PrivilegedExecutor privilegedExecutor; + public UgiSaslClientTransport(String mechanism, String authorizationId, + String protocol, String serverName, Map<String, String> props, + CallbackHandler cbh, TTransport transport, PrivilegedExecutor privilegedExecutor) throws IOException { + super(mechanism, authorizationId, protocol, serverName, props, cbh, + transport); + this.privilegedExecutor = privilegedExecutor; + } + + // open the SASL transport with using the current UserGroupInformation + // This is needed to get the current login context stored + @Override + public void open() throws FlumeException { + try { + this.privilegedExecutor.execute( + new PrivilegedExceptionAction<Void>() { + public Void run() throws FlumeException { + try { + UgiSaslClientTransport.super.open(); + } catch (TTransportException e) { + throw new FlumeException("Failed to open SASL transport", e); + } + return null; + } + }); + } catch (InterruptedException e) { + throw new FlumeException( + "Interrupted while opening underlying transport", e); + } catch (Exception e) { + throw new FlumeException("Failed to open SASL transport", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java new file mode 100644 index 0000000..02afc0d --- /dev/null 
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.auth; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.SecurityUtil; + +import javax.security.auth.callback.CallbackHandler; +import java.io.IOException; + +/** + * FlumeAuthentication utility class that provides methods to get an + * Authenticator. If proper credentials are provided KerberosAuthenticator is + * returned which can be used to execute as the authenticated principal, + * or else a SimpleAuthenticator which executes without any authentication + */ +public class FlumeAuthenticationUtil { + + private FlumeAuthenticationUtil() {} + + private static KerberosAuthenticator kerbAuthenticator; + + /** + * If principal and keytab are null, this method returns a SimpleAuthenticator + * which executes without authentication. If valid credentials are + * provided KerberosAuthenticator is returned which can be used to execute as + * the authenticated principal. 
Invalid credentials result in + * IllegalArgumentException and Failure to authenticate results in SecurityException + * + * @param principal + * @param keytab + * @return FlumeAuthenticator + * + * @throws org.apache.flume.auth.SecurityException + */ + public synchronized static FlumeAuthenticator getAuthenticator( + String principal, String keytab) throws SecurityException { + + if(principal == null && keytab == null) { + return SimpleAuthenticator.getSimpleAuthenticator(); + } + + Preconditions.checkArgument(principal != null, + "Principal can not be null when keytab is provided"); + Preconditions.checkArgument(keytab != null, + "Keytab can not be null when Principal is provided"); + + if(kerbAuthenticator == null) { + kerbAuthenticator = new KerberosAuthenticator(); + } + kerbAuthenticator.authenticate(principal, keytab); + + return kerbAuthenticator; + } + + /** + * Returns the standard SaslGssCallbackHandler from the hadoop common module + * + * @return CallbackHandler + */ + public static CallbackHandler getSaslGssCallbackHandler() { + return new SaslRpcServer.SaslGssCallbackHandler(); + } + + /** + * Resolves the principal using Hadoop common's SecurityUtil and splits + * the kerberos principal into three parts user name, host and kerberos realm + * + * @param principal + * @return String[] of username, hostname and kerberos realm + * @throws IOException + */ + public static String[] splitKerberosName(String principal) throws IOException { + String resolvedPrinc = SecurityUtil.getServerPrincipal(principal, ""); + return SaslRpcServer.splitKerberosName(resolvedPrinc); + } +} + + + + + + http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java new file 
mode 100644 index 0000000..dbe241d --- /dev/null +++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.auth; + +/** + * FlumeAuthenticator extends on a PrivilegedExecutor providing capabilities to + * proxy as a different user + */ +public interface FlumeAuthenticator extends PrivilegedExecutor { + /** + * Returns the current instance if proxyUsername is null or + * returns the proxied Executor if proxyUserName is valid + * @param proxyUserName + * @return PrivilegedExecutor + */ + public PrivilegedExecutor proxyAs(String proxyUserName); + + /** + * Returns true, if the underlying Authenticator was obtained by + * successful kerberos authentication + * @return boolean + */ + public boolean isAuthenticated(); + + /** + * For Authenticators backed by credentials, this method refreshes the + * credentials periodically + */ + public void startCredentialRefresher(); +} http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java ---------------------------------------------------------------------- diff --git 
a/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java new file mode 100644 index 0000000..3244046 --- /dev/null +++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flume.auth; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; + +import com.google.common.base.Preconditions; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION; + +/** + * A kerberos authenticator, which authenticates using the supplied principal + * and keytab and executes with authenticated privileges + */ +class KerberosAuthenticator implements FlumeAuthenticator { + + private static final Logger LOG = LoggerFactory + .getLogger(KerberosAuthenticator.class); + + private volatile UserGroupInformation ugi; + private volatile PrivilegedExecutor privilegedExecutor; + private Map<String, PrivilegedExecutor> proxyCache = new HashMap<String, PrivilegedExecutor>(); + + + @Override + public <T> T execute(PrivilegedAction<T> action) { + return privilegedExecutor.execute(action); + } + + @Override + public <T> T execute(PrivilegedExceptionAction<T> action) throws Exception { + return privilegedExecutor.execute(action); + } + + @Override + public synchronized PrivilegedExecutor proxyAs(String proxyUserName) { + if(proxyUserName == null || proxyUserName.isEmpty()) { + return this; + } + if(proxyCache.get(proxyUserName) == null) { + UserGroupInformation proxyUgi; + proxyUgi = UserGroupInformation.createProxyUser(proxyUserName, ugi); + printUGI(proxyUgi); + proxyCache.put(proxyUserName, new 
UGIExecutor(proxyUgi)); + } + return proxyCache.get(proxyUserName); + } + + @Override + public boolean isAuthenticated() { + return true; + } + + /** + * When valid principal and keytab are provided and if authentication has + * not yet been done for this object, this method authenticates the + * credentials and populates the ugi. In case of null or invalid credentials + * IllegalArgumentException is thrown. In case of failure to authenticate, + * SecurityException is thrown. If authentication has already happened on + * this KerberosAuthenticator object, then this method checks to see if the current + * credentials passed are same as the validated credentials. If not, it throws + * an exception as this authenticator can represent only one Principal. + * + * @param principal + * @param keytab + */ + public synchronized void authenticate(String principal, String keytab) { + // sanity checking + + Preconditions.checkArgument(principal != null && !principal.isEmpty(), + "Invalid Kerberos principal: " + String.valueOf(principal)); + Preconditions.checkArgument(keytab != null && !keytab.isEmpty(), + "Invalid Kerberos keytab: " + String.valueOf(keytab)); + File keytabFile = new File(keytab); + Preconditions.checkArgument(keytabFile.isFile() && keytabFile.canRead(), + "Keytab is not a readable file: " + String.valueOf(keytab)); + + + // resolve the requested principal + String resolvedPrincipal; + try { + // resolves _HOST pattern using standard Hadoop search/replace + // via DNS lookup when 2nd argument is empty + resolvedPrincipal = SecurityUtil.getServerPrincipal(principal, ""); + } catch (IOException e) { + throw new IllegalArgumentException("Host lookup error resolving kerberos principal (" + + principal + "). 
Exception follows.", e); + } + Preconditions.checkNotNull(resolvedPrincipal, + "Resolved Principal must not be null"); + + + // be cruel and unusual when user tries to login as multiple principals + // this isn't really valid with a reconfigure but this should be rare + // enough to warrant a restart of the agent JVM + // TODO: find a way to interrogate the entire current config state, + // since we don't have to be unnecessarily protective if they switch all + // HDFS sinks to use a different principal all at once. + + Preconditions.checkState(ugi == null || ugi.getUserName().equals(resolvedPrincipal), + "Cannot use multiple kerberos principals in the same agent. " + + " Must restart agent to use new principal or keytab. " + + "Previous = %s, New = %s", ugi, resolvedPrincipal); + + + // enable the kerberos mode of UGI, before doing anything else + if(!UserGroupInformation.isSecurityEnabled()) { + Configuration conf = new Configuration(false); + conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(conf); + } + + // We are interested in currently logged in user with kerberos creds + UserGroupInformation curUser = null; + try { + curUser = UserGroupInformation.getLoginUser(); + if(curUser != null && !curUser.hasKerberosCredentials()) { + curUser = null; + } + } catch (IOException e) { + LOG.warn("User unexpectedly had no active login. 
Continuing with " + + "authentication", e); + } + + /* + * if ugi is not null, + * if ugi matches currently logged in kerberos user, we are good + * else we are logged out, so relogin our ugi + * else if ugi is null, login and populate state + */ + try { + if (ugi != null) { + if (curUser != null && curUser.getUserName().equals(ugi.getUserName())) { + LOG.debug("Using existing principal login: {}", ugi); + } else { + LOG.info("Attempting kerberos Re-login as principal ({}) " + , new Object[] { ugi.getUserName() } ); + ugi.reloginFromKeytab(); + } + } else { + LOG.info("Attempting kerberos login as principal ({}) from keytab " + + "file ({})", new Object[] { resolvedPrincipal, keytab } ); + UserGroupInformation.loginUserFromKeytab(resolvedPrincipal, keytab); + this.ugi = UserGroupInformation.getLoginUser(); + this.privilegedExecutor = new UGIExecutor(this.ugi); + } + } catch (IOException e) { + throw new SecurityException("Authentication error while attempting to " + + "login as kerberos principal (" + resolvedPrincipal + ") using " + + "keytab (" + keytab + "). Exception follows.", e); + } + + printUGI(this.ugi); + } + + private void printUGI(UserGroupInformation ugi) { + if (ugi != null) { + // dump login information + AuthenticationMethod authMethod = ugi.getAuthenticationMethod(); + LOG.info("\n{} \nUser: {} \nAuth method: {} \nKeytab: {} \n", + new Object[]{ authMethod.equals(AuthenticationMethod.PROXY) ? + "Proxy as: " : "Logged as: ", ugi.getUserName(), authMethod, + ugi.isFromKeytab() } + ); + } + } + + /** + * startCredentialRefresher should be used only for long running + * methods like Thrift source. 
For all privileged methods that use a UGI, the + * credentials are checked automatically and refreshed before the + * privileged method is executed in the UGIExecutor + */ + @Override + public void startCredentialRefresher() { + int CHECK_TGT_INTERVAL = 120; // seconds + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + scheduler.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + ugi.checkTGTAndReloginFromKeytab(); + } catch (IOException e) { + LOG.warn("Error occured during checkTGTAndReloginFromKeytab() for user " + + ugi.getUserName(), e); + } + } + }, CHECK_TGT_INTERVAL, CHECK_TGT_INTERVAL, TimeUnit.SECONDS); + } + + @VisibleForTesting + String getUserName() { + if(ugi != null) { + return ugi.getUserName(); + } else { + return null; + } + } +} + + + http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java new file mode 100644 index 0000000..0aa321a --- /dev/null +++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.auth; + +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; + + +/** + * PrivilegedExecutor provides the ability to execute a PrivilegedAction + * or a PrivilegedExceptionAction. Implementors of this interface can choose to execute + * in normal mode or secure authenticated mode + */ +public interface PrivilegedExecutor { + /** + * This method is used to execute a privileged action, the implementor can + * choose to execute the action using the appropriate privileges + * + * @param action A PrivilegedExceptionAction to perform as the desired user + * @param <T> The return type of the action + * @return T the T value returned by action.run() + * @throws Exception + */ + public <T> T execute(PrivilegedExceptionAction<T> action) throws Exception; + + /** + * This method is used to execute a privileged action, the implementor can + * choose to execute the action using the appropriate privileges + * + * @param action A PrivilegedAction to perform as the desired user + * @param <T> The return type of the action + * @return T the T value returned by action.run() + */ + public <T> T execute(PrivilegedAction<T> action); +} + + http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java new file mode 
100644 index 0000000..5760481 --- /dev/null +++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.auth; + +/** + * SecurityException thrown in the Flume security module + */ +public class SecurityException extends RuntimeException { + public SecurityException(String message) { + super(message); + } + + public SecurityException(String message, Throwable cause) { + super(message, cause); + } + + public SecurityException(Throwable cause) { + super(cause); + } +} + + + + + http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java new file mode 100644 index 0000000..f7b5bea --- /dev/null +++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.auth; + +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.Map; + +/** + * A no-op authenticator, which does not authenticate and executes + * without any authenticated privileges + */ +class SimpleAuthenticator implements FlumeAuthenticator { + private SimpleAuthenticator() {} + + private static class SimpleAuthenticatorHolder { + static final SimpleAuthenticator authenticator = new SimpleAuthenticator(); + } + + public static SimpleAuthenticator getSimpleAuthenticator() { + return SimpleAuthenticatorHolder.authenticator; + } + + private final Map<String, PrivilegedExecutor> proxyCache = + new HashMap<String, PrivilegedExecutor>(); // plain HashMap; only touched from the synchronized proxyAs() + + + @Override + public <T> T execute(PrivilegedExceptionAction<T> action) + throws Exception { + return action.run(); + } + + @Override + public <T> T execute(PrivilegedAction<T> action) { + return action.run(); + } + + @Override + public synchronized PrivilegedExecutor proxyAs(String proxyUserName) { + if(proxyUserName == null || proxyUserName.isEmpty()) { + return this; // no proxy user requested, run actions directly + } + if(proxyCache.get(proxyUserName) == null) { + UserGroupInformation proxyUgi; + 
try { + proxyUgi = UserGroupInformation.createProxyUser(proxyUserName, + UserGroupInformation.getCurrentUser()); + } catch (IOException e) { + throw new SecurityException("Unable to create proxy User", e); + } + proxyCache.put(proxyUserName, new UGIExecutor(proxyUgi)); + } + return proxyCache.get(proxyUserName); + } + + @Override + public boolean isAuthenticated() { + return false; + } + + @Override + public void startCredentialRefresher() { + // no-op + } + +} + + http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java new file mode 100644 index 0000000..a5aeef2 --- /dev/null +++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flume.auth; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; + +class UGIExecutor implements PrivilegedExecutor { // runs actions through ugi.doAs() after a relogin check + private final UserGroupInformation ugi; + + UGIExecutor(UserGroupInformation ugi) { + this.ugi = ugi; + } + + @Override + public <T> T execute(PrivilegedAction<T> action) { + ensureValidAuth(); + return ugi.doAs(action); + } + + @Override + public <T> T execute(PrivilegedExceptionAction<T> action) throws Exception { + ensureValidAuth(); + try { + return ugi.doAs(action); + } catch (IOException ex) { + throw new SecurityException("Privileged action failed", ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); // restore the interrupt status instead of clearing it + throw new SecurityException(ex); + } + } + + private void ensureValidAuth() { // relogin this UGI and, for a proxy UGI, the real user behind it + reloginUGI(ugi); + if(ugi.getAuthenticationMethod().equals(AuthenticationMethod.PROXY)) { + reloginUGI(ugi.getRealUser()); + } + } + + private void reloginUGI(UserGroupInformation ugi) { + try { + if(ugi.hasKerberosCredentials()) { + ugi.checkTGTAndReloginFromKeytab(); + } + } catch (IOException e) { + throw new SecurityException("Error trying to relogin from keytab for user " + + ugi.getUserName(), e); + } + } + + @VisibleForTesting + String getUserName() { + if(ugi != null) { + return ugi.getUserName(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java new file mode 100644 index 0000000..45ba2b0 --- /dev/null +++ 
b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.auth; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.minikdc.MiniKdc; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; + +public class TestFlumeAuthenticator { + + private static MiniKdc kdc; + private static File workDir; + private static File flumeKeytab; + private static String flumePrincipal = "flume/localhost"; + private static File aliceKeytab; + private static String alicePrincipal = "alice"; + private static Properties conf; + + @BeforeClass + public static void startMiniKdc() throws Exception { + workDir = new File(System.getProperty("test.dir", "target"), + TestFlumeAuthenticator.class.getSimpleName()); + flumeKeytab = new File(workDir, "flume.keytab"); + aliceKeytab = new File(workDir, "alice.keytab"); + conf = MiniKdc.createConf(); + + kdc = new MiniKdc(conf, workDir); + kdc.start(); + + kdc.createPrincipal(flumeKeytab, flumePrincipal); + flumePrincipal = flumePrincipal + "@" + kdc.getRealm(); + + 
kdc.createPrincipal(aliceKeytab, alicePrincipal); + alicePrincipal = alicePrincipal + "@" + kdc.getRealm(); + } + + @AfterClass + public static void stopMiniKdc() { + if (kdc != null) { + kdc.stop(); + } + } + + @Test + public void testNullLogin() throws IOException { + String principal = null; + String keytab = null; + + FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator( + principal, keytab); + assertFalse(authenticator.isAuthenticated()); + } + + @Test + public void testFlumeLogin() throws IOException { + String principal = flumePrincipal; + String keytab = flumeKeytab.getAbsolutePath(); + String expResult = principal; + + FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator( + principal, keytab); + assertTrue(authenticator.isAuthenticated()); + + String result = ((KerberosAuthenticator)authenticator).getUserName(); + assertEquals("Initial login failed", expResult, result); + + authenticator = FlumeAuthenticationUtil.getAuthenticator( + principal, keytab); + result = ((KerberosAuthenticator)authenticator).getUserName(); + assertEquals("Re-login failed", expResult, result); + + principal = alicePrincipal; + keytab = aliceKeytab.getAbsolutePath(); + try { + authenticator = FlumeAuthenticationUtil.getAuthenticator( + principal, keytab); + result = ((KerberosAuthenticator)authenticator).getUserName(); + fail("Login should have failed with a new principal: " + result); + } catch (Exception ex) { + assertTrue("Login with a new principal failed, but for an unexpected " + + "reason: " + ex.getMessage(), + ex.getMessage().contains("Cannot use multiple kerberos principals")); + } + } + + @Test + public void testProxyAs() throws IOException { + String username = "alice"; + + String expResult = username; + FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator( + null, null); + String result = ((UGIExecutor)(authenticator.proxyAs(username))).getUserName(); + assertEquals("Proxy as didn't generate the 
expected username", expResult, result); + + authenticator = FlumeAuthenticationUtil.getAuthenticator( + flumePrincipal, flumeKeytab.getAbsolutePath()); + + String login = ((KerberosAuthenticator)authenticator).getUserName(); + assertEquals("Login succeeded, but the principal doesn't match", + flumePrincipal, login); + + result = ((UGIExecutor)(authenticator.proxyAs(username))).getUserName(); + assertEquals("Proxy as didn't generate the expected username", expResult, result); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-core/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml index 8992414..fe34c03 100644 --- a/flume-ng-core/pom.xml +++ b/flume-ng-core/pom.xml @@ -264,6 +264,11 @@ limitations under the License. </dependency> <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-auth</artifactId> + </dependency> + + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java index baa60d0..32021d3 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java @@ -23,6 +23,8 @@ import java.util.Properties; import org.apache.flume.api.RpcClient; import org.apache.flume.api.RpcClientConfigurationConstants; import org.apache.flume.api.RpcClientFactory; +import org.apache.flume.api.SecureRpcClientFactory; + /** * <p> * A {@link org.apache.flume.Sink} implementation that can send events to an RPC server (such as @@ -102,12 +104,18 @@ import org.apache.flume.api.RpcClientFactory; 
public class ThriftSink extends AbstractRpcSink { @Override protected RpcClient initializeRpcClient(Properties props) { - props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, - RpcClientFactory.ClientType.THRIFT.name()); // Only one thread is enough, since only one sink thread processes // transactions at any given time. Each sink owns its own Rpc client. props.setProperty(RpcClientConfigurationConstants .CONFIG_CONNECTION_POOL_SIZE, String.valueOf(1)); - return RpcClientFactory.getInstance(props); + boolean enableKerberos = Boolean.parseBoolean(props.getProperty( + RpcClientConfigurationConstants.KERBEROS_KEY, "false")); + if(enableKerberos) { + return SecureRpcClientFactory.getThriftInstance(props); + } else { + props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, + RpcClientFactory.ClientType.THRIFT.name()); + return RpcClientFactory.getInstance(props); + } } } http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java index 06bb604..1d8bb33 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java @@ -26,6 +26,8 @@ import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDrivenSource; import org.apache.flume.FlumeException; +import org.apache.flume.auth.FlumeAuthenticationUtil; +import org.apache.flume.auth.FlumeAuthenticator; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.SourceCounter; @@ -45,12 +47,16 @@ import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TServerSocket; 
import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TSSLTransportFactory; +import org.apache.thrift.transport.TTransportFactory; +import org.apache.thrift.transport.TSaslServerTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLServerSocket; +import javax.security.sasl.Sasl; import java.io.FileInputStream; +import java.io.IOException; import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -60,10 +66,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.security.PrivilegedAction; public class ThriftSource extends AbstractSource implements Configurable, EventDrivenSource { @@ -97,6 +106,10 @@ public class ThriftSource extends AbstractSource implements Configurable, private static final String KEYMANAGER_TYPE = "keymanager-type"; private static final String EXCLUDE_PROTOCOLS = "exclude-protocols"; + private static final String KERBEROS_KEY = "kerberos"; + private static final String AGENT_PRINCIPAL = "agent-principal"; + private static final String AGENT_KEYTAB = "agent-keytab"; + private Integer port; private String bindAddress; private int maxThreads = 0; @@ -110,6 +123,9 @@ public class ThriftSource extends AbstractSource implements Configurable, private String keyManagerType; private final List<String> excludeProtocols = new LinkedList<String>(); private boolean enableSsl = false; + private boolean enableKerberos = false; + private String principal; + private FlumeAuthenticator flumeAuth; @Override public void configure(Context context) { @@ -171,6 +187,18 @@ public class ThriftSource extends AbstractSource implements 
Configurable, "Thrift source configured with invalid keystore: " + keystore, ex); } } + + principal = context.getString(AGENT_PRINCIPAL); + String keytab = context.getString(AGENT_KEYTAB); + enableKerberos = context.getBoolean(KERBEROS_KEY, false); + this.flumeAuth = FlumeAuthenticationUtil.getAuthenticator(principal, keytab); + if(enableKerberos) { + if(!flumeAuth.isAuthenticated()) { + throw new FlumeException("Authentication failed in Kerberos mode for " + + "principal " + principal + " keytab " + keytab); + } + flumeAuth.startCredentialRefresher(); + } } @Override @@ -195,7 +223,15 @@ public class ThriftSource extends AbstractSource implements Configurable, servingExecutor.submit(new Runnable() { @Override public void run() { - server.serve(); + flumeAuth.execute( + new PrivilegedAction<Object>() { + @Override + public Object run() { + server.serve(); + return null; + } + } + ); } }); @@ -263,7 +299,7 @@ public class ThriftSource extends AbstractSource implements Configurable, } private TServer getTThreadedSelectorServer() { - if(enableSsl) { + if(enableSsl || enableKerberos) { return null; } Class<?> serverClass; @@ -277,6 +313,7 @@ public class ThriftSource extends AbstractSource implements Configurable, TServerTransport serverTransport = new TNonblockingServerSocket( new InetSocketAddress(bindAddress, port)); + ExecutorService sourceService; ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat( "Flume Thrift IPC Thread %d").build(); @@ -328,14 +365,35 @@ public class ThriftSource extends AbstractSource implements Configurable, args.protocolFactory(getProtocolFactory()); //populate the transportFactory - args.inputTransportFactory(new TFastFramedTransport.Factory()); - args.outputTransportFactory(new TFastFramedTransport.Factory()); + if(enableKerberos) { + args.transportFactory(getSASLTransportFactory()); + } else { + args.transportFactory(new TFastFramedTransport.Factory()); + } // populate the Processor args.processor(new 
ThriftSourceProtocol .Processor<ThriftSourceHandler>(new ThriftSourceHandler())); } + private TTransportFactory getSASLTransportFactory() { + String[] names; + try { + names = FlumeAuthenticationUtil.splitKerberosName(principal); + } catch (IOException e) { + throw new FlumeException( + "Error while trying to resolve Principal name - " + principal, e); + } + Map<String, String> saslProperties = new HashMap<String, String>(); + saslProperties.put(Sasl.QOP, "auth"); + TSaslServerTransport.Factory saslTransportFactory = + new TSaslServerTransport.Factory(); + saslTransportFactory.addServerDefinition( + "GSSAPI", names[0], names[1], saslProperties, + FlumeAuthenticationUtil.getSaslGssCallbackHandler()); + return saslTransportFactory; + } + @Override public void stop() { if(server != null && server.isServing()) { @@ -402,5 +460,4 @@ public class ThriftSource extends AbstractSource implements Configurable, return Status.OK; } } - } http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml index a083fe2..9f7c4f6 100644 --- a/flume-ng-dist/pom.xml +++ b/flume-ng-dist/pom.xml @@ -202,6 +202,10 @@ <groupId>org.apache.flume</groupId> <artifactId>flume-tools</artifactId> </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-auth</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-dist/src/main/assembly/bin.xml ---------------------------------------------------------------------- diff --git a/flume-ng-dist/src/main/assembly/bin.xml b/flume-ng-dist/src/main/assembly/bin.xml index 5aa7cc6..a61180d 100644 --- a/flume-ng-dist/src/main/assembly/bin.xml +++ b/flume-ng-dist/src/main/assembly/bin.xml @@ -68,6 +68,7 @@ <exclude>flume-ng-clients/**</exclude> <exclude>flume-ng-embedded-agent/**</exclude> 
<exclude>flume-tools/**</exclude> + <exclude>flume-ng-auth/**</exclude> <exclude>**/target/**</exclude> <exclude>**/.classpath</exclude> <exclude>**/.project</exclude> http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-dist/src/main/assembly/src.xml ---------------------------------------------------------------------- diff --git a/flume-ng-dist/src/main/assembly/src.xml b/flume-ng-dist/src/main/assembly/src.xml index b1e79a2..e5f4156 100644 --- a/flume-ng-dist/src/main/assembly/src.xml +++ b/flume-ng-dist/src/main/assembly/src.xml @@ -49,6 +49,7 @@ <include>org.apache.flume:flume-ng-clients</include> <include>org.apache.flume:flume-ng-embedded-agent</include> <include>org.apache.flume:flume-tools</include> + <include>org.apache.flume:flume-ng-auth</include> </includes> <sources> http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java index 33a2330..343e07b 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java @@ -145,6 +145,8 @@ public final class RpcClientConfigurationConstants { public static final String CONFIG_TRUSTSTORE_TYPE = "truststore-type"; public static final String CONFIG_EXCLUDE_PROTOCOLS = "exclude-protocols"; + public static final String KERBEROS_KEY = "kerberos"; + /** * Configuration constants for the NettyAvroRpcClient * NioClientSocketChannelFactory http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java ---------------------------------------------------------------------- diff --git 
a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java index 4f75a2b..5c4cc41 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java @@ -28,6 +28,7 @@ import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.transport.TFastFramedTransport; import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +74,7 @@ public class ThriftRpcClient extends AbstractRpcClient { public static final String CONFIG_PROTOCOL = "protocol"; public static final String BINARY_PROTOCOL = "binary"; public static final String COMPACT_PROTOCOL = "compact"; - + private int batchSize; private long requestTimeout; private final Lock stateLock; @@ -83,7 +84,6 @@ public class ThriftRpcClient extends AbstractRpcClient { private ConnectionPoolManager connectionManager; private final ExecutorService callTimeoutPool; private final AtomicLong threadCounter; - private int connectionPoolSize; private final Random random = new Random(); private String protocol; @@ -95,7 +95,6 @@ public class ThriftRpcClient extends AbstractRpcClient { private static final String TRUSTMANAGER_TYPE = "trustmanager-type"; private final List<String> excludeProtocols = new LinkedList<String>(); - public ThriftRpcClient() { stateLock = new ReentrantLock(true); connState = State.INIT; @@ -319,7 +318,7 @@ public class ThriftRpcClient extends AbstractRpcClient { requestTimeout = RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS; } - connectionPoolSize = Integer.parseInt(properties.getProperty( + int connectionPoolSize = Integer.parseInt(properties.getProperty( RpcClientConfigurationConstants.CONFIG_CONNECTION_POOL_SIZE, String.valueOf(RpcClientConfigurationConstants 
.DEFAULT_CONNECTION_POOL_SIZE))); @@ -352,6 +351,7 @@ public class ThriftRpcClient extends AbstractRpcClient { } } } + connectionManager = new ConnectionPoolManager(connectionPoolSize); connState = State.READY; } catch (Throwable ex) { @@ -372,33 +372,41 @@ public class ThriftRpcClient extends AbstractRpcClient { INIT, READY, DEAD } + protected TTransport getTransport(TSocket tsocket) throws Exception { + return new TFastFramedTransport(tsocket); + } + /** * Wrapper around a client and transport, so we can clean up when this * client gets closed. */ private class ClientWrapper { public final ThriftSourceProtocol.Client client; - public final TFastFramedTransport transport; + public final TTransport transport; private final int hashCode; public ClientWrapper() throws Exception{ TSocket tsocket; if(enableSsl) { - // JDK6's factory doesn't appear to pass the protocol onto the Socket properly so we have - // to do some magic to make sure that happens. Not an issue in JDK7 - // Lifted from thrift-0.9.1 to make the SSLContext - SSLContext sslContext = createSSLContext(truststore, truststorePassword, trustManagerType, truststoreType); + // JDK6's factory doesn't appear to pass the protocol onto the Socket + // properly so we have to do some magic to make sure that happens. 
+ // Not an issue in JDK7 Lifted from thrift-0.9.1 to make the SSLContext + SSLContext sslContext = createSSLContext(truststore, truststorePassword, + trustManagerType, truststoreType); // Create the factory from it SSLSocketFactory sslSockFactory = sslContext.getSocketFactory(); // Create the TSocket from that - tsocket = createSSLSocket(sslSockFactory, hostname, port, 120000, excludeProtocols); + tsocket = createSSLSocket( + sslSockFactory, hostname, port, 120000, excludeProtocols); } else { tsocket = new TSocket(hostname, port); } - transport = new TFastFramedTransport(tsocket); + + transport = getTransport(tsocket); + // The transport is already open for SSL as part of TSSLTransportFactory.getClientSocket if(!transport.isOpen()) { transport.open(); http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-sinks/flume-dataset-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/pom.xml b/flume-ng-sinks/flume-dataset-sink/pom.xml index ad3f603..92f7021 100644 --- a/flume-ng-sinks/flume-dataset-sink/pom.xml +++ b/flume-ng-sinks/flume-dataset-sink/pom.xml @@ -150,13 +150,6 @@ limitations under the License. 
</dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minikdc</artifactId> - <version>${hadoop2.version}</version> - <scope>test</scope> - </dependency> - - <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java index fd9f991..a9f42b8 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java @@ -17,6 +17,8 @@ */ package org.apache.flume.sink.kite; +import org.apache.flume.auth.FlumeAuthenticationUtil; +import org.apache.flume.auth.PrivilegedExecutor; import org.apache.flume.sink.kite.parser.EntityParserFactory; import org.apache.flume.sink.kite.parser.EntityParser; import org.apache.flume.sink.kite.policy.FailurePolicy; @@ -25,8 +27,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; + import java.net.URI; -import java.security.PrivilegedExceptionAction; +import java.security.PrivilegedAction; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.avro.Schema; @@ -40,7 +43,6 @@ import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.AbstractSink; -import org.apache.hadoop.security.UserGroupInformation; import org.kitesdk.data.Dataset; import 
org.kitesdk.data.DatasetDescriptor; import org.kitesdk.data.DatasetIOException; @@ -72,7 +74,7 @@ public class DatasetSink extends AbstractSink implements Configurable { private static final Logger LOG = LoggerFactory.getLogger(DatasetSink.class); private Context context = null; - private UserGroupInformation login = null; + private PrivilegedExecutor privilegedExecutor; private String datasetName = null; private URI datasetUri = null; @@ -159,15 +161,12 @@ public class DatasetSink extends AbstractSink implements Configurable { public void configure(Context context) { this.context = context; - // initialize login credentials - this.login = KerberosUtil.login( - context.getString(AUTH_PRINCIPAL), - context.getString(AUTH_KEYTAB)); - String effectiveUser - = context.getString(AUTH_PROXY_USER); - if (effectiveUser != null) { - this.login = KerberosUtil.proxyAs(effectiveUser, login); - } + String principal = context.getString(AUTH_PRINCIPAL); + String keytab = context.getString(AUTH_KEYTAB); + String effectiveUser = context.getString(AUTH_PROXY_USER); + + this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator( + principal, keytab).proxyAs(effectiveUser); // Get the dataset URI and name from the context String datasetURI = context.getString(CONFIG_KITE_DATASET_URI); @@ -395,13 +394,15 @@ public class DatasetSink extends AbstractSink implements Configurable { // reset the commited flag whenver a new writer is created committedBatch = false; try { - View<GenericRecord> view = KerberosUtil.runPrivileged(login, - new PrivilegedExceptionAction<Dataset<GenericRecord>>() { - @Override - public Dataset<GenericRecord> run() { - return Datasets.load(datasetUri); - } - }); + View<GenericRecord> view; + + view = privilegedExecutor.execute( + new PrivilegedAction<Dataset<GenericRecord>>() { + @Override + public Dataset<GenericRecord> run() { + return Datasets.load(datasetUri); + } + }); DatasetDescriptor descriptor = view.getDataset().getDescriptor(); Format format = 
descriptor.getFormat(); http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java deleted file mode 100644 index c0dbffb..0000000 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flume.sink.kite; - -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import java.io.File; -import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.kitesdk.data.DatasetException; -import org.kitesdk.data.DatasetIOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KerberosUtil { - - private static final Logger LOG = LoggerFactory.getLogger(KerberosUtil.class); - - public static class SecurityException extends RuntimeException { - private SecurityException(String message) { - super(message); - } - - private SecurityException(String message, Throwable cause) { - super(message, cause); - } - - private SecurityException(Throwable cause) { - super(cause); - } - } - - public static UserGroupInformation proxyAs(String username, - UserGroupInformation login) { - Preconditions.checkArgument(username != null && !username.isEmpty(), - "Invalid username: " + String.valueOf(username)); - Preconditions.checkArgument(login != null, - "Cannot proxy without an authenticated user"); - - // hadoop impersonation works with or without kerberos security - return UserGroupInformation.createProxyUser(username, login); - } - - /** - * Static synchronized method for static Kerberos login. <br/> - * Static synchronized due to a thundering herd problem when multiple Sinks - * attempt to log in using the same principal at the same time with the - * intention of impersonating different users (or even the same user). - * If this is not controlled, MIT Kerberos v5 believes it is seeing a replay - * attach and it returns: - * <blockquote>Request is a replay (34) - PROCESS_TGS</blockquote> - * In addition, since the underlying Hadoop APIs we are using for - * impersonation are static, we define this method as static as well. 
- * - * @param principal - * Fully-qualified principal to use for authentication. - * @param keytab - * Location of keytab file containing credentials for principal. - * @return Logged-in user - * @throws SecurityException - * if login fails. - * @throws IllegalArgumentException - * if the principal or the keytab is not usable - */ - public static synchronized UserGroupInformation login(String principal, - String keytab) { - // If the principal or keytab isn't set, get the current (Linux) user - if (principal == null || keytab == null) { - try { - return UserGroupInformation.getCurrentUser(); - } catch (IOException ex) { - LOG.error("Can't get current user: {}", ex.getMessage()); - throw new RuntimeException(ex); - } - } - - // resolve the requested principal, if it is present - String finalPrincipal = null; - if (principal != null && !principal.isEmpty()) { - try { - // resolves _HOST pattern using standard Hadoop search/replace - // via DNS lookup when 2nd argument is empty - finalPrincipal = SecurityUtil.getServerPrincipal(principal, ""); - } catch (IOException e) { - throw new SecurityException( - "Failed to resolve Kerberos principal", e); - } - } - - // check if there is a user already logged in - UserGroupInformation currentUser = null; - try { - currentUser = UserGroupInformation.getLoginUser(); - } catch (IOException e) { - // not a big deal but this shouldn't typically happen because it will - // generally fall back to the UNIX user - LOG.debug("Unable to get login user before Kerberos auth attempt", e); - } - - // if the current user is valid (matches the given principal and has a TGT) - // then use it - if (currentUser != null && currentUser.hasKerberosCredentials()) { - if (finalPrincipal == null || - finalPrincipal.equals(currentUser.getUserName())) { - LOG.debug("Using existing login for {}: {}", - finalPrincipal, currentUser); - return currentUser; - } else { - // be cruel and unusual when user tries to login as multiple principals - // this isn't 
really valid with a reconfigure but this should be rare - // enough to warrant a restart of the agent JVM - // TODO: find a way to interrogate the entire current config state, - // since we don't have to be unnecessarily protective if they switch all - // HDFS sinks to use a different principal all at once. - throw new SecurityException( - "Cannot use multiple Kerberos principals: " + finalPrincipal + - " would replace " + currentUser.getUserName()); - } - } - - // prepare for a new login - Preconditions.checkArgument(principal != null && !principal.isEmpty(), - "Invalid Kerberos principal: " + String.valueOf(principal)); - Preconditions.checkNotNull(finalPrincipal, - "Resolved principal must not be null"); - Preconditions.checkArgument(keytab != null && !keytab.isEmpty(), - "Invalid Kerberos keytab: " + String.valueOf(keytab)); - File keytabFile = new File(keytab); - Preconditions.checkArgument(keytabFile.isFile() && keytabFile.canRead(), - "Keytab is not a readable file: " + String.valueOf(keytab)); - - try { - // attempt static kerberos login - LOG.debug("Logging in as {} with {}", finalPrincipal, keytab); - UserGroupInformation.loginUserFromKeytab(principal, keytab); - return UserGroupInformation.getLoginUser(); - } catch (IOException e) { - throw new SecurityException("Kerberos login failed", e); - } - } - - /** - * Allow methods to act with the privileges of a login. - * - * If the login is null, the current privileges will be used. 
- * - * @param <T> The return type of the action - * @param login UserGroupInformation credentials to use for action - * @param action A PrivilegedExceptionAction to perform as another user - * @return the T value returned by action.run() - */ - public static <T> T runPrivileged(UserGroupInformation login, - PrivilegedExceptionAction<T> action) { - try { - if (login == null) { - return action.run(); - } else { - return login.doAs(action); - } - } catch (IOException ex) { - throw new DatasetIOException("Privileged action failed", ex); - } catch (InterruptedException ex) { - Thread.interrupted(); - throw new DatasetException(ex); - } catch (Exception ex) { - throw Throwables.propagate(ex); - } - } -} http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java deleted file mode 100644 index f53ef73..0000000 --- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flume.sink.kite; - -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.util.Properties; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.minikdc.MiniKdc; -import org.apache.hadoop.security.UserGroupInformation; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import static org.junit.Assert.*; - -public class TestKerberosUtil { - - private static MiniKdc kdc; - private static File workDir; - private static File flumeKeytab; - private static String flumePrincipal = "flume/localhost"; - private static File aliceKeytab; - private static String alicePrincipal = "alice"; - private static Properties conf; - - @BeforeClass - public static void startMiniKdc() throws Exception { - URL resource = Thread.currentThread() - .getContextClassLoader().getResource("enable-kerberos.xml"); - Configuration.addDefaultResource("enable-kerberos.xml"); - - workDir = new File(System.getProperty("test.dir", "target"), - TestKerberosUtil.class.getSimpleName()); - flumeKeytab = new File(workDir, "flume.keytab"); - aliceKeytab = new File(workDir, "alice.keytab"); - conf = MiniKdc.createConf(); - - kdc = new MiniKdc(conf, workDir); - kdc.start(); - - kdc.createPrincipal(flumeKeytab, flumePrincipal); - flumePrincipal = flumePrincipal + "@" + kdc.getRealm(); - - kdc.createPrincipal(aliceKeytab, alicePrincipal); - alicePrincipal = alicePrincipal + "@" + kdc.getRealm(); - } - - @AfterClass - public static void stopMiniKdc() { - if (kdc != null) { - kdc.stop(); - } - } - 
- @Test - public void testNullLogin() throws IOException { - String principal = null; - String keytab = null; - UserGroupInformation expResult = UserGroupInformation.getCurrentUser(); - UserGroupInformation result = KerberosUtil.login(principal, keytab); - assertEquals(expResult, result); - } - - @Test - public void testFlumeLogin() throws IOException { - String principal = flumePrincipal; - String keytab = flumeKeytab.getAbsolutePath(); - String expResult = principal; - - String result = KerberosUtil.login(principal, keytab).getUserName(); - assertEquals("Initial login failed", expResult, result); - - result = KerberosUtil.login(principal, keytab).getUserName(); - assertEquals("Re-login failed", expResult, result); - - principal = alicePrincipal; - keytab = aliceKeytab.getAbsolutePath(); - try { - result = KerberosUtil.login(principal, keytab).getUserName(); - fail("Login should have failed with a new principal: " + result); - } catch (KerberosUtil.SecurityException ex) { - assertTrue("Login with a new principal failed, but for an unexpected " - + "reason: " + ex.getMessage(), - ex.getMessage().contains("Cannot use multiple Kerberos principals: ")); - } - } - - @Test - public void testProxyAs() throws IOException { - String username = "alice"; - - UserGroupInformation login = UserGroupInformation.getCurrentUser(); - String expResult = username; - String result = KerberosUtil.proxyAs(username, login).getUserName(); - assertEquals("Proxy as didn't generate the expected username", expResult, result); - - login = KerberosUtil.login(flumePrincipal, flumeKeytab.getAbsolutePath()); - assertEquals("Login succeeded, but the principal doesn't match", - flumePrincipal, login.getUserName()); - - result = KerberosUtil.proxyAs(username, login).getUserName(); - assertEquals("Proxy as didn't generate the expected username", expResult, result); - } - -} 
http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java index 62f4eee..6b97de6 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java @@ -38,6 +38,7 @@ import org.apache.flume.Clock; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.SystemClock; +import org.apache.flume.auth.PrivilegedExecutor; import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback; import org.apache.hadoop.conf.Configuration; @@ -45,7 +46,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +75,7 @@ class BucketWriter { private final CompressionCodec codeC; private final CompressionType compType; private final ScheduledExecutorService timedRollerPool; - private final UserGroupInformation user; + private final PrivilegedExecutor proxyUser; private final AtomicLong fileExtensionCounter; @@ -120,7 +120,7 @@ class BucketWriter { Context context, String filePath, String fileName, String inUsePrefix, String inUseSuffix, String fileSuffix, CompressionCodec codeC, CompressionType compType, HDFSWriter writer, - ScheduledExecutorService timedRollerPool, UserGroupInformation user, + ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser, 
SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback, String onCloseCallbackPath, long callTimeout, ExecutorService callTimeoutPool, long retryInterval, @@ -138,7 +138,7 @@ class BucketWriter { this.compType = compType; this.writer = writer; this.timedRollerPool = timedRollerPool; - this.user = user; + this.proxyUser = proxyUser; this.sinkCounter = sinkCounter; this.idleTimeout = idleTimeout; this.onCloseCallback = onCloseCallback; @@ -165,33 +165,6 @@ class BucketWriter { this.writer = dataWriter; } - /** - * Allow methods to act as another user (typically used for HDFS Kerberos) - * @param <T> - * @param action - * @return - * @throws IOException - * @throws InterruptedException - */ - private <T> T runPrivileged(final PrivilegedExceptionAction<T> action) - throws IOException, InterruptedException { - - if (user != null) { - return user.doAs(action); - } else { - try { - return action.run(); - } catch (IOException ex) { - throw ex; - } catch (InterruptedException ex) { - throw ex; - } catch (RuntimeException ex) { - throw ex; - } catch (Exception ex) { - throw new RuntimeException("Unexpected exception.", ex); - } - } - } /** * Clear the class counters @@ -700,7 +673,7 @@ class BucketWriter { Future<T> future = callTimeoutPool.submit(new Callable<T>() { @Override public T call() throws Exception { - return runPrivileged(new PrivilegedExceptionAction<T>() { + return proxyUser.execute(new PrivilegedExceptionAction<T>() { @Override public T run() throws Exception { return callRunner.call();
