Repository: flume Updated Branches: refs/heads/trunk 3d0305361 -> 542b16950
http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java index 33f73a9..9a48841 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java @@ -18,7 +18,6 @@ package org.apache.flume.sink.hdfs; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Calendar; @@ -31,7 +30,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import com.google.common.annotations.VisibleForTesting; import org.apache.flume.Channel; @@ -41,6 +39,9 @@ import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.SystemClock; import org.apache.flume.Transaction; +import org.apache.flume.auth.FlumeAuthenticationUtil; +import org.apache.flume.auth.FlumeAuthenticator; +import org.apache.flume.auth.PrivilegedExecutor; import org.apache.flume.conf.Configurable; import org.apache.flume.formatter.output.BucketPath; import org.apache.flume.instrumentation.SinkCounter; @@ -50,9 +51,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,12 +98,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable { private static final int defaultThreadPoolSize = 10; private static final int defaultRollTimerPoolSize = 1; - /** - * Singleton credential manager that manages static credentials for the - * entire JVM - */ - private static final AtomicReference<KerberosUser> staticLogin - = new AtomicReference<KerberosUser>(); private final HDFSWriterFactory writerFactory; private WriterLinkedHashMap sfWriters; @@ -129,11 +121,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable { private ExecutorService callTimeoutPool; private ScheduledExecutorService timedRollerPool; - private String kerbConfPrincipal; - private String kerbKeytab; - private String proxyUserName; - private UserGroupInformation proxyTicket; - private boolean needRounding = false; private int roundUnit = Calendar.SECOND; private int roundValue = 1; @@ -150,6 +137,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable { private final Object sfWritersLock = new Object(); private long retryInterval; private int tryCount; + private PrivilegedExecutor privExecutor; /* @@ -225,9 +213,9 @@ public class HDFSEventSink extends AbstractSink implements Configurable { defaultThreadPoolSize); rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize", defaultRollTimerPoolSize); - kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal", ""); - kerbKeytab = context.getString("hdfs.kerberosKeytab", ""); - proxyUserName = context.getString("hdfs.proxyUser", ""); + String kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal"); + String kerbKeytab = context.getString("hdfs.kerberosKeytab"); + String proxyUser = context.getString("hdfs.proxyUser"); tryCount = context.getInteger("hdfs.closeTries", defaultTryCount); if(tryCount <= 0) { LOG.warn("Retry count value : " + tryCount + " is not " + @@ -269,9 +257,13 @@ public class HDFSEventSink extends AbstractSink implements Configurable { + " when fileType is: " + fileType); } - if (!authenticate()) { - LOG.error("Failed to authenticate!"); - } + // get the appropriate executor + this.privExecutor = FlumeAuthenticationUtil.getAuthenticator( + kerbConfPrincipal, kerbKeytab).proxyAs(proxyUser); + + + + needRounding = context.getBoolean("hdfs.round", false); if(needRounding) { @@ -482,7 +474,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable { rollSize, rollCount, batchSize, context, realPath, realName, inUsePrefix, inUseSuffix, suffix, codeC, compType, hdfsWriter, timedRollerPool, - proxyTicket, sinkCounter, idleTimeout, closeCallback, + privExecutor, sinkCounter, idleTimeout, closeCallback, lookupPath, callTimeout, callTimeoutPool, retryInterval, tryCount); if(mockFs != null) { @@ -551,197 +543,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable { super.start(); } - private boolean authenticate() { - - // logic for kerberos login - boolean useSecurity = UserGroupInformation.isSecurityEnabled(); - - LOG.info("Hadoop Security enabled: " + useSecurity); - - if (useSecurity) { - - // sanity checking - if (kerbConfPrincipal.isEmpty()) { - LOG.error("Hadoop running in secure mode, but Flume config doesn't " - + "specify a principal to use for Kerberos auth."); - return false; - } - if (kerbKeytab.isEmpty()) { - LOG.error("Hadoop running in secure mode, but Flume config doesn't " - + "specify a keytab to use for Kerberos auth."); - return false; - } else { - //If keytab is specified, user should want it take effect. - //HDFSEventSink will halt when keytab file is non-exist or unreadable - File kfile = new File(kerbKeytab); - if (!(kfile.isFile() && kfile.canRead())) { - throw new IllegalArgumentException("The keyTab file: " - + kerbKeytab + " is nonexistent or can't read. " - + "Please specify a readable keytab file for Kerberos auth."); - } - } - - String principal; - try { - // resolves _HOST pattern using standard Hadoop search/replace - // via DNS lookup when 2nd argument is empty - principal = SecurityUtil.getServerPrincipal(kerbConfPrincipal, ""); - } catch (IOException e) { - LOG.error("Host lookup error resolving kerberos principal (" - + kerbConfPrincipal + "). Exception follows.", e); - return false; - } - - Preconditions.checkNotNull(principal, "Principal must not be null"); - KerberosUser prevUser = staticLogin.get(); - KerberosUser newUser = new KerberosUser(principal, kerbKeytab); - - // be cruel and unusual when user tries to login as multiple principals - // this isn't really valid with a reconfigure but this should be rare - // enough to warrant a restart of the agent JVM - // TODO: find a way to interrogate the entire current config state, - // since we don't have to be unnecessarily protective if they switch all - // HDFS sinks to use a different principal all at once. - Preconditions.checkState(prevUser == null || prevUser.equals(newUser), - "Cannot use multiple kerberos principals in the same agent. " + - " Must restart agent to use new principal or keytab. " + - "Previous = %s, New = %s", prevUser, newUser); - - // attempt to use cached credential if the user is the same - // this is polite and should avoid flooding the KDC with auth requests - UserGroupInformation curUser = null; - if (prevUser != null && prevUser.equals(newUser)) { - try { - curUser = UserGroupInformation.getLoginUser(); - } catch (IOException e) { - LOG.warn("User unexpectedly had no active login. Continuing with " + - "authentication", e); - } - } - - if (curUser == null || !curUser.getUserName().equals(principal)) { - try { - // static login - kerberosLogin(this, principal, kerbKeytab); - } catch (IOException e) { - LOG.error("Authentication or file read error while attempting to " - + "login as kerberos principal (" + principal + ") using " - + "keytab (" + kerbKeytab + "). Exception follows.", e); - return false; - } - } else { - LOG.debug("{}: Using existing principal login: {}", this, curUser); - } - - // we supposedly got through this unscathed... so store the static user - staticLogin.set(newUser); - } - - // hadoop impersonation works with or without kerberos security - proxyTicket = null; - if (!proxyUserName.isEmpty()) { - try { - proxyTicket = UserGroupInformation.createProxyUser( - proxyUserName, UserGroupInformation.getLoginUser()); - } catch (IOException e) { - LOG.error("Unable to login as proxy user. Exception follows.", e); - return false; - } - } - - UserGroupInformation ugi = null; - if (proxyTicket != null) { - ugi = proxyTicket; - } else if (useSecurity) { - try { - ugi = UserGroupInformation.getLoginUser(); - } catch (IOException e) { - LOG.error("Unexpected error: Unable to get authenticated user after " + - "apparent successful login! Exception follows.", e); - return false; - } - } - - if (ugi != null) { - // dump login information - AuthenticationMethod authMethod = ugi.getAuthenticationMethod(); - LOG.info("Auth method: {}", authMethod); - LOG.info(" User name: {}", ugi.getUserName()); - LOG.info(" Using keytab: {}", ugi.isFromKeytab()); - if (authMethod == AuthenticationMethod.PROXY) { - UserGroupInformation superUser; - try { - superUser = UserGroupInformation.getLoginUser(); - LOG.info(" Superuser auth: {}", superUser.getAuthenticationMethod()); - LOG.info(" Superuser name: {}", superUser.getUserName()); - LOG.info(" Superuser using keytab: {}", superUser.isFromKeytab()); - } catch (IOException e) { - LOG.error("Unexpected error: unknown superuser impersonating proxy.", - e); - return false; - } - } - - LOG.info("Logged in as user {}", ugi.getUserName()); - - return true; - } - - return true; - } - - /** - * Static synchronized method for static Kerberos login. <br/> - * Static synchronized due to a thundering herd problem when multiple Sinks - * attempt to log in using the same principal at the same time with the - * intention of impersonating different users (or even the same user). - * If this is not controlled, MIT Kerberos v5 believes it is seeing a replay - * attach and it returns: - * <blockquote>Request is a replay (34) - PROCESS_TGS</blockquote> - * In addition, since the underlying Hadoop APIs we are using for - * impersonation are static, we define this method as static as well. - * - * @param principal - * Fully-qualified principal to use for authentication. - * @param keytab - * Location of keytab file containing credentials for principal. - * @return Logged-in user - * @throws IOException - * if login fails. - */ - private static synchronized UserGroupInformation kerberosLogin( - HDFSEventSink sink, String principal, String keytab) throws IOException { - - // if we are the 2nd user thru the lock, the login should already be - // available statically if login was successful - UserGroupInformation curUser = null; - try { - curUser = UserGroupInformation.getLoginUser(); - } catch (IOException e) { - // not a big deal but this shouldn't typically happen because it will - // generally fall back to the UNIX user - LOG.debug("Unable to get login user before Kerberos auth attempt.", e); - } - - // we already have logged in successfully - if (curUser != null && curUser.getUserName().equals(principal)) { - LOG.debug("{}: Using existing principal ({}): {}", - new Object[]{sink, principal, curUser}); - - // no principal found - } else { - - LOG.info("{}: Attempting kerberos login as principal ({}) from keytab " + - "file ({})", new Object[]{sink, principal, keytab}); - - // attempt static kerberos login - UserGroupInformation.loginUserFromKeytab(principal, keytab); - curUser = UserGroupInformation.getLoginUser(); - } - - return curUser; - } - @Override public String toString() { return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() + http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java index 7c74b16..2581f73 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java @@ -29,6 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.flume.Clock; import org.apache.flume.Context; import org.apache.flume.Event; +import org.apache.flume.auth.FlumeAuthenticationUtil; +import org.apache.flume.auth.PrivilegedExecutor; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.SinkCounter; import org.apache.hadoop.conf.Configuration; @@ -53,10 +55,12 @@ public class TestBucketWriter { private Context ctx = new Context(); private static ScheduledExecutorService timedRollerPool; + private static PrivilegedExecutor proxy; @BeforeClass public static void setup() { timedRollerPool = Executors.newSingleThreadScheduledExecutor(); + proxy = FlumeAuthenticationUtil.getAuthenticator(null, null).proxyAs(null); } @AfterClass @@ -72,7 +76,7 @@ public class TestBucketWriter { MockHDFSWriter hdfsWriter = new MockHDFSWriter(); BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx, "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE, - hdfsWriter, timedRollerPool, null, + hdfsWriter, timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); @@ -97,7 +101,7 @@ public class TestBucketWriter { BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,timedRollerPool, - null, new SinkCounter("test-bucket-writer-" + + proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); @@ -124,7 +128,7 @@ public class TestBucketWriter { MockHDFSWriter hdfsWriter = new MockHDFSWriter(); BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE, - hdfsWriter, timedRollerPool, null, + hdfsWriter, timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, new HDFSEventSink.WriterCallback() { @Override @@ -147,7 +151,7 @@ public class TestBucketWriter { bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE, - hdfsWriter, timedRollerPool, null, + hdfsWriter, timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); @@ -230,7 +234,7 @@ public class TestBucketWriter { BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, path, name, "", ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter, - timedRollerPool, null, new SinkCounter("test-bucket-writer-" + timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); @@ -255,7 +259,7 @@ public class TestBucketWriter { BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter, - timedRollerPool, null, new SinkCounter("test-bucket-writer-" + timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); @@ -283,7 +287,7 @@ public class TestBucketWriter { BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter, - timedRollerPool, null, new SinkCounter( + timedRollerPool, proxy, new SinkCounter( "test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); @@ -316,7 +320,7 @@ public class TestBucketWriter { 0, ctx, "/tmp", "file", "", ".tmp", suffix, HDFSEventSink.getCodec("gzip"), SequenceFile.CompressionType.BLOCK, hdfsWriter, - timedRollerPool, null, new SinkCounter("test-bucket-writer-" + timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0 ); @@ -348,7 +352,7 @@ public class TestBucketWriter { BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter, - timedRollerPool, null, new SinkCounter( + timedRollerPool, proxy, new SinkCounter( "test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); @@ -368,7 +372,7 @@ public class TestBucketWriter { BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE, hdfsWriter, - timedRollerPool, null, new SinkCounter( + timedRollerPool, proxy, new SinkCounter( "test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); @@ -388,7 +392,7 @@ public class TestBucketWriter { BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE, - hdfsWriter, timedRollerPool, null, + hdfsWriter, timedRollerPool, proxy, new SinkCounter( "test-bucket-writer-" + System.currentTimeMillis()), 0, new HDFSEventSink.WriterCallback() { @@ -442,7 +446,7 @@ public class TestBucketWriter { BucketWriter bucketWriter = new BucketWriter(0, 0, 1, 1, ctx, hdfsPath, hdfsPath, "singleBucket", ".tmp", null, null, null, new MockDataStream(mockFs), - timedRollerPool, null, + timedRollerPool, proxy, new SinkCounter( "test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 1, http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java index 1b7a364..23862eb 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java @@ -276,7 +276,7 @@ public class TestHDFSEventSink { Assert.fail("no exception thrown"); } catch (IllegalArgumentException expected) { Assert.assertTrue(expected.getMessage().contains( - "is nonexistent or can't read.")); + "Keytab is not a readable file")); } finally { //turn security off conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java index 5de0bd5..e659ada 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java @@ -36,6 +36,8 @@ import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.Transaction; import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.auth.FlumeAuthenticationUtil; +import org.apache.flume.auth.PrivilegedExecutor; import org.apache.flume.conf.Configurable; import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.AbstractSink; @@ -54,7 +56,6 @@ import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import java.security.PrivilegedExceptionAction; -import org.apache.hadoop.hbase.security.User; /** @@ -103,11 +104,11 @@ public class HBaseSink extends AbstractSink implements Configurable { private Context serializerContext; private String kerberosPrincipal; private String kerberosKeytab; - private User hbaseUser; private boolean enableWal = true; private boolean batchIncrements = false; private Method refGetFamilyMap = null; private SinkCounter sinkCounter; + private PrivilegedExecutor privilegedExecutor; // Internal hooks used for unit testing. private DebugIncrementsCallback debugIncrCallback = null; @@ -132,17 +133,14 @@ public class HBaseSink extends AbstractSink implements Configurable { Preconditions.checkArgument(table == null, "Please call stop " + "before calling start on an old instance."); try { - if (HBaseSinkSecurityManager.isSecurityEnabled(config)) { - hbaseUser = HBaseSinkSecurityManager.login(config, null, - kerberosPrincipal, kerberosKeytab); - } + privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(kerberosPrincipal, kerberosKeytab); } catch (Exception ex) { sinkCounter.incrementConnectionFailedCount(); throw new FlumeException("Failed to login to HBase using " + "provided credentials.", ex); } try { - table = runPrivileged(new PrivilegedExceptionAction<HTable>() { + table = privilegedExecutor.execute(new PrivilegedExceptionAction<HTable>() { @Override public HTable run() throws Exception { HTable table = new HTable(config, tableName); @@ -160,7 +158,7 @@ public class HBaseSink extends AbstractSink implements Configurable { " from HBase", e); } try { - if (!runPrivileged(new PrivilegedExceptionAction<Boolean>() { + if (!privilegedExecutor.execute(new PrivilegedExceptionAction<Boolean>() { @Override public Boolean run() throws IOException { return table.getTableDescriptor().hasFamily(columnFamily); @@ -233,8 +231,8 @@ public class HBaseSink extends AbstractSink implements Configurable { logger.error("Could not instantiate event serializer." , e); Throwables.propagate(e); } - kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB, ""); - kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL, ""); + kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB); + kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL); enableWal = context.getBoolean(HBaseSinkConfigurationConstants .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL); @@ -371,7 +369,7 @@ public class HBaseSink extends AbstractSink implements Configurable { private void putEventsAndCommit(final List<Row> actions, final List<Increment> incs, Transaction txn) throws Exception { - runPrivileged(new PrivilegedExceptionAction<Void>() { + privilegedExecutor.execute(new PrivilegedExceptionAction<Void>() { @Override public Void run() throws Exception { for (Row r : actions) { @@ -388,7 +386,7 @@ public class HBaseSink extends AbstractSink implements Configurable { } }); - runPrivileged(new PrivilegedExceptionAction<Void>() { + privilegedExecutor.execute(new PrivilegedExceptionAction<Void>() { @Override public Void run() throws Exception { @@ -416,18 +414,6 @@ public class HBaseSink extends AbstractSink implements Configurable { sinkCounter.addToEventDrainSuccessCount(actions.size()); } - private <T> T runPrivileged(final PrivilegedExceptionAction<T> action) - throws Exception { - if(hbaseUser != null) { - if (logger.isDebugEnabled()) { - logger.debug("Calling runAs as hbase user: " + hbaseUser.getName()); - } - return hbaseUser.runAs(action); - } else { - return action.run(); - } - } - /** * The method getFamilyMap() is no longer available in Hbase 0.96. * We must use reflection to determine which version we may use. http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java deleted file mode 100644 index 762fce9..0000000 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flume.sink.hbase; - -import com.google.common.base.Preconditions; -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import org.apache.flume.FlumeException; -import org.apache.flume.sink.hdfs.KerberosUser; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Class to handle logging into HBase with the credentials passed in. - */ -public class HBaseSinkSecurityManager { - - /* - * volatile for safe publication. Since this is updated only by - * a single thread (configuration) and read later by the sink threads, - * this can just be volatile, no need of Atomic reference. - */ - private volatile static KerberosUser loggedInUser; - private static final String FLUME_KEYTAB_KEY = "flume.keytab.key"; - private static final String FLUME_PRINCIPAL_KEY = "flume.principal.key"; - private static final Logger LOG = - LoggerFactory.getLogger(HBaseSinkSecurityManager.class); - - /** - * Checks if security is enabled for the HBase cluster. - * - * @return - true if security is enabled on the HBase cluster and - * the underlying HDFS cluster. - */ - public static boolean isSecurityEnabled(Configuration conf) { - return User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf); - } - - /** - * Login the user using the configuration, and the hostname specified to use - * for logging in. - * - * @param conf - Configuration to use for logging the user in. - * @param hostname - The hostname to use for logging the user in. If no - * hostname is specified (null or empty string), the canonical hostname for - * the address returned by {@linkplain InetAddress#getLocalHost()} will be - * used. - * @return The logged in HBase {@linkplain User}. - * @throws IOException if login failed, or hostname lookup failed. - */ - public static synchronized User login(Configuration conf, String hostname, - String kerberosPrincipal, String kerberosKeytab) throws IOException { - if (kerberosPrincipal.isEmpty()) { - String msg = "Login failed, since kerberos principal was not specified."; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - if (kerberosKeytab.isEmpty()) { - String msg = "Login failed, since kerberos keytab was not specified."; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } else { - //If keytab is specified, user should want it take effect. - //HDFSEventSink will halt when keytab file is non-exist or unreadable - File kfile = new File(kerberosKeytab); - if (!(kfile.isFile() && kfile.canRead())) { - throw new IllegalArgumentException("The keyTab file: " - + kerberosKeytab + " is nonexistent or can't read. " - + "Please specify a readable keytab file for Kerberos auth."); - } - } - String principal = kerberosPrincipal; - try { - // resolves _HOST pattern using standard Hadoop search/replace - // via DNS lookup when 2nd argument is empty - principal = SecurityUtil.getServerPrincipal(kerberosPrincipal,""); - } catch (IOException e) { - LOG.error("Host lookup error resolving kerberos principal (" - + kerberosPrincipal + "). Exception follows.", e); - throw e; - } - Preconditions.checkNotNull(principal, "Principal must not be null"); - KerberosUser newUser = new KerberosUser(principal, kerberosKeytab); - //The HDFS Sink does not allow login credentials to change. - //To be uniform, we will do the same thing here. - User hbaseUser = null; - boolean loggedIn = false; - if (loggedInUser != null) { - Preconditions.checkArgument(newUser.equals(loggedInUser), - "Cannot switch kerberos credentials during a reconfiguration. " - + "Please restart the agent to set the new credentials."); - try { - hbaseUser = User.create(UserGroupInformation.getLoginUser()); - loggedIn = true; - } catch (IOException ex) { - LOG.warn("Previous login does not exist, " - + "will authenticate against KDC"); - } - } - if (!loggedIn) { - if (hostname == null || hostname.isEmpty()) { - hostname = InetAddress.getLocalHost().getCanonicalHostName(); - } - conf.set(FLUME_KEYTAB_KEY, kerberosKeytab); - conf.set(FLUME_PRINCIPAL_KEY, principal); - User.login(conf, FLUME_KEYTAB_KEY, FLUME_PRINCIPAL_KEY, hostname); - hbaseUser = User.create(UserGroupInformation.getLoginUser()); - loggedInUser = newUser; - //TODO: Set the loggedInUser to the current user. - LOG.info("Logged into HBase as user: " + hbaseUser.getName()); - } - return hbaseUser; - } -} http://git-wip-us.apache.org/repos/asf/flume/blob/542b1695/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3e40558..aad8be6 100644 --- a/pom.xml +++ b/pom.xml @@ -70,6 +70,7 @@ limitations under the License. <module>flume-ng-sdk</module> <module>flume-ng-tests</module> <module>flume-tools</module> + <module>flume-ng-auth</module> </modules> <profiles> @@ -1226,6 +1227,12 @@ limitations under the License. </dependency> <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-auth</artifactId> + <version>1.6.0-SNAPSHOT</version> + </dependency> + + <dependency> <groupId>org.apache.flume.flume-ng-clients</groupId> <artifactId>flume-ng-log4jappender</artifactId> <version>1.6.0-SNAPSHOT</version>
