Repository: flume Updated Branches: refs/heads/trunk 63d26c19a -> 96b090b51
FLUME-2343. Add Kerberos and user impersonation support to Dataset Sink. (Ryan Blue via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/96b090b5 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/96b090b5 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/96b090b5 Branch: refs/heads/trunk Commit: 96b090b5117c34bba9f5104b47d005fe1c10c775 Parents: 63d26c1 Author: Hari Shreedharan <hshreedha...@apache.org> Authored: Wed Mar 12 11:40:04 2014 -0700 Committer: Hari Shreedharan <hshreedha...@apache.org> Committed: Wed Mar 12 11:41:11 2014 -0700 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 24 +-- .../org/apache/flume/sink/kite/DatasetSink.java | 24 ++- .../flume/sink/kite/DatasetSinkConstants.java | 6 + .../apache/flume/sink/kite/KerberosUtil.java | 176 +++++++++++++++++++ 4 files changed, 218 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/96b090b5/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index cedb283..4bcd8a2 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -2047,16 +2047,20 @@ Note 2: In some cases, file rolling may occur slightly after the roll interval has been exceeded. However, this delay will not exceed 5 seconds. In most cases, the delay is negligible. 
-===================== ======= =========================================================== -Property Name Default Description -===================== ======= =========================================================== -**channel** -- -**type** -- Must be org.apache.flume.sink.kite.DatasetSink -**kite.repo.uri** -- URI of the repository to open -**kite.dataset.name** -- Name of the Dataset where records will be written -kite.batchSize 100 Number of records to process in each batch -kite.rollInterval 30 Maximum wait time (seconds) before data files are released -===================== ======= =========================================================== +======================= ======= =========================================================== +Property Name Default Description +======================= ======= =========================================================== +**channel** -- +**type** -- Must be org.apache.flume.sink.kite.DatasetSink +**kite.repo.uri** -- URI of the repository to open +**kite.dataset.name** -- Name of the Dataset where records will be written +kite.batchSize 100 Number of records to process in each batch +kite.rollInterval 30 Maximum wait time (seconds) before data files are released +auth.kerberosPrincipal -- Kerberos user principal for secure authentication to HDFS +auth.kerberosKeytab -- Kerberos keytab location (local FS) for the principal +auth.proxyUser -- The effective user for HDFS actions, if different from + the kerberos principal +======================= ======= =========================================================== Custom Sink ~~~~~~~~~~~ http://git-wip-us.apache.org/repos/asf/flume/blob/96b090b5/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java 
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java index 1ee0a1f..ed1b8d0 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.net.URL; +import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -47,6 +48,7 @@ import org.apache.flume.sink.AbstractSink; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.kitesdk.data.Dataset; import org.kitesdk.data.DatasetRepositories; import org.kitesdk.data.DatasetWriter; @@ -69,8 +71,10 @@ public class DatasetSink extends AbstractSink implements Configurable { private String repositoryURI = null; private String datasetName = null; private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE; + private Dataset<Object> targetDataset = null; private DatasetWriter<Object> writer = null; + private UserGroupInformation login = null; private SinkCounter counter = null; // for rolling files at a given interval @@ -130,14 +134,30 @@ public class DatasetSink extends AbstractSink implements Configurable { @Override public void configure(Context context) { + // initialize login credentials + this.login = KerberosUtil.login( + context.getString(DatasetSinkConstants.AUTH_PRINCIPAL), + context.getString(DatasetSinkConstants.AUTH_KEYTAB)); + String effectiveUser = + context.getString(DatasetSinkConstants.AUTH_PROXY_USER); + if (effectiveUser != null) { + this.login = KerberosUtil.proxyAs(effectiveUser, login); + } + this.repositoryURI = context.getString( DatasetSinkConstants.CONFIG_KITE_REPO_URI); 
Preconditions.checkNotNull(repositoryURI, "Repository URI is missing"); this.datasetName = context.getString( DatasetSinkConstants.CONFIG_KITE_DATASET_NAME); Preconditions.checkNotNull(datasetName, "Dataset name is missing"); - this.targetDataset = DatasetRepositories.open(repositoryURI) - .load(datasetName); + + this.targetDataset = KerberosUtil.runPrivileged(login, + new PrivilegedExceptionAction<Dataset<Object>>() { + @Override + public Dataset<Object> run() { + return DatasetRepositories.open(repositoryURI).load(datasetName); + } + }); String formatName = targetDataset.getDescriptor().getFormat().getName(); Preconditions.checkArgument(allowedFormats().contains(formatName), http://git-wip-us.apache.org/repos/asf/flume/blob/96b090b5/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java index 13c776e..09dfab6 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java @@ -48,4 +48,10 @@ public class DatasetSinkConstants { "flume.avro.schema.literal"; public static final String AVRO_SCHEMA_URL_HEADER = "flume.avro.schema.url"; + /** + * Hadoop authentication settings + */ + public static final String AUTH_PROXY_USER = "auth.proxyUser"; + public static final String AUTH_PRINCIPAL = "auth.kerberosPrincipal"; + public static final String AUTH_KEYTAB = "auth.kerberosKeytab"; } http://git-wip-us.apache.org/repos/asf/flume/blob/96b090b5/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java 
---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java new file mode 100644 index 0000000..92ad141 --- /dev/null +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flume.sink.kite; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import java.io.File; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.kitesdk.data.DatasetException; +import org.kitesdk.data.DatasetIOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosUtil { + + private static final Logger LOG = LoggerFactory.getLogger(KerberosUtil.class); + + public static class SecurityException extends RuntimeException { + private SecurityException(String message) { + super(message); + } + + private SecurityException(String message, Throwable cause) { + super(message, cause); + } + + private SecurityException(Throwable cause) { + super(cause); + } + } + + public static UserGroupInformation proxyAs(String username, + UserGroupInformation login) { + Preconditions.checkArgument(username != null && !username.isEmpty(), + "Invalid username: " + String.valueOf(username)); + Preconditions.checkArgument(login != null, + "Cannot proxy without an authenticated user"); + + // hadoop impersonation works with or without kerberos security + return UserGroupInformation.createProxyUser(username, login); + } + + /** + * Static synchronized method for static Kerberos login. <br/> + * Static synchronized due to a thundering herd problem when multiple Sinks + * attempt to log in using the same principal at the same time with the + * intention of impersonating different users (or even the same user). + * If this is not controlled, MIT Kerberos v5 believes it is seeing a replay + * attack and it returns: + * <blockquote>Request is a replay (34) - PROCESS_TGS</blockquote> + * In addition, since the underlying Hadoop APIs we are using for + * impersonation are static, we define this method as static as well. 
+ * + * @param principal + * Fully-qualified principal to use for authentication. + * @param keytab + * Location of keytab file containing credentials for principal. + * @return Logged-in user + * @throws SecurityException + * if login fails. + * @throws IllegalArgumentException + * if the principal or the keytab is not usable + */ + public static synchronized UserGroupInformation login(String principal, + String keytab) { + // resolve the requested principal, if it is present + String finalPrincipal = null; + if (principal != null && !principal.isEmpty()) { + try { + // resolves _HOST pattern using standard Hadoop search/replace + // via DNS lookup when 2nd argument is empty + finalPrincipal = SecurityUtil.getServerPrincipal(principal, ""); + } catch (IOException e) { + throw new SecurityException( + "Failed to resolve Kerberos principal", e); + } + } + + // check if there is a user already logged in + UserGroupInformation currentUser = null; + try { + currentUser = UserGroupInformation.getLoginUser(); + } catch (IOException e) { + // not a big deal but this shouldn't typically happen because it will + // generally fall back to the UNIX user + LOG.debug("Unable to get login user before Kerberos auth attempt", e); + } + + // if the current user is valid (matches the given principal) then use it + if (currentUser != null) { + if (finalPrincipal == null || + finalPrincipal.equals(currentUser.getUserName())) { + LOG.debug("Using existing login for {}: {}", + finalPrincipal, currentUser); + return currentUser; + } else { + // be cruel and unusual when user tries to login as multiple principals + // this isn't really valid with a reconfigure but this should be rare + // enough to warrant a restart of the agent JVM + // TODO: find a way to interrogate the entire current config state, + // since we don't have to be unnecessarily protective if they switch all + // HDFS sinks to use a different principal all at once. 
+ throw new SecurityException( + "Cannot use multiple Kerberos principals: " + finalPrincipal + + " would replace " + currentUser.getUserName()); + } + } + + // prepare for a new login + Preconditions.checkArgument(principal != null && !principal.isEmpty(), + "Invalid Kerberos principal: " + String.valueOf(principal)); + Preconditions.checkNotNull(finalPrincipal, + "Resolved principal must not be null"); + Preconditions.checkArgument(keytab != null && !keytab.isEmpty(), + "Invalid Kerberos keytab: " + String.valueOf(keytab)); + File keytabFile = new File(keytab); + Preconditions.checkArgument(keytabFile.isFile() && keytabFile.canRead(), + "Keytab is not a readable file: " + String.valueOf(keytab)); + + try { + // attempt static kerberos login + LOG.debug("Logging in as {} with {}", finalPrincipal, keytab); + UserGroupInformation.loginUserFromKeytab(principal, keytab); + return UserGroupInformation.getLoginUser(); + } catch (IOException e) { + throw new SecurityException("Kerberos login failed", e); + } + } + + /** + * Allow methods to act with the privileges of a login. + * + * If the login is null, the current privileges will be used. + * + * @param <T> The return type of the action + * @param login UserGroupInformation credentials to use for action + * @param action A PrivilegedExceptionAction to perform as another user + * @return the T value returned by action.run() + */ + public static <T> T runPrivileged(UserGroupInformation login, + PrivilegedExceptionAction<T> action) { + try { + if (login == null) { + return action.run(); + } else { + return login.doAs(action); + } + } catch (IOException ex) { + throw new DatasetIOException("Privileged action failed", ex); + } catch (InterruptedException ex) { + Thread.interrupted(); + throw new DatasetException(ex); + } catch (Exception ex) { + throw Throwables.propagate(ex); + } + } +}