Merge branch '1.6' into 1.7 Conflicts: proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java server/master/src/main/java/org/apache/accumulo/master/Master.java server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/022225b8 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/022225b8 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/022225b8 Branch: refs/heads/1.7 Commit: 022225b853f4dafc4361aefbff9897f21e88e235 Parents: 1853d08 44b17c6 Author: Josh Elser <els...@apache.org> Authored: Tue Dec 1 16:44:01 2015 -0500 Committer: Josh Elser <els...@apache.org> Committed: Tue Dec 1 16:44:01 2015 -0500 ---------------------------------------------------------------------- .../java/org/apache/accumulo/proxy/Proxy.java | 3 +- .../apache/accumulo/server/rpc/RpcWrapper.java | 79 +++++- .../accumulo/server/rpc/RpcWrapperTest.java | 266 +++++++++++++++++++ .../accumulo/gc/SimpleGarbageCollector.java | 2 +- .../java/org/apache/accumulo/master/Master.java | 5 +- .../apache/accumulo/tserver/TabletServer.java | 5 +- 6 files changed, 346 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/022225b8/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java ---------------------------------------------------------------------- diff --cc proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java index 1ce02e8,3368d20..5fa64e4 --- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java +++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java @@@ -23,26 -23,13 +23,27 @@@ import java.io.InputStream import java.util.Properties; import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import 
org.apache.accumulo.core.rpc.SslConnectionParams; import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.accumulo.proxy.thrift.AccumuloProxy; -import org.apache.accumulo.server.util.RpcWrapper; -import org.apache.log4j.Logger; +import org.apache.accumulo.server.metrics.MetricsFactory; +import org.apache.accumulo.server.rpc.RpcWrapper; +import org.apache.accumulo.server.rpc.SaslServerConnectionParams; +import org.apache.accumulo.server.rpc.ServerAddress; +import org.apache.accumulo.server.rpc.TServerUtils; +import org.apache.accumulo.server.rpc.ThriftServerType; +import org.apache.accumulo.server.rpc.TimedProcessor; +import org.apache.accumulo.server.rpc.UGIAssumingProcessor; +import org.apache.accumulo.start.spi.KeywordExecutable; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.security.UserGroupInformation; + import org.apache.thrift.TBaseProcessor; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocolFactory; @@@ -161,113 -118,35 +162,113 @@@ public class Proxy implements KeywordEx Class<? 
extends TProtocolFactory> protoFactoryClass = Class.forName(opts.prop.getProperty("protocolFactory", TCompactProtocol.Factory.class.getName())) .asSubclass(TProtocolFactory.class); + TProtocolFactory protoFactory = protoFactoryClass.newInstance(); int port = Integer.parseInt(opts.prop.getProperty("port")); - TServer server = createProxyServer(AccumuloProxy.class, ProxyServer.class, port, protoFactoryClass, opts.prop); - server.serve(); + String hostname = opts.prop.getProperty(THRIFT_SERVER_HOSTNAME, THRIFT_SERVER_HOSTNAME_DEFAULT); + HostAndPort address = HostAndPort.fromParts(hostname, port); + ServerAddress server = createProxyServer(address, protoFactory, opts.prop); + // Wait for the server to come up + while (!server.server.isServing()) { + Thread.sleep(100); + } + log.info("Proxy server started on " + server.getAddress()); + while (server.server.isServing()) { + Thread.sleep(1000); + } + } + + public static void main(String[] args) throws Exception { + new Proxy().execute(args); + } + + public static ServerAddress createProxyServer(HostAndPort address, TProtocolFactory protocolFactory, Properties properties) throws Exception { + return createProxyServer(address, protocolFactory, properties, ClientConfiguration.loadDefault()); } - public static TServer createProxyServer(Class<?> api, Class<?> implementor, final int port, Class<? 
extends TProtocolFactory> protoClass, - Properties properties) throws Exception { - final TNonblockingServerSocket socket = new TNonblockingServerSocket(port); + public static ServerAddress createProxyServer(HostAndPort address, TProtocolFactory protocolFactory, Properties properties, ClientConfiguration clientConf) + throws Exception { + final int numThreads = Integer.parseInt(properties.getProperty(THRIFT_THREAD_POOL_SIZE_KEY, THRIFT_THREAD_POOL_SIZE_DEFAULT)); + final long maxFrameSize = AccumuloConfiguration.getMemoryInBytes(properties.getProperty(THRIFT_MAX_FRAME_SIZE_KEY, THRIFT_MAX_FRAME_SIZE_DEFAULT)); + final int simpleTimerThreadpoolSize = Integer.parseInt(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE.getDefaultValue()); + // How frequently to try to resize the thread pool + final long threadpoolResizeInterval = 1000l * 5; + // No timeout + final long serverSocketTimeout = 0l; + // Use the new hadoop metrics2 support + final MetricsFactory metricsFactory = new MetricsFactory(false); + final String serverName = "Proxy", threadName = "Accumulo Thrift Proxy"; - // create the implementor - Object impl = implementor.getConstructor(Properties.class).newInstance(properties); + // create the implementation of the proxy interface + ProxyServer impl = new ProxyServer(properties); - Class<?> proxyProcClass = Class.forName(api.getName() + "$Processor"); - Class<?> proxyIfaceClass = Class.forName(api.getName() + "$Iface"); + // Wrap the implementation -- translate some exceptions - AccumuloProxy.Iface wrappedImpl = RpcWrapper.service(impl); ++ AccumuloProxy.Iface wrappedImpl = RpcWrapper.service(impl, new AccumuloProxy.Processor<AccumuloProxy.Iface>(impl).getProcessMapView()); + + // Create the processor from the implementation + TProcessor processor = new AccumuloProxy.Processor<AccumuloProxy.Iface>(wrappedImpl); + + // Get the type of thrift server to instantiate + final String serverTypeStr = properties.getProperty(THRIFT_SERVER_TYPE, THRIFT_SERVER_TYPE_DEFAULT); + 
ThriftServerType serverType = DEFAULT_SERVER_TYPE; + if (!THRIFT_SERVER_TYPE_DEFAULT.equals(serverTypeStr)) { + serverType = ThriftServerType.get(serverTypeStr); + } + + SslConnectionParams sslParams = null; + SaslServerConnectionParams saslParams = null; + switch (serverType) { + case SSL: + sslParams = SslConnectionParams.forClient(ClientContext.convertClientConfig(clientConf)); + break; + case SASL: + if (!clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j capability + log.error("FATAL: SASL thrift server was requested but it is disabled in client configuration"); + throw new RuntimeException("SASL is not enabled in configuration"); + } + + // Kerberos needs to be enabled to use it + if (!UserGroupInformation.isSecurityEnabled()) { + // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j capability + log.error("FATAL: Hadoop security is not enabled"); + throw new RuntimeException(); + } + + // Login via principal and keytab + final String kerberosPrincipal = properties.getProperty(KERBEROS_PRINCIPAL, ""), + kerberosKeytab = properties.getProperty(KERBEROS_KEYTAB, ""); + if (StringUtils.isBlank(kerberosPrincipal) || StringUtils.isBlank(kerberosKeytab)) { + // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j capability + log.error("FATAL: Kerberos principal and keytab must be provided"); + throw new RuntimeException(); + } + UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytab); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + log.info("Logged in as " + ugi.getUserName()); + + // The kerberosPrimary set in the SASL server needs to match the principal we're logged in as. 
+ final String shortName = ugi.getShortUserName(); + log.info("Setting server primary to {}", shortName); + clientConf.setProperty(ClientProperty.KERBEROS_SERVER_PRIMARY, shortName); + + KerberosToken token = new KerberosToken(); + saslParams = new SaslServerConnectionParams(clientConf, token, null); + + processor = new UGIAssumingProcessor(processor); + + break; + default: + // nothing to do -- no extra configuration necessary + break; + } - @SuppressWarnings("unchecked") - Constructor<? extends TProcessor> proxyProcConstructor = (Constructor<? extends TProcessor>) proxyProcClass.getConstructor(proxyIfaceClass); + // Hook up support for tracing for thrift calls + TimedProcessor timedProcessor = new TimedProcessor(metricsFactory, processor, serverName, threadName); - @SuppressWarnings({"rawtypes", "unchecked"}) - final TProcessor processor = proxyProcConstructor.newInstance(RpcWrapper.service(impl, ((TBaseProcessor) proxyProcConstructor.newInstance(impl)).getProcessMapView())); + // Create the thrift server with our processor and properties + ServerAddress serverAddr = TServerUtils.startTServer(address, serverType, timedProcessor, protocolFactory, serverName, threadName, numThreads, + simpleTimerThreadpoolSize, threadpoolResizeInterval, maxFrameSize, sslParams, saslParams, serverSocketTimeout); - THsHaServer.Args args = new THsHaServer.Args(socket); - args.processor(processor); - final long maxFrameSize = AccumuloConfiguration.getMemoryInBytes(properties.getProperty("maxFrameSize", "16M")); - if (maxFrameSize > Integer.MAX_VALUE) - throw new RuntimeException(maxFrameSize + " is larger than MAX_INT"); - args.transportFactory(new TFramedTransport.Factory((int) maxFrameSize)); - args.protocolFactory(protoClass.newInstance()); - args.maxReadBufferBytes = maxFrameSize; - return new THsHaServer(args); + return serverAddr; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/022225b8/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java 
---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java index 62d39d2,0000000..585eb27 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java @@@ -1,65 -1,0 +1,128 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.rpc; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; ++import java.util.HashSet; ++import java.util.Map; ++import java.util.Map.Entry; ++import java.util.Set; + +import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler; +import org.apache.accumulo.core.trace.wrappers.TraceWrap; ++import org.apache.thrift.ProcessFunction; +import org.apache.thrift.TApplicationException; ++import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class accommodates the changes in THRIFT-1805, which appeared in Thrift 0.9.1 and restricts client-side notification of server-side errors to + * {@link TException} only, by wrapping {@link RuntimeException} and {@link Error} as {@link TException}, so it doesn't just close the connection and look like + * a network issue, but informs the client that a {@link TApplicationException} had occurred, as it did in Thrift 0.9.0. This performs similar functions as + * {@link TraceWrap}, but with the additional action of translating exceptions. See also ACCUMULO-1691 and ACCUMULO-2950. + * ++ * ACCUMULO-4065 found that the above exception-wrapping is not appropriate for Thrift's implementation of oneway methods. A oneway method is defined as a ++ * method for which the client does not wait for a return. Normally, this is acceptable as these methods are void. Therefore, if another client reuses the ++ * connection to send a new RPC, there is no "extra" data sitting on the InputStream from the Socket (that the server sent). However, the implementation of a ++ * oneway method <em>does</em> send a response to the client when the implementation throws a {@link TException}. This message showing up on the client's ++ * InputStream causes future use of the Thrift Connection to become unusable. 
As long as the Thrift implementation sends a message back when oneway methods ++ * throw a {@link TException}, we must make sure that we don't re-wrap-and-throw any exceptions as {@link TException}s. ++ * + * @since 1.6.1 + */ +public class RpcWrapper { ++ private static final Logger log = LoggerFactory.getLogger(RpcWrapper.class); ++ ++ public static <T> T service(final T instance, @SuppressWarnings("rawtypes") final Map<String,ProcessFunction<T,? extends TBase>> processorView) { ++ final Set<String> onewayMethods = getOnewayMethods(processorView); ++ log.debug("Found oneway Thrift methods: " + onewayMethods); ++ ++ InvocationHandler handler = getInvocationHandler(instance, onewayMethods); ++ ++ @SuppressWarnings("unchecked") ++ T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler); ++ return proxiedInstance; ++ } + - public static <T> T service(final T instance) { - InvocationHandler handler = new RpcServerInvocationHandler<T>(instance) { ++ protected static <T> RpcServerInvocationHandler<T> getInvocationHandler(final T instance, final Set<String> onewayMethods) { ++ return new RpcServerInvocationHandler<T>(instance) { + private final Logger log = LoggerFactory.getLogger(instance.getClass()); + + @Override + public Object invoke(Object obj, Method method, Object[] args) throws Throwable { ++ // e.g. ThriftClientHandler.flush(TInfo, TCredentials, ...) 
+ try { + return super.invoke(obj, method, args); + } catch (RuntimeException e) { + String msg = e.getMessage(); - log.error("{}", msg, e); ++ log.error(msg, e); ++ if (onewayMethods.contains(method.getName())) { ++ throw e; ++ } + throw new TException(msg); + } catch (Error e) { + String msg = e.getMessage(); - log.error("{}", msg, e); ++ log.error(msg, e); ++ if (onewayMethods.contains(method.getName())) { ++ throw e; ++ } + throw new TException(msg); + } + } + }; - - @SuppressWarnings("unchecked") - T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler); - return proxiedInstance; + } + ++ protected static <T> Set<String> getOnewayMethods(@SuppressWarnings("rawtypes") Map<String,ProcessFunction<T,? extends TBase>> processorView) { ++ // Get a handle on the isOnewayMethod and make it accessible ++ final Method isOnewayMethod; ++ try { ++ isOnewayMethod = ProcessFunction.class.getDeclaredMethod("isOneway"); ++ } catch (NoSuchMethodException e) { ++ throw new RuntimeException("Could not access isOneway method", e); ++ } catch (SecurityException e) { ++ throw new RuntimeException("Could not access isOneway method", e); ++ } ++ // In java7, this appears to be copying the method, but it's trivial for us to return the object to how it was before. ++ final boolean accessible = isOnewayMethod.isAccessible(); ++ isOnewayMethod.setAccessible(true); ++ ++ try { ++ final Set<String> onewayMethods = new HashSet<String>(); ++ for (@SuppressWarnings("rawtypes") ++ Entry<String,ProcessFunction<T,? extends TBase>> entry : processorView.entrySet()) { ++ try { ++ if ((Boolean) isOnewayMethod.invoke(entry.getValue())) { ++ onewayMethods.add(entry.getKey()); ++ } ++ } catch (RuntimeException e) { ++ throw e; ++ } catch (Exception e) { ++ throw new RuntimeException(e); ++ } ++ } ++ ++ return onewayMethods; ++ } finally { ++ // Reset it back to how it was. 
++ isOnewayMethod.setAccessible(accessible); ++ } ++ } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/022225b8/server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java ---------------------------------------------------------------------- diff --cc server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java index 0000000,0000000..11e8031 new file mode 100644 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java @@@ -1,0 -1,0 +1,266 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to you under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.accumulo.server.rpc; ++ ++import java.util.Collections; ++import java.util.HashMap; ++import java.util.Map; ++import java.util.Set; ++ ++import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler; ++import org.apache.accumulo.server.rpc.RpcWrapper; ++import org.apache.thrift.ProcessFunction; ++import org.apache.thrift.TBase; ++import org.apache.thrift.TException; ++import org.apache.thrift.protocol.TProtocol; ++import org.junit.Assert; ++import org.junit.Test; ++ ++import com.google.common.collect.Sets; ++ ++/** ++ * Verification that RpcWrapper correctly mangles Exceptions to work around Thrift. 
++ */ ++public class RpcWrapperTest { ++ ++ private static final String RTE_MESSAGE = "RpcWrapperTest's RuntimeException Message"; ++ ++ /** ++ * Given a method name and whether or not the method is oneway, construct a ProcessFunction. ++ * ++ * @param methodName ++ * The service method name. ++ * @param isOneway ++ * Is the method oneway. ++ * @return A ProcessFunction. ++ */ ++ private fake_proc<FakeService> createProcessFunction(String methodName, boolean isOneway) { ++ return new fake_proc<FakeService>(methodName, isOneway); ++ } ++ ++ @Test ++ public void testSomeOnewayMethods() { ++ @SuppressWarnings("rawtypes") ++ Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new HashMap<String,ProcessFunction<FakeService,? extends TBase>>(); ++ procs.put("foo", createProcessFunction("foo", true)); ++ procs.put("foobar", createProcessFunction("foobar", false)); ++ procs.put("bar", createProcessFunction("bar", true)); ++ procs.put("barfoo", createProcessFunction("barfoo", false)); ++ ++ Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs); ++ Assert.assertEquals(Sets.newHashSet("foo", "bar"), onewayMethods); ++ } ++ ++ @Test ++ public void testNoOnewayMethods() { ++ @SuppressWarnings("rawtypes") ++ Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new HashMap<String,ProcessFunction<FakeService,? extends TBase>>(); ++ procs.put("foo", createProcessFunction("foo", false)); ++ procs.put("foobar", createProcessFunction("foobar", false)); ++ procs.put("bar", createProcessFunction("bar", false)); ++ procs.put("barfoo", createProcessFunction("barfoo", false)); ++ ++ Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs); ++ Assert.assertEquals(Collections.<String> emptySet(), onewayMethods); ++ } ++ ++ @Test ++ public void testAllOnewayMethods() { ++ @SuppressWarnings("rawtypes") ++ Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new HashMap<String,ProcessFunction<FakeService,? 
extends TBase>>(); ++ procs.put("foo", createProcessFunction("foo", true)); ++ procs.put("foobar", createProcessFunction("foobar", true)); ++ procs.put("bar", createProcessFunction("bar", true)); ++ procs.put("barfoo", createProcessFunction("barfoo", true)); ++ ++ Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs); ++ Assert.assertEquals(Sets.newHashSet("foo", "foobar", "bar", "barfoo"), onewayMethods); ++ } ++ ++ @Test ++ public void testNoExceptionWrappingForOneway() throws Throwable { ++ final Object[] args = new Object[0]; ++ ++ final FakeService impl = new FakeServiceImpl(); ++ ++ // "short" names throw RTEs and are oneway, while long names do not throw exceptions and are not oneway. ++ RpcServerInvocationHandler<FakeService> handler = RpcWrapper.getInvocationHandler(impl, Sets.newHashSet("foo", "bar")); ++ ++ // Should throw an exception, but not be wrapped because the method is oneway ++ try { ++ handler.invoke(impl, FakeServiceImpl.class.getMethod("foo"), args); ++ Assert.fail("Expected an exception"); ++ } catch (RuntimeException e) { ++ Assert.assertEquals(RTE_MESSAGE, e.getMessage()); ++ } ++ ++ // Should not throw an exception ++ handler.invoke(impl, FakeServiceImpl.class.getMethod("foobar"), args); ++ } ++ ++ @Test ++ public void testExceptionWrappingForNonOneway() throws Throwable { ++ final Object[] args = new Object[0]; ++ ++ final FakeService impl = new FakeServiceImpl(); ++ ++ // "short" names throw RTEs and are not oneway, while long names do not throw exceptions and are oneway. 
++ RpcServerInvocationHandler<FakeService> handler = RpcWrapper.getInvocationHandler(impl, Sets.newHashSet("foobar", "barfoo")); ++ ++ // Should throw an exception, and it should be wrapped as a TException because the method is not oneway ++ try { ++ handler.invoke(impl, FakeServiceImpl.class.getMethod("foo"), args); ++ Assert.fail("Expected an exception"); ++ } catch (TException e) { ++ // The InvocationHandler should take the exception from the RTE and make it a TException ++ Assert.assertEquals(RTE_MESSAGE, e.getMessage()); ++ } ++ ++ // Should not throw an exception ++ handler.invoke(impl, FakeServiceImpl.class.getMethod("foobar"), args); ++ } ++ ++ // ++ // Some hacked together classes/interfaces that mimic what Thrift is doing. ++ // ++ ++ /** ++ * Some fake fields for our fake arguments. ++ */ ++ private static class fake_fields implements org.apache.thrift.TFieldIdEnum { ++ @Override ++ public short getThriftFieldId() { ++ return 0; ++ } ++ ++ @Override ++ public String getFieldName() { ++ return null; ++ } ++ } ++ ++ /** ++ * A fake thrift service ++ */ ++ interface FakeService { ++ void foo(); ++ ++ String foobar(); ++ ++ int bar(); ++ ++ long barfoo(); ++ } ++ ++ /** ++ * An implementation of the fake thrift service. The "short" names throw RTEs, while long names do not. ++ */ ++ public static class FakeServiceImpl implements FakeService { ++ @Override ++ public void foo() { ++ throw new RuntimeException(RTE_MESSAGE); ++ } ++ ++ @Override ++ public String foobar() { ++ return ""; ++ } ++ ++ @Override ++ public int bar() { ++ throw new RuntimeException(RTE_MESSAGE); ++ } ++ ++ @Override ++ public long barfoo() { ++ return 0; ++ } ++ }; ++ ++ /** ++ * A fake ProcessFunction implementation for testing that allows injection of method name and oneway. 
++ */ ++ private static class fake_proc<I extends FakeService> extends org.apache.thrift.ProcessFunction<I,foo_args> { ++ final private boolean isOneway; ++ ++ public fake_proc(String methodName, boolean isOneway) { ++ super(methodName); ++ this.isOneway = isOneway; ++ } ++ ++ @Override ++ protected boolean isOneway() { ++ return isOneway; ++ } ++ ++ @SuppressWarnings("rawtypes") ++ @Override ++ public TBase getResult(I iface, foo_args args) throws TException { ++ return null; ++ } ++ ++ @Override ++ public foo_args getEmptyArgsInstance() { ++ return null; ++ } ++ } ++ ++ /** ++ * Fake arguments for our fake service. ++ */ ++ private static class foo_args implements org.apache.thrift.TBase<foo_args,fake_fields> { ++ ++ private static final long serialVersionUID = 1L; ++ ++ @Override ++ public int compareTo(foo_args o) { ++ return 0; ++ } ++ ++ @Override ++ public void read(TProtocol iprot) throws TException {} ++ ++ @Override ++ public void write(TProtocol oprot) throws TException {} ++ ++ @Override ++ public fake_fields fieldForId(int fieldId) { ++ return null; ++ } ++ ++ @Override ++ public boolean isSet(fake_fields field) { ++ return false; ++ } ++ ++ @Override ++ public Object getFieldValue(fake_fields field) { ++ return null; ++ } ++ ++ @Override ++ public void setFieldValue(fake_fields field, Object value) {} ++ ++ @Override ++ public TBase<foo_args,fake_fields> deepCopy() { ++ return null; ++ } ++ ++ @Override ++ public void clear() {} ++ } ++} http://git-wip-us.apache.org/repos/asf/accumulo/blob/022225b8/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 1daafcb,b4afda8..4d55461 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@@ -709,16 -702,9 +709,16 @@@ public 
class SimpleGarbageCollector ext } private HostAndPort startStatsService() throws UnknownHostException { - Iface rpcProxy = RpcWrapper.service(this); - Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(this, new Processor<Iface>(this).getProcessMapView())); - int port = config.getPort(Property.GC_PORT); - long maxMessageSize = config.getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE); ++ Iface rpcProxy = RpcWrapper.service(this, new Processor<Iface>(this).getProcessMapView()); + final Processor<Iface> processor; + if (ThriftServerType.SASL == getThriftServerType()) { + Iface tcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, getClass(), getConfiguration()); + processor = new Processor<Iface>(tcProxy); + } else { + processor = new Processor<Iface>(rpcProxy); + } + int port = getConfiguration().getPort(Property.GC_PORT); + long maxMessageSize = getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE); HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port); log.debug("Starting garbage collector listening on " + result); try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/022225b8/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java index 3a28eeb,af481c8..02e1132 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@@ -1147,38 -1032,10 +1147,38 @@@ public class Master extends AccumuloSer throw new IOException(e); } - Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(new MasterClientServiceHandler(this), - new Processor<Iface>(new MasterClientServiceHandler(this)).getProcessMapView())); - ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master", - "Master Client Service 
Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); + ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zroot); + + // Make sure that we have a secret key (either a new one or an old one from ZK) before we start + // the master client service. + if (null != authenticationTokenKeyManager && null != keyDistributor) { + log.info("Starting delegation-token key manager"); + keyDistributor.initialize(); + authenticationTokenKeyManager.start(); + boolean logged = false; + while (!authenticationTokenKeyManager.isInitialized()) { + // Print out a status message when we start waiting for the key manager to get initialized + if (!logged) { + log.info("Waiting for AuthenticationTokenKeyManager to be initialized"); + logged = true; + } + UtilWaitThread.sleep(200); + } + // And log when we are initialized + log.info("AuthenticationTokenSecretManager is initialized"); + } + + clientHandler = new MasterClientServiceHandler(this); - Iface rpcProxy = RpcWrapper.service(clientHandler); ++ Iface rpcProxy = RpcWrapper.service(clientHandler, new Processor<Iface>(clientHandler).getProcessMapView()); + final Processor<Iface> processor; + if (ThriftServerType.SASL == getThriftServerType()) { + Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clientHandler.getClass(), getConfiguration()); + processor = new Processor<Iface>(tcredsProxy); + } else { + processor = new Processor<Iface>(rpcProxy); + } + ServerAddress sa = TServerUtils.startServer(this, hostname, Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, + Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); clientService = sa.server; String address = sa.address.toString(); log.info("Setting master lock data to " + address); @@@ -1187,42 -1044,6 +1187,43 @@@ while (!clientService.isServing()) { UtilWaitThread.sleep(100); } + + // Start the daemon to scan the replication 
table and make units of work + replicationWorkDriver = new ReplicationDriver(this); + replicationWorkDriver.start(); + + // Start the daemon to assign work to tservers to replicate to our peers + try { + replicationWorkAssigner = new WorkDriver(this); + } catch (AccumuloException | AccumuloSecurityException e) { + log.error("Caught exception trying to initialize replication WorkDriver", e); + throw new RuntimeException(e); + } + replicationWorkAssigner.start(); + + // Start the replication coordinator which assigns tservers to service replication requests ++ MasterReplicationCoordinator impl = new MasterReplicationCoordinator(this); + ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> replicationCoordinatorProcessor = new ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>( - RpcWrapper.service(new MasterReplicationCoordinator(this))); ++ RpcWrapper.service(impl, new ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(impl).getProcessMapView())); + ServerAddress replAddress = TServerUtils.startServer(this, hostname, Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor, + "Master Replication Coordinator", "Replication Coordinator", null, Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS, + Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); + + log.info("Started replication coordinator service at " + replAddress.address); + + // Advertise that port we used so peers don't have to be told what it is + ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, + replAddress.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); + + // Register replication metrics + MasterMetricsFactory factory = new MasterMetricsFactory(getConfiguration(), this); + Metrics replicationMetrics = factory.createReplicationMetrics(); + try { + replicationMetrics.register(); + } catch (Exception e) { + 
log.error("Failed to register replication metrics", e); + } + while (clientService.isServing()) { UtilWaitThread.sleep(500); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/022225b8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 034cb16,651df66..3022a76 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -2331,46 -3159,15 +2331,47 @@@ public class TabletServer extends Accum private HostAndPort startTabletClientService() throws UnknownHostException { // start listening for client connection last - Iface rpcProxy = RpcWrapper.service(new ThriftClientHandler()); + ThriftClientHandler handler = new ThriftClientHandler(); - Iface tch = RpcWrapper.service(handler, new Processor<Iface>(handler).getProcessMapView()); - Processor<Iface> processor = new Processor<Iface>(tch); - HostAndPort address = startServer(getSystemConfiguration(), clientAddress.getHostText(), Property.TSERV_CLIENTPORT, processor, "Thrift Client Server"); ++ Iface rpcProxy = RpcWrapper.service(handler, new Processor<Iface>(handler).getProcessMapView()); + final Processor<Iface> processor; + if (ThriftServerType.SASL == getThriftServerType()) { + Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, ThriftClientHandler.class, getConfiguration()); + processor = new Processor<Iface>(tcredProxy); + } else { + processor = new Processor<Iface>(rpcProxy); + } + HostAndPort address = startServer(getServerConfigurationFactory().getConfiguration(), clientAddress.getHostText(), Property.TSERV_CLIENTPORT, processor, + "Thrift Client Server"); log.info("address = " + address); return address; } - ZooLock getLock() { + private HostAndPort startReplicationService() throws UnknownHostException { + final 
ReplicationServicerHandler handler = new ReplicationServicerHandler(this); - ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler); ++ ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler, new ReplicationServicer.Processor<ReplicationServicer.Iface>(handler).getProcessMapView()); + ReplicationServicer.Iface repl = TCredentialsUpdatingWrapper.service(rpcProxy, handler.getClass(), getConfiguration()); + ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl); + AccumuloConfiguration conf = getServerConfigurationFactory().getConfiguration(); + Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); + ServerAddress sp = TServerUtils.startServer(this, clientAddress.getHostText(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, + "ReplicationServicerHandler", "Replication Servicer", null, Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty); + this.replServer = sp.server; + log.info("Started replication service on " + sp.address); + + try { + // The replication service is unique to the thrift service for a tserver, not just a host. + // Advertise the host and port for replication service given the host and port for the tserver. + ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) + ReplicationConstants.ZOO_TSERVERS + "/" + clientAddress.toString(), + sp.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); + } catch (Exception e) { + log.error("Could not advertise replication service port", e); + throw new RuntimeException(e); + } + + return sp.address; + } + + public ZooLock getLock() { return tabletServerLock; }