http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java deleted file mode 100644 index 515bf3c..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; - -public class LlapNodeId { - - private static final LoadingCache<LlapNodeId, LlapNodeId> CACHE = - CacheBuilder.newBuilder().softValues().build( - new CacheLoader<LlapNodeId, LlapNodeId>() { - @Override - public LlapNodeId load(LlapNodeId key) throws Exception { - return key; - } - }); - - public static LlapNodeId getInstance(String hostname, int port) { - return CACHE.getUnchecked(new LlapNodeId(hostname, port)); - } - - - private final String hostname; - private final int port; - - - private LlapNodeId(String hostname, int port) { - this.hostname = hostname; - this.port = port; - } - - public String getHostname() { - return hostname; - } - - public int getPort() { - return port; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - LlapNodeId that = (LlapNodeId) o; - - if (port != that.port) { - return false; - } - if (!hostname.equals(that.hostname)) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - int result = hostname.hashCode(); - result = 1009 * result + port; - return result; - } - - @Override - public String toString() { - return hostname + ":" + port; - } -}
http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java index 544af09..0399798 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Collection; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -217,7 +219,13 @@ public class LlapServiceDriver { CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(), libDir.toString(), true); lfs.delete(new Path(libDir, "tez.tar.gz"), false); + // llap-common + lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(LlapDaemonProtocolProtos.class)), libDir); + // llap-tez + lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(LlapTezUtils.class)), libDir); + // llap-server lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(LlapInputFormat.class)), libDir); + // hive-exec lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(HiveInputFormat.class)), libDir); // copy default aux classes (json/hbase) http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java deleted file mode 100644 index 4c09941..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.daemon; - -import org.apache.hadoop.ipc.ProtocolInfo; -import org.apache.hadoop.security.KerberosInfo; -import org.apache.hadoop.security.token.TokenInfo; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; -import org.apache.hadoop.hive.llap.security.LlapTokenSelector; - -@ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB", protocolVersion = 1) -@KerberosInfo(serverPrincipal = HiveConf.HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME) -@TokenInfo(LlapTokenSelector.class) -public interface LlapDaemonProtocolBlockingPB extends LlapDaemonProtocolProtos.LlapDaemonProtocol.BlockingInterface { -} http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java deleted file mode 100644 index 4efadac..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.llap.daemon; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; -import org.apache.hadoop.ipc.ProtocolInfo; -import org.apache.hadoop.security.KerberosInfo; - -@ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB", protocolVersion = 1) -@KerberosInfo(serverPrincipal = HiveConf.HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME) -public interface LlapManagementProtocolBlockingPB extends LlapDaemonProtocolProtos.LlapManagementProtocol.BlockingInterface { -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 7d7fa00..94b3b41 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -19,7 +19,6 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryPoolMXBean; import java.lang.management.MemoryType; import java.net.InetSocketAddress; -import java.net.URLClassLoader; import java.util.Arrays; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -48,7 +47,6 @@ import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ExitUtil; @@ -67,7 +65,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla private static final Logger LOG = LoggerFactory.getLogger(LlapDaemon.class); private final Configuration shuffleHandlerConf; - private final LlapDaemonProtocolServerImpl server; + private final LlapProtocolServerImpl server; private final ContainerRunnerImpl containerRunner; private final AMReporter amReporter; private final LlapRegistryService registry; @@ -166,7 +164,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf); - this.server = new LlapDaemonProtocolServerImpl( + this.server = new LlapProtocolServerImpl( numHandlers, this, srvAddress, mngAddress, srvPort, mngPort); ClassLoader executorClassLoader = null; http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java deleted file mode 100644 index 9c7d2e2..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.daemon.impl; - -import javax.annotation.Nullable; -import javax.net.SocketFactory; -import java.io.IOException; -import java.net.InetSocketAddress; - -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.ProtocolProxy; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; -import org.apache.hadoop.security.UserGroupInformation; - -// TODO Change all this to be based on a regular interface instead of relying on the Proto service - Exception signatures cannot be controlled without this for the moment. - - -public class LlapDaemonProtocolClientImpl implements LlapDaemonProtocolBlockingPB { - - private final Configuration conf; - private final InetSocketAddress serverAddr; - private final RetryPolicy retryPolicy; - private final SocketFactory socketFactory; - LlapDaemonProtocolBlockingPB proxy; - - - public LlapDaemonProtocolClientImpl(Configuration conf, String hostname, int port, - @Nullable RetryPolicy retryPolicy, - @Nullable SocketFactory socketFactory) { - this.conf = conf; - this.serverAddr = NetUtils.createSocketAddr(hostname, port); - this.retryPolicy = retryPolicy; - if (socketFactory == null) { - this.socketFactory = NetUtils.getDefaultSocketFactory(conf); - } else { - this.socketFactory = socketFactory; - } - } - - @Override - public SubmitWorkResponseProto submitWork(RpcController controller, - SubmitWorkRequestProto request) throws - ServiceException { - try { - return getProxy().submitWork(null, request); - } catch (IOException e) { - throw new ServiceException(e); - } - } - - @Override - public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller, - SourceStateUpdatedRequestProto request) throws - ServiceException { - try { - return getProxy().sourceStateUpdated(null, request); - } catch (IOException e) { - throw new ServiceException(e); - } - } - - @Override - public QueryCompleteResponseProto queryComplete(RpcController controller, - QueryCompleteRequestProto request) throws - ServiceException { - try { - return getProxy().queryComplete(null, request); - } catch (IOException e) { - throw new ServiceException(e); - } - } - - @Override - public TerminateFragmentResponseProto terminateFragment( - RpcController controller, - TerminateFragmentRequestProto request) throws ServiceException { - try { - return getProxy().terminateFragment(null, request); - } catch (IOException e) { - throw new ServiceException(e); - } - } - - public LlapDaemonProtocolBlockingPB getProxy() throws IOException { - if (proxy == null) { - proxy = createProxy(); - } - return proxy; - } - - public LlapDaemonProtocolBlockingPB createProxy() throws IOException { - RPC.setProtocolEngine(conf, LlapDaemonProtocolBlockingPB.class, ProtobufRpcEngine.class); - ProtocolProxy<LlapDaemonProtocolBlockingPB> proxy = - RPC.getProtocolProxy(LlapDaemonProtocolBlockingPB.class, 0, serverAddr, - UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), 0, - retryPolicy); - return proxy.getProxy(); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java deleted file mode 100644 index 45ca906..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.daemon.impl; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.security.PrivilegedAction; -import java.util.concurrent.atomic.AtomicReference; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; -import com.google.protobuf.BlockingService; -import com.google.protobuf.ByteString; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.hive.llap.security.LlapSecurityHelper; -import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; -import org.apache.hadoop.hive.llap.security.SecretManager; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.hive.llap.daemon.ContainerRunner; -import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; -import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB; -import org.apache.hadoop.hive.llap.security.LlapDaemonPolicyProvider; - -public class LlapDaemonProtocolServerImpl extends AbstractService - implements LlapDaemonProtocolBlockingPB, LlapManagementProtocolBlockingPB { - - private static final Logger LOG = LoggerFactory.getLogger(LlapDaemonProtocolServerImpl.class); - - private final int numHandlers; - private final ContainerRunner containerRunner; - private final int srvPort, mngPort; - private RPC.Server server, mngServer; - private final AtomicReference<InetSocketAddress> srvAddress, mngAddress; - private SecretManager zkSecretManager; - - public LlapDaemonProtocolServerImpl(int numHandlers, - ContainerRunner containerRunner, - AtomicReference<InetSocketAddress> srvAddress, - AtomicReference<InetSocketAddress> mngAddress, - int srvPort, - int mngPort) { - super("LlapDaemonProtocolServerImpl"); - this.numHandlers = numHandlers; - this.containerRunner = containerRunner; - this.srvAddress = srvAddress; - this.srvPort = srvPort; - this.mngAddress = mngAddress; - this.mngPort = mngPort; - LOG.info("Creating: " + LlapDaemonProtocolServerImpl.class.getSimpleName() + - " with port configured to: " + srvPort); - } - - @Override - public SubmitWorkResponseProto submitWork(RpcController controller, - SubmitWorkRequestProto request) throws - ServiceException { - try { - return containerRunner.submitWork(request); - } catch (IOException e) { - throw new ServiceException(e); - } - } - - @Override - public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller, - SourceStateUpdatedRequestProto request) throws ServiceException { - return containerRunner.sourceStateUpdated(request); - } - - @Override - public QueryCompleteResponseProto queryComplete(RpcController controller, - QueryCompleteRequestProto request) throws ServiceException { - return containerRunner.queryComplete(request); - } - - @Override - public TerminateFragmentResponseProto terminateFragment( - RpcController controller, - TerminateFragmentRequestProto request) throws ServiceException { - return containerRunner.terminateFragment(request); - } - - @Override - public void serviceStart() { - final Configuration conf = getConfig(); - final BlockingService daemonImpl = - LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this); - final BlockingService managementImpl = - LlapDaemonProtocolProtos.LlapManagementProtocol.newReflectiveBlockingService(this); - if (!UserGroupInformation.isSecurityEnabled()) { - startProtocolServers(conf, daemonImpl, managementImpl); - return; - } - String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL), - llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE); - zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab); - - // Start the protocol server after properly authenticating with daemon keytab. - UserGroupInformation daemonUgi = null; - try { - daemonUgi = LlapSecurityHelper.loginWithKerberos(llapPrincipal, llapKeytab); - } catch (IOException e) { - throw new RuntimeException(e); - } - daemonUgi.doAs(new PrivilegedAction<Void>() { - @Override - public Void run() { - startProtocolServers(conf, daemonImpl, managementImpl); - return null; - } - }); - } - - private void startProtocolServers( - Configuration conf, BlockingService daemonImpl, BlockingService managementImpl) { - server = startProtocolServer(srvPort, numHandlers, srvAddress, conf, daemonImpl, - LlapDaemonProtocolBlockingPB.class); - mngServer = startProtocolServer(mngPort, 2, mngAddress, conf, managementImpl, - LlapManagementProtocolBlockingPB.class); - } - - private RPC.Server startProtocolServer(int srvPort, int numHandlers, - AtomicReference<InetSocketAddress> bindAddress, Configuration conf, - BlockingService impl, Class<?> protocolClass) { - InetSocketAddress addr = new InetSocketAddress(srvPort); - RPC.Server server; - try { - server = createServer(protocolClass, addr, conf, numHandlers, impl); - server.start(); - } catch (IOException e) { - LOG.error("Failed to run RPC Server on port: " + srvPort, e); - throw new RuntimeException(e); - } - - InetSocketAddress serverBindAddress = NetUtils.getConnectAddress(server); - bindAddress.set(NetUtils.createSocketAddrForHost( - serverBindAddress.getAddress().getCanonicalHostName(), - serverBindAddress.getPort())); - LOG.info("Instantiated " + protocolClass.getSimpleName() + " at " + bindAddress); - return server; - } - - @Override - public void serviceStop() { - if (server != null) { - server.stop(); - } - if (mngServer != null) { - mngServer.stop(); - } - } - - @InterfaceAudience.Private - @VisibleForTesting - InetSocketAddress getBindAddress() { - return srvAddress.get(); - } - - private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, - int numHandlers, BlockingService blockingService) throws - IOException { - RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class); - RPC.Builder builder = new RPC.Builder(conf) - .setProtocol(pbProtocol) - .setInstance(blockingService) - .setBindAddress(addr.getHostName()) - .setPort(addr.getPort()) - .setNumHandlers(numHandlers); - if (zkSecretManager != null) { - builder = builder.setSecretManager(zkSecretManager); - } - RPC.Server server = builder.build(); - - if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { - server.refreshServiceAcl(conf, new LlapDaemonPolicyProvider()); - } - return server; - } - - - @Override - public GetTokenResponseProto getDelegationToken(RpcController controller, - GetTokenRequestProto request) throws ServiceException { - if (zkSecretManager == null) { - throw new ServiceException("Operation not supported on unsecure cluster"); - } - UserGroupInformation ugi; - try { - ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new ServiceException(e); - } - String user = ugi.getUserName(); - Text owner = new Text(user); - Text realUser = null; - if (ugi.getRealUser() != null) { - realUser = new Text(ugi.getRealUser().getUserName()); - } - Text renewer = new Text(ugi.getShortUserName()); - LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser); - // TODO: note that the token is not renewable right now and will last for 2 weeks by default. - Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, zkSecretManager); - ByteArrayDataOutput out = ByteStreams.newDataOutput(); - try { - token.write(out); - } catch (IOException e) { - throw new ServiceException(e); - } - ByteString bs = ByteString.copyFrom(out.toByteArray()); - GetTokenResponseProto response = GetTokenResponseProto.newBuilder().setToken(bs).build(); - return response; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java deleted file mode 100644 index e293a95..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.daemon.impl; - -import javax.annotation.Nullable; -import javax.net.SocketFactory; -import java.io.IOException; -import java.net.InetSocketAddress; - -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.ProtocolProxy; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto; -import org.apache.hadoop.security.UserGroupInformation; - -public class LlapManagementProtocolClientImpl implements LlapManagementProtocolBlockingPB { - - private final Configuration conf; - private final InetSocketAddress serverAddr; - private final RetryPolicy retryPolicy; - private final SocketFactory socketFactory; - LlapManagementProtocolBlockingPB proxy; - - - public LlapManagementProtocolClientImpl(Configuration conf, String hostname, int port, - @Nullable RetryPolicy retryPolicy, - @Nullable SocketFactory socketFactory) { - this.conf = conf; - this.serverAddr = NetUtils.createSocketAddr(hostname, port); - this.retryPolicy = retryPolicy; - if (socketFactory == null) { - this.socketFactory = NetUtils.getDefaultSocketFactory(conf); - } else { - this.socketFactory = socketFactory; - } - } - - public LlapManagementProtocolBlockingPB getProxy() throws IOException { - if (proxy == null) { - proxy = createProxy(); - } - return proxy; - } - - public LlapManagementProtocolBlockingPB createProxy() throws IOException { - RPC.setProtocolEngine(conf, LlapManagementProtocolBlockingPB.class, ProtobufRpcEngine.class); - ProtocolProxy<LlapManagementProtocolBlockingPB> proxy = - RPC.getProtocolProxy(LlapManagementProtocolBlockingPB.class, 0, serverAddr, - UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), 0, - retryPolicy); - return proxy.getProxy(); - } - - @Override - public GetTokenResponseProto getDelegationToken(RpcController controller, - GetTokenRequestProto request) throws ServiceException { - try { - return getProxy().getDelegationToken(null, request); - } catch (IOException e) { - throw new ServiceException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java new file mode 100644 index 0000000..c386d77 --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java @@ -0,0 +1,251 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon.impl; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedAction; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import com.google.protobuf.BlockingService; +import com.google.protobuf.ByteString; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.hive.llap.security.LlapSecurityHelper; +import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; +import org.apache.hadoop.hive.llap.security.SecretManager; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.hive.llap.daemon.ContainerRunner; +import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB; +import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB; +import org.apache.hadoop.hive.llap.security.LlapDaemonPolicyProvider; + +public class LlapProtocolServerImpl extends AbstractService + implements LlapProtocolBlockingPB, LlapManagementProtocolPB { + + private static final Logger LOG = LoggerFactory.getLogger(LlapProtocolServerImpl.class); + + private final int numHandlers; + private final ContainerRunner containerRunner; + private final int srvPort, mngPort; + private RPC.Server server, mngServer; + private final AtomicReference<InetSocketAddress> srvAddress, mngAddress; + private SecretManager zkSecretManager; + + public LlapProtocolServerImpl(int numHandlers, + ContainerRunner containerRunner, + AtomicReference<InetSocketAddress> srvAddress, + AtomicReference<InetSocketAddress> mngAddress, + int srvPort, + int mngPort) { + super("LlapDaemonProtocolServerImpl"); + this.numHandlers = numHandlers; + this.containerRunner = containerRunner; + this.srvAddress = srvAddress; + this.srvPort = srvPort; + this.mngAddress = mngAddress; + this.mngPort = mngPort; + LOG.info("Creating: " + LlapProtocolServerImpl.class.getSimpleName() + + " with port configured to: " + srvPort); + } + + @Override + public SubmitWorkResponseProto submitWork(RpcController controller, + SubmitWorkRequestProto request) throws + ServiceException { + try { + return containerRunner.submitWork(request); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller, + SourceStateUpdatedRequestProto request) throws ServiceException { + return containerRunner.sourceStateUpdated(request); + } + + @Override + public QueryCompleteResponseProto queryComplete(RpcController controller, + QueryCompleteRequestProto request) throws ServiceException { + return containerRunner.queryComplete(request); + } + + @Override + public TerminateFragmentResponseProto terminateFragment( + RpcController controller, + TerminateFragmentRequestProto request) throws ServiceException { + return containerRunner.terminateFragment(request); + } + + @Override + public void serviceStart() { + final Configuration conf = getConfig(); + final BlockingService daemonImpl = + LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this); + final BlockingService managementImpl = + LlapDaemonProtocolProtos.LlapManagementProtocol.newReflectiveBlockingService(this); + if (!UserGroupInformation.isSecurityEnabled()) { + startProtocolServers(conf, daemonImpl, managementImpl); + return; + } + String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL), + llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE); + zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab); + + // Start the protocol server after properly authenticating with daemon keytab. + UserGroupInformation daemonUgi = null; + try { + daemonUgi = LlapSecurityHelper.loginWithKerberos(llapPrincipal, llapKeytab); + } catch (IOException e) { + throw new RuntimeException(e); + } + daemonUgi.doAs(new PrivilegedAction<Void>() { + @Override + public Void run() { + startProtocolServers(conf, daemonImpl, managementImpl); + return null; + } + }); + } + + private void startProtocolServers( + Configuration conf, BlockingService daemonImpl, BlockingService managementImpl) { + server = startProtocolServer(srvPort, numHandlers, srvAddress, conf, daemonImpl, + LlapProtocolBlockingPB.class); + mngServer = startProtocolServer(mngPort, 2, mngAddress, conf, managementImpl, + LlapManagementProtocolPB.class); + } + + private RPC.Server startProtocolServer(int srvPort, int numHandlers, + AtomicReference<InetSocketAddress> bindAddress, Configuration conf, + BlockingService impl, Class<?> protocolClass) { + InetSocketAddress addr = new InetSocketAddress(srvPort); + RPC.Server server; + try { + server = createServer(protocolClass, addr, conf, numHandlers, impl); + server.start(); + } catch (IOException e) { + LOG.error("Failed to run RPC Server on port: " + srvPort, e); + throw new RuntimeException(e); + } + + InetSocketAddress serverBindAddress = NetUtils.getConnectAddress(server); + bindAddress.set(NetUtils.createSocketAddrForHost( + serverBindAddress.getAddress().getCanonicalHostName(), + serverBindAddress.getPort())); + LOG.info("Instantiated " + protocolClass.getSimpleName() + " at " + bindAddress); + return server; + } + + @Override + public void serviceStop() { + if (server != null) { + server.stop(); + } + if (mngServer != null) { + mngServer.stop(); + } + } + + @InterfaceAudience.Private + @VisibleForTesting + InetSocketAddress getBindAddress() { + return srvAddress.get(); + } + + private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, + int numHandlers, BlockingService blockingService) throws + IOException { + RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class); + RPC.Builder builder = new RPC.Builder(conf) + .setProtocol(pbProtocol) + .setInstance(blockingService) + .setBindAddress(addr.getHostName()) + .setPort(addr.getPort()) + .setNumHandlers(numHandlers); + if (zkSecretManager != null) { + builder = builder.setSecretManager(zkSecretManager); + } + RPC.Server server = builder.build(); + + if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { + server.refreshServiceAcl(conf, new LlapDaemonPolicyProvider()); + } + return server; + } + + + @Override + public GetTokenResponseProto getDelegationToken(RpcController controller, + GetTokenRequestProto request) throws ServiceException { + if (zkSecretManager == null) { + throw new ServiceException("Operation not supported on unsecure cluster"); + } + UserGroupInformation ugi; + try { + ugi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new ServiceException(e); + } + String user = ugi.getUserName(); + Text owner = new Text(user); + Text realUser = null; + if (ugi.getRealUser() != null) { + realUser = new Text(ugi.getRealUser().getUserName()); + } + Text renewer = new Text(ugi.getShortUserName()); + LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser); + // TODO: note that the token is not renewable right now and will last for 2 weeks by default. + Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, zkSecretManager); + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + try { + token.write(out); + } catch (IOException e) { + throw new ServiceException(e); + } + ByteString bs = ByteString.copyFrom(out.toByteArray()); + GetTokenResponseProto response = GetTokenResponseProto.newBuilder().setToken(bs).build(); + return response; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java index aa065a9..480a394 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java @@ -23,7 +23,7 @@ import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; -import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,7 +89,7 @@ public class QueryFragmentInfo { boolean canFinish = true; if (inputSpecList != null && !inputSpecList.isEmpty()) { for (IOSpecProto inputSpec : inputSpecList) { - if (isSourceOfInterest(inputSpec)) { + if (LlapTezUtils.isSourceOfInterest(inputSpec.getIoDescriptor().getClassName())) { // Lookup the state in the map. LlapDaemonProtocolProtos.SourceStateProto state = queryInfo.getSourceStateMap() .get(inputSpec.getConnectedVertexName()); @@ -129,7 +129,7 @@ public class QueryFragmentInfo { List<IOSpecProto> inputSpecList = fragmentSpec.getInputSpecsList(); if (inputSpecList != null && !inputSpecList.isEmpty()) { for (IOSpecProto inputSpec : inputSpecList) { - if (isSourceOfInterest(inputSpec)) { + if (LlapTezUtils.isSourceOfInterest(inputSpec.getIoDescriptor().getClassName())) { sourcesOfInterest.add(inputSpec.getConnectedVertexName()); } } @@ -143,13 +143,6 @@ public class QueryFragmentInfo { queryInfo.unregisterFinishableStateUpdate(handler); } - private boolean isSourceOfInterest(IOSpecProto inputSpec) { - String inputClassName = inputSpec.getIoDescriptor().getClassName(); - // MRInput is not of interest since it'll always be ready. - return !inputClassName.equals(MRInputLegacy.class.getName()); - } - - @Override public boolean equals(Object o) { if (this == o) { http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index ede2a03..d88d82a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -38,7 +38,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecPro import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; -import org.apache.hadoop.hive.llap.tezplugins.Converters; +import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.ql.io.IOContextMap; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java deleted file mode 100644 index 9549567..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.protocol; - -import java.io.IOException; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.VersionedProtocol; -import org.apache.hadoop.security.token.TokenInfo; -import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; -import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; -import org.apache.tez.runtime.common.security.JobTokenSelector; - -@TokenInfo(JobTokenSelector.class) -public interface LlapTaskUmbilicalProtocol extends VersionedProtocol { - - public static final long versionID = 1L; - - // From Tez. Eventually changes over to the LLAP protocol and ProtocolBuffers - boolean canCommit(TezTaskAttemptID taskid) throws IOException; - public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) - throws IOException, TezException; - - public void nodeHeartbeat(Text hostname, int port) throws IOException; - - public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java index d67647b..bedd265 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java @@ -18,17 +18,17 @@ package org.apache.hadoop.hive.llap.security; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; -import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB; +import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB; +import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.Service; public class LlapDaemonPolicyProvider extends PolicyProvider { private static final Service[] services = new Service[] { new Service(HiveConf.ConfVars.LLAP_SECURITY_ACL.varname, - LlapDaemonProtocolBlockingPB.class), + LlapProtocolBlockingPB.class), new Service(HiveConf.ConfVars.LLAP_MANAGEMENT_ACL.varname, - LlapManagementProtocolBlockingPB.class) + LlapManagementProtocolPB.class) }; @Override http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java index a00b631..aa8745d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java @@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit; import javax.net.SocketFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.daemon.impl.LlapManagementProtocolClientImpl; +import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.registry.ServiceInstance; http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java index 4dca2ce..eb514f2 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java @@ -20,8 +20,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; -import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB; +import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB; +import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB; import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.SecurityInfo; import org.apache.hadoop.security.token.TokenIdentifier; @@ -36,8 +36,8 @@ public class LlapServerSecurityInfo extends SecurityInfo { if (LOG.isDebugEnabled()) { LOG.debug("Trying to get KerberosInfo for " + protocol); } - if (!LlapDaemonProtocolBlockingPB.class.isAssignableFrom(protocol) - && !LlapManagementProtocolBlockingPB.class.isAssignableFrom(protocol)) return null; + if (!LlapProtocolBlockingPB.class.isAssignableFrom(protocol) + && !LlapManagementProtocolPB.class.isAssignableFrom(protocol)) return null; return new KerberosInfo() { @Override public Class<? extends Annotation> annotationType() { @@ -62,7 +62,7 @@ public class LlapServerSecurityInfo extends SecurityInfo { LOG.debug("Trying to get TokenInfo for " + protocol); } // Tokens cannot be used for the management protocol (for now). - if (!LlapDaemonProtocolBlockingPB.class.isAssignableFrom(protocol)) return null; + if (!LlapProtocolBlockingPB.class.isAssignableFrom(protocol)) return null; return new TokenInfo() { @Override public Class<? extends Annotation> annotationType() { http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java deleted file mode 100644 index b6e7499..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.security; - -import java.util.Collection; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.token.TokenSelector; - -public class LlapTokenSelector implements TokenSelector<LlapTokenIdentifier> { - private static final Log LOG = LogFactory.getLog(LlapTokenSelector.class); - - @Override - public Token<LlapTokenIdentifier> selectToken(Text service, - Collection<Token<? extends TokenIdentifier>> tokens) { - if (service == null) return null; - if (LOG.isDebugEnabled()) { - LOG.debug("Looking for a token with service " + service); - } - for (Token<? extends TokenIdentifier> token : tokens) { - if (LOG.isDebugEnabled()) { - LOG.debug("Token = " + token.getKind() + "; service = " + token.getService()); - } - if (LlapTokenIdentifier.KIND_NAME.equals(token.getKind()) - && service.equals(token.getService())) { - @SuppressWarnings("unchecked") - Token<LlapTokenIdentifier> result = (Token<LlapTokenIdentifier>)token; - return result; - } - } - return null; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java deleted file mode 100644 index f61d62f..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.tezplugins; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import com.google.protobuf.ByteString; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto; -import org.apache.tez.common.TezCommonUtils; -import org.apache.tez.dag.api.EntityDescriptor; -import org.apache.tez.dag.api.InputDescriptor; -import org.apache.tez.dag.api.OutputDescriptor; -import org.apache.tez.dag.api.ProcessorDescriptor; -import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.dag.api.event.VertexState; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.runtime.api.impl.GroupInputSpec; -import org.apache.tez.runtime.api.impl.InputSpec; -import org.apache.tez.runtime.api.impl.OutputSpec; -import org.apache.tez.runtime.api.impl.TaskSpec; - -public class Converters { - - public static TaskSpec getTaskSpecfromProto(FragmentSpecProto FragmentSpecProto) { - TezTaskAttemptID taskAttemptID = - TezTaskAttemptID.fromString(FragmentSpecProto.getFragmentIdentifierString()); - - ProcessorDescriptor processorDescriptor = null; - if (FragmentSpecProto.hasProcessorDescriptor()) { - processorDescriptor = convertProcessorDescriptorFromProto( - FragmentSpecProto.getProcessorDescriptor()); - } - - List<InputSpec> inputSpecList = new ArrayList<InputSpec>(FragmentSpecProto.getInputSpecsCount()); - if (FragmentSpecProto.getInputSpecsCount() > 0) { - for (IOSpecProto inputSpecProto : FragmentSpecProto.getInputSpecsList()) { - inputSpecList.add(getInputSpecFromProto(inputSpecProto)); - } - } - - List<OutputSpec> outputSpecList = - new ArrayList<OutputSpec>(FragmentSpecProto.getOutputSpecsCount()); - if (FragmentSpecProto.getOutputSpecsCount() > 0) { - for (IOSpecProto outputSpecProto : FragmentSpecProto.getOutputSpecsList()) { - outputSpecList.add(getOutputSpecFromProto(outputSpecProto)); - } - } - - List<GroupInputSpec> groupInputSpecs = - new ArrayList<GroupInputSpec>(FragmentSpecProto.getGroupedInputSpecsCount()); - if (FragmentSpecProto.getGroupedInputSpecsCount() > 0) { - for (GroupInputSpecProto groupInputSpecProto : FragmentSpecProto.getGroupedInputSpecsList()) { - groupInputSpecs.add(getGroupInputSpecFromProto(groupInputSpecProto)); - } - } - - TaskSpec taskSpec = - new TaskSpec(taskAttemptID, FragmentSpecProto.getDagName(), FragmentSpecProto.getVertexName(), - FragmentSpecProto.getVertexParallelism(), processorDescriptor, inputSpecList, - outputSpecList, groupInputSpecs); - return taskSpec; - } - - public static FragmentSpecProto convertTaskSpecToProto(TaskSpec taskSpec) { - FragmentSpecProto.Builder builder = FragmentSpecProto.newBuilder(); - builder.setFragmentIdentifierString(taskSpec.getTaskAttemptID().toString()); - builder.setDagName(taskSpec.getDAGName()); - builder.setVertexName(taskSpec.getVertexName()); - builder.setVertexParallelism(taskSpec.getVertexParallelism()); - builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId()); - builder.setAttemptNumber(taskSpec.getTaskAttemptID().getId()); - - if (taskSpec.getProcessorDescriptor() != null) { - builder.setProcessorDescriptor( - convertToProto(taskSpec.getProcessorDescriptor())); - } - - if (taskSpec.getInputs() != null && !taskSpec.getInputs().isEmpty()) { - for (InputSpec inputSpec : taskSpec.getInputs()) { - builder.addInputSpecs(convertInputSpecToProto(inputSpec)); - } - } - - if (taskSpec.getOutputs() != null && !taskSpec.getOutputs().isEmpty()) { - for (OutputSpec outputSpec : taskSpec.getOutputs()) { - builder.addOutputSpecs(convertOutputSpecToProto(outputSpec)); - } - } - - if (taskSpec.getGroupInputs() != null && !taskSpec.getGroupInputs().isEmpty()) { - for (GroupInputSpec groupInputSpec : taskSpec.getGroupInputs()) { - builder.addGroupedInputSpecs(convertGroupInputSpecToProto(groupInputSpec)); - - } - } - return builder.build(); - } - - private static ProcessorDescriptor convertProcessorDescriptorFromProto( - EntityDescriptorProto proto) { - String className = proto.getClassName(); - UserPayload payload = convertPayloadFromProto(proto); - ProcessorDescriptor pd = ProcessorDescriptor.create(className); - setUserPayload(pd, payload); - return pd; - } - - private static EntityDescriptorProto convertToProto( - EntityDescriptor<?> descriptor) { - EntityDescriptorProto.Builder builder = EntityDescriptorProto - .newBuilder(); - builder.setClassName(descriptor.getClassName()); - - UserPayload userPayload = descriptor.getUserPayload(); - if (userPayload != null) { - UserPayloadProto.Builder payloadBuilder = UserPayloadProto.newBuilder(); - if (userPayload.hasPayload()) { - payloadBuilder.setUserPayload(ByteString.copyFrom(userPayload.getPayload())); - payloadBuilder.setVersion(userPayload.getVersion()); - } - builder.setUserPayload(payloadBuilder.build()); - } - if (descriptor.getHistoryText() != null) { - try { - builder.setHistoryText(TezCommonUtils.compressByteArrayToByteString( - descriptor.getHistoryText().getBytes("UTF-8"))); - } catch (IOException e) { - throw new TezUncheckedException(e); - } - } - return builder.build(); - } - - private static InputSpec getInputSpecFromProto(IOSpecProto inputSpecProto) { - InputDescriptor inputDescriptor = null; - if (inputSpecProto.hasIoDescriptor()) { - inputDescriptor = - convertInputDescriptorFromProto(inputSpecProto.getIoDescriptor()); - } - InputSpec inputSpec = new InputSpec(inputSpecProto.getConnectedVertexName(), inputDescriptor, - inputSpecProto.getPhysicalEdgeCount()); - return inputSpec; - } - - private static InputDescriptor convertInputDescriptorFromProto( - EntityDescriptorProto proto) { - String className = proto.getClassName(); - UserPayload payload = convertPayloadFromProto(proto); - InputDescriptor id = InputDescriptor.create(className); - setUserPayload(id, payload); - return id; - } - - private static OutputDescriptor convertOutputDescriptorFromProto( - EntityDescriptorProto proto) { - String className = proto.getClassName(); - UserPayload payload = convertPayloadFromProto(proto); - OutputDescriptor od = OutputDescriptor.create(className); - setUserPayload(od, payload); - return od; - } - - private static IOSpecProto convertInputSpecToProto(InputSpec inputSpec) { - IOSpecProto.Builder builder = IOSpecProto.newBuilder(); - if (inputSpec.getSourceVertexName() != null) { - builder.setConnectedVertexName(inputSpec.getSourceVertexName()); - } - if (inputSpec.getInputDescriptor() != null) { - builder.setIoDescriptor(convertToProto(inputSpec.getInputDescriptor())); - } - builder.setPhysicalEdgeCount(inputSpec.getPhysicalEdgeCount()); - return builder.build(); - } - - private static OutputSpec getOutputSpecFromProto(IOSpecProto outputSpecProto) { - OutputDescriptor outputDescriptor = null; - if (outputSpecProto.hasIoDescriptor()) { - outputDescriptor = - convertOutputDescriptorFromProto(outputSpecProto.getIoDescriptor()); - } - OutputSpec outputSpec = - new OutputSpec(outputSpecProto.getConnectedVertexName(), outputDescriptor, - outputSpecProto.getPhysicalEdgeCount()); - return outputSpec; - } - - public static IOSpecProto convertOutputSpecToProto(OutputSpec outputSpec) { - IOSpecProto.Builder builder = IOSpecProto.newBuilder(); - if (outputSpec.getDestinationVertexName() != null) { - builder.setConnectedVertexName(outputSpec.getDestinationVertexName()); - } - if (outputSpec.getOutputDescriptor() != null) { - builder.setIoDescriptor(convertToProto(outputSpec.getOutputDescriptor())); - } - builder.setPhysicalEdgeCount(outputSpec.getPhysicalEdgeCount()); - return builder.build(); - } - - private static GroupInputSpec getGroupInputSpecFromProto(GroupInputSpecProto groupInputSpecProto) { - GroupInputSpec groupSpec = new GroupInputSpec(groupInputSpecProto.getGroupName(), - groupInputSpecProto.getGroupVerticesList(), - convertInputDescriptorFromProto(groupInputSpecProto.getMergedInputDescriptor())); - return groupSpec; - } - - private static GroupInputSpecProto convertGroupInputSpecToProto(GroupInputSpec groupInputSpec) { - GroupInputSpecProto.Builder builder = GroupInputSpecProto.newBuilder(); - builder.setGroupName(groupInputSpec.getGroupName()); - builder.addAllGroupVertices(groupInputSpec.getGroupVertices()); - builder.setMergedInputDescriptor(convertToProto(groupInputSpec.getMergedInputDescriptor())); - return builder.build(); - } - - - private static void setUserPayload(EntityDescriptor<?> entity, UserPayload payload) { - if (payload != null) { - entity.setUserPayload(payload); - } - } - - private static UserPayload convertPayloadFromProto( - EntityDescriptorProto proto) { - UserPayload userPayload = null; - if (proto.hasUserPayload()) { - if (proto.getUserPayload().hasUserPayload()) { - userPayload = - UserPayload.create(proto.getUserPayload().getUserPayload().asReadOnlyByteBuffer(), proto.getUserPayload().getVersion()); - } else { - userPayload = UserPayload.create(null); - } - } - return userPayload; - } - - public static SourceStateProto fromVertexState(VertexState state) { - switch (state) { - case SUCCEEDED: - return SourceStateProto.S_SUCCEEDED; - case RUNNING: - return SourceStateProto.S_RUNNING; - default: - throw new RuntimeException("Unexpected state: " + state); - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java deleted file mode 100644 index 07703a2..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.tezplugins; - -import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; -import org.apache.tez.serviceplugins.api.ContainerLauncher; -import org.apache.tez.serviceplugins.api.ContainerLauncherContext; -import org.apache.tez.serviceplugins.api.ContainerStopRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class LlapContainerLauncher extends ContainerLauncher { - private static final Logger LOG = LoggerFactory.getLogger(LlapContainerLauncher.class); - - public LlapContainerLauncher(ContainerLauncherContext containerLauncherContext) { - super(containerLauncherContext); - } - - @Override - public void launchContainer(ContainerLaunchRequest containerLaunchRequest) { - LOG.info("No-op launch for container: " + containerLaunchRequest.getContainerId() + - " succeeded on host: " + containerLaunchRequest.getNodeId()); - getContext().containerLaunched(containerLaunchRequest.getContainerId()); - } - - @Override - public void stopContainer(ContainerStopRequest containerStopRequest) { - LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + containerStopRequest); - getContext().containerStopRequested(containerStopRequest.getContainerId()); - } -} \ No newline at end of file