http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java new file mode 100644 index 0000000..bf71861 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.core.restclient; + +import com.google.common.base.Preconditions; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.api.json.JSONConfiguration; +import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; +import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.slider.core.exceptions.ExceptionConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; + +/** + * Class to bond to a Jersey client, for UGI integration and SPNEGO. + * <p> + * Usage: create an instance, then when creating a Jersey <code>Client</code> + * pass in to the constructor the handler provided by {@link #getHandler()} + * + * see <a href="https://jersey.java.net/apidocs/1.17/jersey/com/sun/jersey/client/urlconnection/HttpURLConnectionFactory.html">Jersey docs</a> + */ +public class UgiJerseyBinding implements + HttpURLConnectionFactory { + private static final Logger log = + LoggerFactory.getLogger(UgiJerseyBinding.class); + + private final UrlConnectionOperations operations; + private final URLConnectionClientHandler handler; + + /** + * Construct an instance + * @param operations operations instance + */ + @SuppressWarnings("ThisEscapedInObjectConstruction") + public UgiJerseyBinding(UrlConnectionOperations operations) { + Preconditions.checkArgument(operations != null, "Null operations"); + this.operations = operations; + handler = new URLConnectionClientHandler(this); + } + + /** + * Create an instance off the configuration. The SPNEGO policy + * is derived from the current UGI settings. 
+ * @param conf config + */ + public UgiJerseyBinding(Configuration conf) { + this(new UrlConnectionOperations(conf)); + } + + /** + * Get a URL connection. + * @param url URL to connect to + * @return the connection + * @throws IOException any problem. {@link AuthenticationException} + * errors are wrapped + */ + @Override + public HttpURLConnection getHttpURLConnection(URL url) throws IOException { + try { + // open a connection handling status codes and so redirections + // but as it opens a connection, it's less useful than you think. + + return operations.openConnection(url); + } catch (AuthenticationException e) { + throw new IOException(e); + } + } + + public UrlConnectionOperations getOperations() { + return operations; + } + + public URLConnectionClientHandler getHandler() { + return handler; + } + + /** + * Get the SPNEGO flag (as found in the operations instance + * @return the spnego policy + */ + public boolean isUseSpnego() { + return operations.isUseSpnego(); + } + + + /** + * Uprate error codes 400 and up into faults; + * <p> + * see {@link ExceptionConverter#convertJerseyException(String, String, UniformInterfaceException)} + */ + public static IOException uprateFaults(HttpVerb verb, String url, + UniformInterfaceException ex) + throws IOException { + return ExceptionConverter.convertJerseyException(verb.getVerb(), + url, ex); + } + + /** + * Create the standard Jersey client Config + * @return the recommended Jersey Client config + */ + public ClientConfig createJerseyClientConfig() { + ClientConfig clientConfig = new DefaultClientConfig(); + clientConfig.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, true); + return clientConfig; + } + + /** + * Create a jersey client bonded to this handler, using the + * supplied client config + * @param clientConfig client configuratin + * @return a new client instance to use + */ + public Client createJerseyClient(ClientConfig clientConfig) { + return new Client(getHandler(), clientConfig); + } + + 
/** + * Create a jersey client bonded to this handler, using the + * client config created with {@link #createJerseyClientConfig()} + * @return a new client instance to use + */ + public Client createJerseyClient() { + return createJerseyClient(createJerseyClientConfig()); + } + +} + +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java new file mode 100644 index 0000000..20ef198 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.core.restclient; + +import com.google.common.base.Preconditions; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.yarn.webapp.ForbiddenException; +import org.apache.hadoop.yarn.webapp.NotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; + +/** + * Operations on the JDK UrlConnection class. + * + */ +public class UrlConnectionOperations extends Configured { + private static final Logger log = + LoggerFactory.getLogger(UrlConnectionOperations.class); + + private SliderURLConnectionFactory connectionFactory; + + private boolean useSpnego = false; + + /** + * Create an instance off the configuration. The SPNEGO policy + * is derived from the current UGI settings. + * @param conf config + */ + public UrlConnectionOperations(Configuration conf) { + super(conf); + connectionFactory = SliderURLConnectionFactory.newInstance(conf); + if (UserGroupInformation.isSecurityEnabled()) { + log.debug("SPNEGO is enabled"); + setUseSpnego(true); + } + } + + + public boolean isUseSpnego() { + return useSpnego; + } + + public void setUseSpnego(boolean useSpnego) { + this.useSpnego = useSpnego; + } + + /** + * Opens a url with cache disabled, redirect handled in + * (JDK) implementation. 
+ * + * @param url to open + * @return URLConnection + * @throws IOException + * @throws AuthenticationException authentication failure + */ + public HttpURLConnection openConnection(URL url) throws + IOException, + AuthenticationException { + Preconditions.checkArgument(url.getPort() != 0, "no port"); + return (HttpURLConnection) connectionFactory.openConnection(url, useSpnego); + } + + public HttpOperationResponse execGet(URL url) throws + IOException, + AuthenticationException { + return execHttpOperation(HttpVerb.GET, url, null, ""); + } + + public HttpOperationResponse execHttpOperation(HttpVerb verb, + URL url, + byte[] payload, + String contentType) + throws IOException, AuthenticationException { + HttpURLConnection conn = null; + HttpOperationResponse outcome = new HttpOperationResponse(); + int resultCode; + byte[] body = null; + log.debug("{} {} spnego={}", verb, url, useSpnego); + + boolean doOutput = verb.hasUploadBody(); + if (doOutput) { + Preconditions.checkArgument(payload !=null, + "Null payload on a verb which expects one"); + } + try { + conn = openConnection(url); + conn.setRequestMethod(verb.getVerb()); + conn.setDoOutput(doOutput); + if (doOutput) { + conn.setRequestProperty("Content-Type", contentType); + } + + // now do the connection + conn.connect(); + + if (doOutput) { + OutputStream output = conn.getOutputStream(); + IOUtils.write(payload, output); + output.close(); + } + + resultCode = conn.getResponseCode(); + outcome.lastModified = conn.getLastModified(); + outcome.contentType = conn.getContentType(); + outcome.headers = conn.getHeaderFields(); + InputStream stream = conn.getErrorStream(); + if (stream == null) { + stream = conn.getInputStream(); + } + if (stream != null) { + // read into a buffer. 
+ body = IOUtils.toByteArray(stream); + } else { + // no body: + log.debug("No body in response"); + + } + } catch (SSLException e) { + throw e; + } catch (IOException e) { + throw NetUtils.wrapException(url.toString(), + url.getPort(), "localhost", 0, e); + + } catch (AuthenticationException e) { + throw new AuthenticationException("From " + url + ": " + e, e); + + } finally { + if (conn != null) { + conn.disconnect(); + } + } + uprateFaults(HttpVerb.GET, url.toString(), resultCode, "", body); + outcome.responseCode = resultCode; + outcome.data = body; + return outcome; + } + + /** + * Uprate error codes 400 and up into faults; + * 404 is converted to a {@link NotFoundException}, + * 401 to {@link ForbiddenException} + * + * @param verb HTTP Verb used + * @param url URL as string + * @param resultCode response from the request + * @param bodyAsString + *@param body optional body of the request @throws IOException if the result was considered a failure + */ + public static void uprateFaults(HttpVerb verb, String url, + int resultCode, String bodyAsString, byte[] body) + throws IOException { + + if (resultCode < 400) { + //success + return; + } + String msg = verb.toString() +" "+ url; + if (resultCode == 404) { + throw new NotFoundException(msg); + } + if (resultCode == 401) { + throw new ForbiddenException(msg); + } + // all other error codes + + // get a string respnse + if (bodyAsString == null) { + if (body != null && body.length > 0) { + bodyAsString = new String(body); + } else { + bodyAsString = ""; + } + } + String message = msg + + " failed with exit code " + resultCode + + ", body length " + bodyAsString.length() + + ":\n" + bodyAsString; + log.error(message); + throw new IOException(message); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java new file mode 100644 index 0000000..ca49888 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.core.zk; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.ConnectException; +import java.util.concurrent.atomic.AtomicBoolean; + +public class BlockingZKWatcher implements Watcher { + + protected static final Logger log = + LoggerFactory.getLogger(BlockingZKWatcher.class); + private final AtomicBoolean connectedFlag = new AtomicBoolean(false); + + @Override + public void process(WatchedEvent event) { + log.info("ZK binding callback received"); + connectedFlag.set(true); + synchronized (connectedFlag) { + try { + connectedFlag.notify(); + } catch (Exception e) { + log.warn("failed while waiting for notification", e); + } + } + } + + /** + * Wait for a flag to go true + * @param timeout timeout in millis + */ + + public void waitForZKConnection(int timeout) + throws InterruptedException, ConnectException { + synchronized (connectedFlag) { + if (!connectedFlag.get()) { + log.info("waiting for ZK event"); + //wait a bit + connectedFlag.wait(timeout); + } + } + if (!connectedFlag.get()) { + throw new ConnectException("Unable to connect to ZK quorum"); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java new file mode 100644 index 0000000..c8b3adb --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.core.zk; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.service.AbstractService; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Reader; +import java.net.BindException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + + +/** + * This is a version of the HBase ZK cluster cut out to be standalone. 
+ * + * <i>Important: keep this Java6 language level for now</i> + */ +public class MiniZooKeeperCluster extends AbstractService { + private static final Logger LOG = LoggerFactory.getLogger( + MiniZooKeeperCluster.class); + + private static final int TICK_TIME = 2000; + private static final int CONNECTION_TIMEOUT = 30000; + public static final int MAX_CLIENT_CONNECTIONS = 1000; + + private boolean started; + + /** The default port. If zero, we use a random port. */ + private int defaultClientPort = 0; + + private int clientPort; + + private final List<NIOServerCnxnFactory> standaloneServerFactoryList; + private final List<ZooKeeperServer> zooKeeperServers; + private final List<Integer> clientPortList; + + private int activeZKServerIndex; + private int tickTime = 0; + private File baseDir; + private final int numZooKeeperServers; + private String zkQuorum = ""; + + public MiniZooKeeperCluster(int numZooKeeperServers) { + super("MiniZooKeeperCluster"); + this.numZooKeeperServers = numZooKeeperServers; + this.started = false; + activeZKServerIndex = -1; + zooKeeperServers = new ArrayList<ZooKeeperServer>(); + clientPortList = new ArrayList<Integer>(); + standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>(); + } + + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + public void setDefaultClientPort(int clientPort) { + if (clientPort <= 0) { + throw new IllegalArgumentException("Invalid default ZK client port: " + + clientPort); + } + this.defaultClientPort = clientPort; + } + + /** + * Selects a ZK client port. Returns the default port if specified. + * Otherwise, returns a random port. The random port is selected from the + * range between 49152 to 65535. These ports cannot be registered with IANA + * and are intended for dynamic allocation (see http://bit.ly/dynports). 
+ */ + private int selectClientPort(Random r) { + if (defaultClientPort > 0) { + return defaultClientPort; + } + return 0xc000 + r.nextInt(0x3f00); + } + + public void setTickTime(int tickTime) { + this.tickTime = tickTime; + } + + public int getBackupZooKeeperServerNum() { + return zooKeeperServers.size() - 1; + } + + public int getZooKeeperServerNum() { + return zooKeeperServers.size(); + } + + // / XXX: From o.a.zk.t.ClientBase + private static void setupTestEnv() { + // during the tests we run with 100K prealloc in the logs. + // on windows systems prealloc of 64M was seen to take ~15seconds + // resulting in test failure (client timeout on first session). + // set env and directly in order to handle static init/gc issues + System.setProperty("zookeeper.preAllocSize", "100"); + FileTxnLog.setPreallocSize(100 * 1024); + } + + @Override + protected void serviceStart() throws Exception { + startup(); + } + + /** + * @param baseDir + * @param numZooKeeperServers + * @return ClientPort server bound to, -1 if there was a + * binding problem and we couldn't pick another port. 
+ * @throws IOException + * @throws InterruptedException + */ + private int startup() throws IOException, + InterruptedException { + if (numZooKeeperServers <= 0) + return -1; + + setupTestEnv(); + started = true; + baseDir = File.createTempFile("zookeeper", ".dir"); + recreateDir(baseDir); + + StringBuilder quorumList = new StringBuilder(); + Random rnd = new Random(); + int tentativePort = selectClientPort(rnd); + + // running all the ZK servers + for (int i = 0; i < numZooKeeperServers; i++) { + File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile(); + recreateDir(dir); + int tickTimeToUse; + if (this.tickTime > 0) { + tickTimeToUse = this.tickTime; + } else { + tickTimeToUse = TICK_TIME; + } + ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse); + NIOServerCnxnFactory standaloneServerFactory; + while (true) { + try { + standaloneServerFactory = new NIOServerCnxnFactory(); + standaloneServerFactory.configure( + new InetSocketAddress(tentativePort), + MAX_CLIENT_CONNECTIONS + ); + } catch (BindException e) { + LOG.debug("Failed binding ZK Server to client port: " + + tentativePort, e); + // We're told to use some port but it's occupied, fail + if (defaultClientPort > 0) return -1; + // This port is already in use, try to use another. + tentativePort = selectClientPort(rnd); + continue; + } + break; + } + + // Start up this ZK server + standaloneServerFactory.startup(server); + if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) { + throw new IOException("Waiting for startup of standalone server"); + } + + // We have selected this port as a client port. 
+ clientPortList.add(tentativePort); + standaloneServerFactoryList.add(standaloneServerFactory); + zooKeeperServers.add(server); + if (quorumList.length() > 0) { + quorumList.append(","); + } + quorumList.append("localhost:").append(tentativePort); + tentativePort++; //for the next server + } + + // set the first one to be active ZK; Others are backups + activeZKServerIndex = 0; + + clientPort = clientPortList.get(activeZKServerIndex); + zkQuorum = quorumList.toString(); + LOG.info("Started MiniZK Cluster and connect 1 ZK server " + + "on client port: " + clientPort); + return clientPort; + } + + private void recreateDir(File dir) throws IOException { + if (dir.exists()) { + if (!FileUtil.fullyDelete(dir)) { + throw new IOException("Could not delete zk base directory: " + dir); + } + } + try { + dir.mkdirs(); + } catch (SecurityException e) { + throw new IOException("creating dir: " + dir, e); + } + } + + /** + * Delete the basedir + */ + private void deleteBaseDir() { + if (baseDir != null) { + baseDir.delete(); + baseDir = null; + } + + } + + @Override + protected void serviceStop() throws Exception { + + if (!started) { + return; + } + started = false; + + try { + // shut down all the zk servers + for (int i = 0; i < standaloneServerFactoryList.size(); i++) { + NIOServerCnxnFactory standaloneServerFactory = + standaloneServerFactoryList.get(i); + int clientPort = clientPortList.get(i); + + standaloneServerFactory.shutdown(); + if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) { + throw new IOException("Waiting for shutdown of standalone server"); + } + } + for (ZooKeeperServer zkServer : zooKeeperServers) { + //explicitly close ZKDatabase since ZookeeperServer does not close them + zkServer.getZKDatabase().close(); + } + } finally { + // clear everything + activeZKServerIndex = 0; + standaloneServerFactoryList.clear(); + clientPortList.clear(); + zooKeeperServers.clear(); + } + + LOG.info("Shutdown MiniZK cluster with all ZK servers"); + } + + /**@return 
clientPort return clientPort if there is another ZK backup can run + * when killing the current active; return -1, if there is no backups. + * @throws IOException + * @throws InterruptedException + */ + public int killCurrentActiveZooKeeperServer() throws IOException, + InterruptedException { + if (!started || activeZKServerIndex < 0) { + return -1; + } + + // Shutdown the current active one + NIOServerCnxnFactory standaloneServerFactory = + standaloneServerFactoryList.get(activeZKServerIndex); + int clientPort = clientPortList.get(activeZKServerIndex); + + standaloneServerFactory.shutdown(); + if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) { + throw new IOException("Waiting for shutdown of standalone server"); + } + + zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close(); + + // remove the current active zk server + standaloneServerFactoryList.remove(activeZKServerIndex); + clientPortList.remove(activeZKServerIndex); + zooKeeperServers.remove(activeZKServerIndex); + LOG.info("Kill the current active ZK servers in the cluster " + + "on client port: " + clientPort); + + if (standaloneServerFactoryList.size() == 0) { + // there is no backup servers; + return -1; + } + clientPort = clientPortList.get(activeZKServerIndex); + LOG.info("Activate a backup zk server in the cluster " + + "on client port: " + clientPort); + // return the next back zk server's port + return clientPort; + } + + /** + * Kill one back up ZK servers + * @throws IOException + * @throws InterruptedException + */ + public void killOneBackupZooKeeperServer() throws IOException, + InterruptedException { + if (!started || activeZKServerIndex < 0 || + standaloneServerFactoryList.size() <= 1) { + return; + } + + int backupZKServerIndex = activeZKServerIndex + 1; + // Shutdown the current active one + NIOServerCnxnFactory standaloneServerFactory = + standaloneServerFactoryList.get(backupZKServerIndex); + int clientPort = clientPortList.get(backupZKServerIndex); + + 
standaloneServerFactory.shutdown(); + if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) { + throw new IOException("Waiting for shutdown of standalone server"); + } + + zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close(); + + // remove this backup zk server + standaloneServerFactoryList.remove(backupZKServerIndex); + clientPortList.remove(backupZKServerIndex); + zooKeeperServers.remove(backupZKServerIndex); + LOG.info("Kill one backup ZK servers in the cluster " + + "on client port: " + clientPort); + } + + // XXX: From o.a.zk.t.ClientBase + private static boolean waitForServerDown(int port, long timeout) throws + InterruptedException { + long start = System.currentTimeMillis(); + while (true) { + try { + Socket sock = null; + try { + sock = new Socket("localhost", port); + OutputStream outstream = sock.getOutputStream(); + outstream.write("stat".getBytes()); + outstream.flush(); + } finally { + IOUtils.closeSocket(sock); + } + } catch (IOException e) { + return true; + } + + if (System.currentTimeMillis() > start + timeout) { + break; + } + Thread.sleep(250); + } + return false; + } + + // XXX: From o.a.zk.t.ClientBase + private static boolean waitForServerUp(int port, long timeout) throws + InterruptedException { + long start = System.currentTimeMillis(); + while (true) { + try { + Socket sock = null; + sock = new Socket("localhost", port); + BufferedReader reader = null; + try { + OutputStream outstream = sock.getOutputStream(); + outstream.write("stat".getBytes()); + outstream.flush(); + + Reader isr = new InputStreamReader(sock.getInputStream()); + reader = new BufferedReader(isr); + String line = reader.readLine(); + if (line != null && line.startsWith("Zookeeper version:")) { + return true; + } + } finally { + IOUtils.closeSocket(sock); + IOUtils.closeStream(reader); + } + } catch (IOException e) { + // ignore as this is expected + LOG.debug("server localhost:" + port + " not up " + e); + } + + if (System.currentTimeMillis() > start + 
timeout) { + break; + } + Thread.sleep(250); + } + return false; + } + + public int getClientPort() { + return clientPort; + } + + public String getZkQuorum() { + return zkQuorum; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java new file mode 100644 index 0000000..045b72c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.core.zk; + +import org.apache.zookeeper.Watcher; + +/** + * Relays ZK watcher events to a closure + */ +public abstract class ZKCallback implements Watcher { + + public ZKCallback() { + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java new file mode 100644 index 0000000..ca41e4b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.slider.core.zk;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Binds to a ZooKeeper ensemble and manages the Slider path hierarchy
 * {@code /services/slider/users/<username>/<clustername>}.
 * <p>
 * Instances are created via {@link #newInstance} and must have
 * {@link #init()} called before use; {@link #close()} releases the session.
 * Watcher events received on the session are forwarded to the optional
 * watcher supplied at construction time.
 */
public class ZKIntegration implements Watcher, Closeable {

  /** Base path element for services. */
  public static final String ZK_SERVICES = "services";
  /** Base path element for all Slider references. */
  public static final String ZK_SLIDER = "slider";
  /** Path element under which per-user entries live. */
  public static final String ZK_USERS = "users";
  public static final String SVC_SLIDER = "/" + ZK_SERVICES + "/" + ZK_SLIDER;
  public static final String SVC_SLIDER_USERS = SVC_SLIDER + "/" + ZK_USERS;

  /** Ordered path elements of the users subtree; immutable. */
  public static final List<String> ZK_USERS_PATH_LIST;
  static {
    List<String> paths = new ArrayList<String>(3);
    paths.add(ZK_SERVICES);
    paths.add(ZK_SLIDER);
    paths.add(ZK_USERS);
    ZK_USERS_PATH_LIST = Collections.unmodifiableList(paths);
  }

  // Default session timeout in milliseconds; deliberately left non-final
  // so existing callers that tune it globally keep working.
  public static int SESSION_TIMEOUT = 30000;

  protected static final Logger log =
      LoggerFactory.getLogger(ZKIntegration.class);

  // The live client; null until init() and again after close().
  private ZooKeeper zookeeper;
  private final String username;
  private final String clustername;
  private final String userPath;
  private int sessionTimeout = SESSION_TIMEOUT;
  /**
   * Flag set once the user path creation has been attempted, so the
   * (re)creation logic in {@link #maybeInit()} runs at most once.
   */
  private final AtomicBoolean toInit = new AtomicBoolean(false);
  private final boolean createClusterPath;
  private final Watcher watchEventHandler;
  private final String zkConnection;
  private final boolean canBeReadOnly;

  /**
   * Protected constructor; use {@link #newInstance}.
   * @param zkConnection quorum connection string
   * @param username user whose path tree is managed
   * @param clustername cluster name
   * @param canBeReadOnly whether the session may fall back to read-only mode
   * @param createClusterPath create the user path on connection if missing
   * @param watchEventHandler optional watcher to forward session events to
   * @param sessionTimeout ZK session timeout in milliseconds
   * @throws IOException declared for subclass/compatibility reasons
   */
  protected ZKIntegration(String zkConnection,
      String username,
      String clustername,
      boolean canBeReadOnly,
      boolean createClusterPath,
      Watcher watchEventHandler,
      int sessionTimeout
  ) throws IOException {
    this.username = username;
    this.clustername = clustername;
    this.watchEventHandler = watchEventHandler;
    this.zkConnection = zkConnection;
    this.canBeReadOnly = canBeReadOnly;
    this.createClusterPath = createClusterPath;
    this.sessionTimeout = sessionTimeout;
    this.userPath = mkSliderUserPath(username);
  }

  /**
   * Open the ZooKeeper session. Must be called exactly once.
   * @throws IOException on connection setup problems
   * @throws IllegalStateException if already initialized
   */
  public void init() throws IOException {
    // Explicit check rather than an assert: asserts are disabled by default,
    // so a double-init would otherwise silently leak the first session.
    if (zookeeper != null) {
      throw new IllegalStateException("ZooKeeper client already initialized");
    }
    log.debug("Binding ZK client to {}", zkConnection);
    zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this, canBeReadOnly);
  }

  /**
   * Create an instance bonded to the specific closure.
   * @param zkConnection quorum connection string
   * @param username user
   * @param clustername cluster name
   * @param createClusterPath create the user path on connection if missing
   * @param canBeReadOnly whether the session may be read-only
   * @param watchEventHandler optional watcher for session events
   * @param sessionTimeout session timeout in milliseconds
   * @return the new (uninitialized) instance
   * @throws IOException on setup problems
   */
  public static ZKIntegration newInstance(String zkConnection,
      String username,
      String clustername,
      boolean createClusterPath,
      boolean canBeReadOnly,
      Watcher watchEventHandler,
      int sessionTimeout) throws IOException {

    return new ZKIntegration(zkConnection,
        username,
        clustername,
        canBeReadOnly,
        createClusterPath,
        watchEventHandler,
        sessionTimeout);
  }

  /**
   * Close the session if open. Idempotent.
   */
  @Override
  public synchronized void close() throws IOException {
    if (zookeeper != null) {
      try {
        zookeeper.close();
      } catch (InterruptedException ignored) {
        // restore the interrupt status rather than swallowing it,
        // so callers further up the stack can observe the interruption
        Thread.currentThread().interrupt();
      }
      zookeeper = null;
    }
  }

  public String getConnectionString() {
    return zkConnection;
  }

  public String getClusterPath() {
    return mkClusterPath(username, clustername);
  }

  /**
   * @return true iff the client exists and is currently connected
   */
  public boolean getConnected() {
    // null-guard: previously NPE'd if queried after close()
    return zookeeper != null && zookeeper.getState().isConnected();
  }

  /**
   * @return true iff the client exists and its session is alive
   */
  public boolean getAlive() {
    return zookeeper != null && zookeeper.getState().isAlive();
  }

  public ZooKeeper.States getState() {
    return zookeeper.getState();
  }

  public Stat getClusterStat() throws KeeperException, InterruptedException {
    return stat(getClusterPath());
  }

  /**
   * Probe for a path existing.
   * @param path path to check
   * @return true if there is a node at that path
   */
  public boolean exists(String path) throws
      KeeperException,
      InterruptedException {
    return stat(path) != null;
  }

  /**
   * Stat a path without setting a watch.
   * @param path path to stat
   * @return the node stat, or null if the node does not exist
   */
  public Stat stat(String path) throws KeeperException, InterruptedException {
    return zookeeper.exists(path, false);
  }

  @Override
  public String toString() {
    return "ZK integration bound @ " + zkConnection + ": " + zookeeper;
  }

  /**
   * Watcher event handler: triggers the one-off path setup, then relays
   * the event to the bonded watcher, if any.
   * @param event the session/node event
   */
  @Override
  public void process(WatchedEvent event) {
    log.debug("{}", event);
    try {
      maybeInit();
    } catch (Exception e) {
      log.error("Failed to init", e);
    }
    if (watchEventHandler != null) {
      watchEventHandler.process(event);
    }
  }

  /**
   * On the first event (and only if requested at construction time),
   * create the users subtree and this user's own node.
   */
  private void maybeInit() throws KeeperException, InterruptedException {
    if (!toInit.getAndSet(true) && createClusterPath) {
      log.debug("initing");
      //create the user path
      mkPath(ZK_USERS_PATH_LIST, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      //create the specific user
      createPath(userPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
  }

  /**
   * Create a path under a parent; don't care if it already exists.
   * As the path isn't returned on a collision, this isn't the way to create
   * sequentially numbered nodes.
   * @param parent parent dir. Must have a trailing / if entry is non-null/empty
   * @param entry entry - can be null or "", in which case it is not appended
   * @param acl ACL list (must be non-empty)
   * @param createMode create mode for the node
   * @return the path if created; null if it already existed
   */
  public String createPath(String parent,
      String entry,
      List<ACL> acl,
      CreateMode createMode) throws KeeperException, InterruptedException {
    //initial create of full path
    assert acl != null;
    assert !acl.isEmpty();
    assert parent != null;
    String path = parent;
    if (entry != null) {
      path = path + entry;
    }
    try {
      log.debug("Creating ZK path {}", path);
      return zookeeper.create(path, null, acl, createMode);
    } catch (KeeperException.NodeExistsException ignored) {
      //node already there
      log.debug("node already present:{}", path);
      return null;
    }
  }

  /**
   * Recursive path create: each element of the list is created under the
   * concatenation of the previous ones, rooted at "/".
   * @param paths path element list
   * @param acl acl list
   * @param createMode create mode for every node
   */
  public void mkPath(List<String> paths,
      List<ACL> acl,
      CreateMode createMode) throws KeeperException, InterruptedException {
    String history = "/";
    for (String entry : paths) {
      createPath(history, entry, acl, createMode);
      history = history + entry + "/";
    }
  }

  /**
   * Blocking enum of this user's clusters.
   * @return an unordered list of clusters under the user path
   */
  public List<String> getClusters() throws KeeperException, InterruptedException {
    return zookeeper.getChildren(userPath, null);
  }

  /**
   * Delete a node; does not throw an exception if the path is not found.
   * @param path path to delete
   * @return true if the path could be deleted, false if there was no node
   */
  public boolean delete(String path) throws
      InterruptedException,
      KeeperException {
    try {
      zookeeper.delete(path, -1);
      log.debug("Deleting {}", path);
      return true;
    } catch (KeeperException.NoNodeException ignored) {
      return false;
    }
  }

  /**
   * Recursively delete a node; does not throw if any node does not exist.
   * @param path root of the subtree to delete
   * @return true unless the root itself was already missing
   */
  public boolean deleteRecursive(String path) throws KeeperException, InterruptedException {

    try {
      List<String> children = zookeeper.getChildren(path, false);
      for (String child : children) {
        deleteRecursive(path + "/" + child);
      }
      delete(path);
    } catch (KeeperException.NoNodeException ignored) {
      return false;
    }

    return true;
  }

  /**
   * Build the path to a cluster; exists once the cluster has come up.
   * Even before that, a ZK watcher could wait for it.
   * @param username user
   * @param clustername name of the cluster
   * @return a string
   */
  public static String mkClusterPath(String username, String clustername) {
    return mkSliderUserPath(username) + "/" + clustername;
  }

  /**
   * Build the path to a user's Slider node; exists once any of the user's
   * clusters has come up.
   * @param username user
   * @return a string
   */
  public static String mkSliderUserPath(String username) {
    return SVC_SLIDER_USERS + "/" + username;
  }

}
import java.util.Locale;

/**
 * Builds and holds the ZooKeeper paths and quorum strings associated with
 * one (user, application, cluster) triple.
 * <p>
 * The application and registry paths are pre-computed from the naming
 * triple at construction time but may be overwritten later via their
 * setters.
 */
public final class ZKPathBuilder {

  private final String username;
  private final String appname;
  private final String clustername;
  private final String quorum;
  private final String appQuorum;

  // Mutable: initialized from the naming triple, overridable via setters.
  private String appPath;
  private String registryPath;

  public ZKPathBuilder(String username,
      String appname,
      String clustername,
      String quorum,
      String appQuorum) {
    this.username = username;
    this.appname = appname;
    this.clustername = clustername;
    this.quorum = quorum;
    this.appQuorum = appQuorum;
    this.appPath = buildAppPath();
    this.registryPath = buildRegistryPath();
  }

  /**
   * Derive the default application path from the naming triple.
   * @return a path of the form {@code /yarnapps_<app>_<user>_<cluster>}
   */
  public String buildAppPath() {
    return String.format(Locale.ENGLISH, "/yarnapps_%s_%s_%s", appname,
        username, clustername);
  }

  /**
   * Derive the default registry path from the naming triple.
   * @return a path of the form {@code /services_<app>_<user>_<cluster>}
   */
  public String buildRegistryPath() {
    return String.format(Locale.ENGLISH, "/services_%s_%s_%s", appname,
        username, clustername);
  }

  public String getQuorum() {
    return quorum;
  }

  public String getAppQuorum() {
    return appQuorum;
  }

  public String getAppPath() {
    return appPath;
  }

  public void setAppPath(String appPath) {
    this.appPath = appPath;
  }

  public String getRegistryPath() {
    return registryPath;
  }

  public void setRegistryPath(String registryPath) {
    this.registryPath = registryPath;
  }

}
package org.apache.slider.core.zk;

import com.google.common.net.HostAndPort;
import org.apache.hadoop.util.StringUtils;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.exceptions.BadConfigException;

import java.util.ArrayList;
import java.util.List;

/**
 * Static helpers for parsing and building ZooKeeper quorum strings.
 */
public class ZookeeperUtils {
  /** Default ZooKeeper client port. */
  public static final int DEFAULT_PORT = 2181;

  /**
   * Build a connection string by applying the same port to every host.
   * @param zkHosts comma-separated host list
   * @param port port to apply to each host
   * @return a quorum string of host:port pairs
   */
  public static String buildConnectionString(String zkHosts, int port) {
    String zkPort = Integer.toString(port);
    //parse the hosts
    String[] hostlist = zkHosts.split(",", 0);
    String quorum = SliderUtils.join(hostlist, ":" + zkPort + ",", false);
    return quorum;
  }

  /**
   * Split a comma-separated list into trimmed entries, mapping a null
   * result from {@link StringUtils#getStrings} to an empty array.
   * Shared by the quorum-splitting methods below.
   * @param hostPortQuorumList raw comma-separated list (may be null/empty)
   * @return a possibly empty array of trimmed entries
   */
  private static String[] splitTrimmed(String hostPortQuorumList) {
    String[] strings = StringUtils.getStrings(hostPortQuorumList);
    if (strings == null) {
      return new String[0];
    }
    for (int i = 0; i < strings.length; i++) {
      strings[i] = strings[i].trim();
    }
    return strings;
  }

  /**
   * Take a quorum list and split it to (trimmed) pairs.
   * @param hostPortQuorumList list of form h1:port, h2:port2,...
   * @return a possibly empty list of values between commas. They may not be
   * valid hostname:port pairs
   */
  public static List<String> splitToPairs(String hostPortQuorumList) {
    String[] strings = splitTrimmed(hostPortQuorumList);
    List<String> tuples = new ArrayList<String>(strings.length);
    for (String s : strings) {
      tuples.add(s);
    }
    return tuples;
  }

  /**
   * Split a quorum list into a list of hostnames and ports.
   * Entries without an explicit port get {@link #DEFAULT_PORT}.
   * @param hostPortQuorumList list of form h1:port, h2:port2,...
   * @return a possibly empty list of host/port pairs
   */
  public static List<HostAndPort> splitToHostsAndPorts(String hostPortQuorumList) {
    String[] strings = splitTrimmed(hostPortQuorumList);
    List<HostAndPort> list = new ArrayList<HostAndPort>(strings.length);
    for (String s : strings) {
      list.add(HostAndPort.fromString(s).withDefaultPort(DEFAULT_PORT));
    }
    return list;
  }

  /**
   * Build a comma-separated list of the hostnames only.
   * @param hostAndPorts parsed host/port pairs
   * @return a comma-separated hosts-only list (no trailing comma)
   */
  public static String buildHostsOnlyList(List<HostAndPort> hostAndPorts) {
    StringBuilder sb = new StringBuilder();
    for (HostAndPort hostAndPort : hostAndPorts) {
      if (sb.length() > 0) {
        sb.append(",");
      }
      sb.append(hostAndPort.getHostText());
    }
    return sb.toString();
  }

  /**
   * Render one quorum entry, appending ":defaultPort" if the entry
   * carries no explicit port.
   * @param hostAndPort entry to render
   * @param defaultPort port to inject when absent
   * @return the host:port string
   */
  public static String buildQuorumEntry(HostAndPort hostAndPort,
      int defaultPort) {
    String s = hostAndPort.toString();
    if (hostAndPort.hasPort()) {
      return s;
    } else {
      return s + ":" + defaultPort;
    }
  }

  /**
   * Build a quorum list, injecting a ":defaultPort" ref if needed on
   * any entry without one.
   * @param hostAndPorts parsed host/port pairs
   * @param defaultPort port to inject when absent
   * @return the comma-separated quorum string
   */
  public static String buildQuorum(List<HostAndPort> hostAndPorts, int defaultPort) {
    List<String> entries = new ArrayList<String>(hostAndPorts.size());
    for (HostAndPort hostAndPort : hostAndPorts) {
      entries.add(buildQuorumEntry(hostAndPort, defaultPort));
    }
    return SliderUtils.join(entries, ",", false);
  }

  /**
   * Convert a quorum string to a hosts-only list.
   * @param quorum quorum string; must be non-empty
   * @return comma-separated hostnames
   * @throws BadConfigException if the quorum is empty
   */
  public static String convertToHostsOnlyList(String quorum) throws
      BadConfigException {
    List<HostAndPort> hostAndPorts = splitToHostsAndPortsStrictly(quorum);
    return ZookeeperUtils.buildHostsOnlyList(hostAndPorts);
  }

  /**
   * As {@link #splitToHostsAndPorts(String)}, but rejecting an empty result.
   * @param quorum quorum string
   * @return a non-empty list of host/port pairs
   * @throws BadConfigException if the quorum is empty
   */
  public static List<HostAndPort> splitToHostsAndPortsStrictly(String quorum) throws
      BadConfigException {
    List<HostAndPort> hostAndPorts =
        ZookeeperUtils.splitToHostsAndPorts(quorum);
    if (hostAndPorts.isEmpty()) {
      throw new BadConfigException("empty zookeeper quorum");
    }
    return hostAndPorts;
  }

  /**
   * Get the port of the first quorum entry.
   * @param quorum quorum string; must be non-empty
   * @param defVal port to return when the first entry has none
   * @return the first entry's port, or defVal
   * @throws BadConfigException if the quorum is empty
   */
  public static int getFirstPort(String quorum, int defVal) throws
      BadConfigException {
    List<HostAndPort> hostAndPorts = splitToHostsAndPortsStrictly(quorum);
    return hostAndPorts.get(0).getPortOrDefault(defVal);
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java new file mode 100644 index 0000000..510de5d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.slider.providers;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.core.conf.AggregateConf;
import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.exceptions.BadClusterStateException;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.launch.AbstractLauncher;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import static org.apache.slider.api.ResourceKeys.COMPONENT_INSTANCES;
import static org.apache.slider.api.ResourceKeys.DEF_YARN_CORES;
import static org.apache.slider.api.ResourceKeys.DEF_YARN_MEMORY;
import static org.apache.slider.api.ResourceKeys.YARN_CORES;
import static org.apache.slider.api.ResourceKeys.YARN_MEMORY;

/**
 * Base class for client-side providers: validates instance definitions,
 * merges configuration templates, and offers hooks invoked while preparing
 * and launching the application master.
 */
public abstract class AbstractClientProvider extends Configured {
  private static final Logger log =
      LoggerFactory.getLogger(AbstractClientProvider.class);
  protected static final ProviderUtils providerUtils =
      new ProviderUtils(log);

  public static final String PROVIDER_RESOURCE_BASE =
      "org/apache/slider/providers/";
  public static final String PROVIDER_RESOURCE_BASE_ROOT =
      "/" + PROVIDER_RESOURCE_BASE;

  public AbstractClientProvider(Configuration conf) {
    super(conf);
  }

  /** @return the provider name */
  public abstract String getName();

  /** @return the roles this provider supports */
  public abstract List<ProviderRole> getRoles();

  /**
   * Verify that an instance definition is considered valid by the provider.
   * Checks that every known role with a resource component declares an
   * instance count and has parseable memory/core requirements.
   * @param instanceDefinition instance definition
   * @param fs filesystem view (unused here; available to subclasses)
   * @throws SliderException if the configuration is not valid
   */
  public void validateInstanceDefinition(AggregateConf instanceDefinition, SliderFileSystem fs) throws
      SliderException {

    ConfTreeOperations resources =
        instanceDefinition.getResourceOperations();
    for (ProviderRole role : getRoles()) {
      MapOperations component = resources.getComponent(role.group);
      if (component == null) {
        // role has no component entry: nothing to validate
        continue;
      }
      if (component.get(COMPONENT_INSTANCES) == null) {
        String message = "No instance count provided for " + role.name;
        log.error("{} with \n{}", message, resources.toString());
        throw new BadClusterStateException(message);
      }
      // verify the resource requirements parse; values are discarded
      providerUtils.getRoleResourceRequirement(component.get(YARN_MEMORY),
          DEF_YARN_MEMORY,
          Integer.MAX_VALUE);
      providerUtils.getRoleResourceRequirement(component.get(YARN_CORES),
          DEF_YARN_CORES,
          Integer.MAX_VALUE);
    }
  }

  /**
   * Any provider-side alteration of a configuration can take place here.
   * @param aggregateConf config to patch
   * @throws IOException IO problems
   * @throws SliderException Slider-specific issues
   */
  public void prepareInstanceConfiguration(AggregateConf aggregateConf) throws
      SliderException,
      IOException {
    //default: do nothing
  }

  /**
   * Prepare the AM settings for launch. No-op by default.
   * @param fileSystem filesystem
   * @param serviceConf configuration of the client
   * @param launcher launcher to set up
   * @param instanceDescription instance description being launched
   * @param snapshotConfDirPath snapshot configuration directory
   * @param generatedConfDirPath generated configuration directory
   * @param clientConfExtras extra client configuration
   * @param libdir library directory
   * @param tempPath temporary path
   * @param miniClusterTestRun flag set to true on a mini cluster run
   * @throws IOException IO problems
   * @throws SliderException Slider-specific issues
   */
  public void prepareAMAndConfigForLaunch(SliderFileSystem fileSystem,
      Configuration serviceConf,
      AbstractLauncher launcher,
      AggregateConf instanceDescription,
      Path snapshotConfDirPath,
      Path generatedConfDirPath,
      Configuration clientConfExtras,
      String libdir,
      Path tempPath,
      boolean miniClusterTestRun)
      throws IOException, SliderException {

  }

  /**
   * Merge one resource template into a configuration tree, without
   * overwriting existing entries. A null resource name means "no template".
   */
  private static void mergeTemplate(ConfTreeOperations target,
      String resource) throws IOException {
    if (resource != null) {
      ConfTreeOperations template =
          ConfTreeOperations.fromResource(resource);
      target.mergeWithoutOverwrite(template.confTree);
    }
  }

  /**
   * Load in and merge in templates. Null arguments mean "no such template".
   * @param instanceConf instance to patch
   * @param internalTemplate path to internal.json
   * @param resourceTemplate path to resources.json
   * @param appConfTemplate path to app_conf.json
   * @throws IOException any IO problems
   */
  protected void mergeTemplates(AggregateConf instanceConf,
      String internalTemplate,
      String resourceTemplate,
      String appConfTemplate) throws IOException {
    mergeTemplate(instanceConf.getInternalOperations(), internalTemplate);
    mergeTemplate(instanceConf.getResourceOperations(), resourceTemplate);
    mergeTemplate(instanceConf.getAppConfOperations(), appConfTemplate);
  }

  /**
   * This is called pre-launch to validate that the cluster specification
   * is valid. This can include checking that the security options
   * are in the site files prior to launch, that there are no conflicting
   * operations, etc.
   *
   * This check is made prior to every launch of the cluster - so it can
   * pick up problems which manually edited cluster files have added,
   * or from specification files from previous versions.
   *
   * The provider MUST NOT change the remote specification. This is
   * purely a pre-launch validation of options.
   *
   * @param sliderFileSystem filesystem
   * @param clustername name of the cluster
   * @param configuration cluster configuration
   * @param instanceDefinition cluster specification
   * @param clusterDirPath directory of the cluster
   * @param generatedConfDirPath path to place generated artifacts
   * @param secure flag to indicate that the cluster is secure
   * @throws SliderException on any validation issue
   * @throws IOException on any IO problem
   */
  public void preflightValidateClusterConfiguration(SliderFileSystem sliderFileSystem,
      String clustername,
      Configuration configuration,
      AggregateConf instanceDefinition,
      Path clusterDirPath,
      Path generatedConfDirPath,
      boolean secure)
      throws SliderException, IOException {
    validateInstanceDefinition(instanceDefinition, sliderFileSystem);
  }

  /**
   * Return a set of application specific string tags. Empty by default.
   * @param fileSystem filesystem
   * @param appDef application definition
   * @return the set of tags
   * @throws SliderException Slider-specific issues
   */
  public Set<String> getApplicationTags(SliderFileSystem fileSystem,
      String appDef) throws SliderException {
    return Collections.emptySet();
  }

  /**
   * Process client operations for applications such as install, configure.
   * Unsupported by default.
   * @param fileSystem filesystem
   * @param registryOperations registry access
   * @param configuration configuration
   * @param operation operation name
   * @param clientInstallPath installation path
   * @param clientPackage client package
   * @param clientConfig client configuration
   * @param name application name
   * @throws SliderException always, unless overridden
   */
  public void processClientOperation(SliderFileSystem fileSystem,
      RegistryOperations registryOperations,
      Configuration configuration,
      String operation,
      File clientInstallPath,
      File clientPackage,
      JSONObject clientConfig,
      String name)
      throws SliderException {
    throw new SliderException("Provider does not support client operations.");
  }

}
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java new file mode 100644 index 0000000..61b2655 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.providers; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; +import org.apache.hadoop.registry.client.types.AddressTypes; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.slider.api.ClusterDescription; +import org.apache.slider.common.SliderKeys; +import org.apache.slider.common.tools.ConfigHelper; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.core.exceptions.BadCommandArgumentsException; +import org.apache.slider.core.exceptions.SliderException; +import org.apache.slider.core.main.ExitCodeProvider; +import org.apache.slider.server.appmaster.actions.QueueAccess; +import org.apache.slider.server.appmaster.operations.AbstractRMOperation; +import org.apache.slider.server.appmaster.state.ContainerReleaseSelector; +import org.apache.slider.server.appmaster.state.MostRecentContainerReleaseSelector; +import org.apache.slider.server.appmaster.state.StateAccessForProviders; +import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations; +import org.apache.slider.server.services.workflow.ForkedProcessService; +import org.apache.slider.server.services.workflow.ServiceParent; +import org.apache.slider.server.services.workflow.WorkflowSequenceService; +import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +/** + * The base class for provider services. It lets the implementations + * add sequences of operations, and propagates service failures + * upstream + */ +public abstract class AbstractProviderService + extends WorkflowSequenceService + implements + ProviderCore, + SliderKeys, + ProviderService { + private static final Logger log = + LoggerFactory.getLogger(AbstractProviderService.class); + protected StateAccessForProviders amState; + protected AgentRestOperations restOps; + protected URL amWebAPI; + protected YarnRegistryViewForProviders yarnRegistry; + protected QueueAccess queueAccess; + + protected AbstractProviderService(String name) { + super(name); + setStopIfNoChildServicesAtStartup(false); + } + + @Override + public Configuration getConf() { + return getConfig(); + } + + public StateAccessForProviders getAmState() { + return amState; + } + + public QueueAccess getQueueAccess() { + return queueAccess; + } + + public void setAmState(StateAccessForProviders amState) { + this.amState = amState; + } + + @Override + public String getHumanName() { + return getName().toLowerCase(Locale.ENGLISH); + } + + @Override + public void bind(StateAccessForProviders stateAccessor, + QueueAccess queueAccess, + List<Container> liveContainers) { + this.amState = stateAccessor; + this.queueAccess = queueAccess; + } + + @Override + public void bindToYarnRegistry(YarnRegistryViewForProviders yarnRegistry) { + this.yarnRegistry = yarnRegistry; + } + + public YarnRegistryViewForProviders getYarnRegistry() { + return yarnRegistry; + } + + @Override + public AgentRestOperations getAgentRestOperations() { + return restOps; + } + + @Override + public void notifyContainerCompleted(ContainerId containerId) { + } + + 
public void setAgentRestOperations(AgentRestOperations agentRestOperations) { + this.restOps = agentRestOperations; + } + + /** + * Load a specific XML configuration file for the provider config + * @param confDir configuration directory + * @param siteXMLFilename provider-specific filename + * @return a configuration to be included in status + * @throws BadCommandArgumentsException argument problems + * @throws IOException IO problems + */ + protected Configuration loadProviderConfigurationInformation(File confDir, + String siteXMLFilename) + throws BadCommandArgumentsException, IOException { + Configuration siteConf; + File siteXML = new File(confDir, siteXMLFilename); + if (!siteXML.exists()) { + throw new BadCommandArgumentsException( + "Configuration directory %s doesn't contain %s - listing is %s", + confDir, siteXMLFilename, SliderUtils.listDir(confDir)); + } + + //now read it in + siteConf = ConfigHelper.loadConfFromFile(siteXML); + log.info("{} file is at {}", siteXMLFilename, siteXML); + log.info(ConfigHelper.dumpConfigToString(siteConf)); + return siteConf; + } + + /** + * No-op implementation of this method. + */ + @Override + public void initializeApplicationConfiguration( + AggregateConf instanceDefinition, SliderFileSystem fileSystem) + throws IOException, SliderException { + } + + /** + * No-op implementation of this method. + * + * {@inheritDoc} + */ + @Override + public void validateApplicationConfiguration(AggregateConf instance, + File confDir, + boolean secure) + throws IOException, SliderException { + + } + + /** + * Scan through the roles and see if it is supported. 
+ * @param role role to look for + * @return true if the role is known about -and therefore + * that a launcher thread can be deployed to launch it + */ + @Override + public boolean isSupportedRole(String role) { + Collection<ProviderRole> roles = getRoles(); + for (ProviderRole providedRole : roles) { + if (providedRole.name.equals(role)) { + return true; + } + } + return false; + } + + /** + * override point to allow a process to start executing in this container + * @param instanceDefinition cluster description + * @param confDir configuration directory + * @param env environment + * @param execInProgress the callback for the exec events + * @return false + * @throws IOException + * @throws SliderException + */ + @Override + public boolean exec(AggregateConf instanceDefinition, + File confDir, + Map<String, String> env, + ProviderCompleted execInProgress) throws IOException, SliderException { + return false; + } + + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + @Override // ExitCodeProvider + public int getExitCode() { + Throwable cause = getFailureCause(); + if (cause != null) { + //failed for some reason + if (cause instanceof ExitCodeProvider) { + return ((ExitCodeProvider) cause).getExitCode(); + } + } + ForkedProcessService lastProc = latestProcess(); + if (lastProc == null || !lastProc.isProcessTerminated()) { + return 0; + } else { + return lastProc.getExitCode(); + } + } + + /** + * Return the latest forked process service that ran + * @return the forkes service + */ + protected ForkedProcessService latestProcess() { + Service current = getActiveService(); + Service prev = getPreviousService(); + + Service latest = current != null ? 
current : prev; + if (latest instanceof ForkedProcessService) { + return (ForkedProcessService) latest; + } else { + //its a composite object, so look inside it for a process + if (latest instanceof ServiceParent) { + return getFPSFromParentService((ServiceParent) latest); + } else { + //no match + return null; + } + } + } + + + /** + * Given a parent service, find the one that is a forked process + * @param serviceParent parent + * @return the forked process service or null if there is none + */ + protected ForkedProcessService getFPSFromParentService(ServiceParent serviceParent) { + List<Service> services = serviceParent.getServices(); + for (Service s : services) { + if (s instanceof ForkedProcessService) { + return (ForkedProcessService) s; + } + } + return null; + } + + /** + * if we are already running, start this service + */ + protected void maybeStartCommandSequence() { + if (isInState(STATE.STARTED)) { + startNextService(); + } + } + + /** + * Create a new forked process service with the given + * name, environment and command list -then add it as a child + * for execution in the sequence. 
+ * + * @param name command name + * @param env environment + * @param commands command line + * @throws IOException + * @throws SliderException + */ + protected ForkedProcessService queueCommand(String name, + Map<String, String> env, + List<String> commands) throws + IOException, + SliderException { + ForkedProcessService process = buildProcess(name, env, commands); + //register the service for lifecycle management; when this service + //is terminated, so is the master process + addService(process); + return process; + } + + public ForkedProcessService buildProcess(String name, + Map<String, String> env, + List<String> commands) throws + IOException, + SliderException { + ForkedProcessService process; + process = new ForkedProcessService(name); + process.init(getConfig()); + process.build(env, commands); + return process; + } + + /* + * Build the provider status, can be empty + * @return the provider status - map of entries to add to the info section + */ + @Override + public Map<String, String> buildProviderStatus() { + return new HashMap<String, String>(); + } + + /* + Build the monitor details. 
The base implementation includes all the external URL endpoints + in the external view + */ + @Override + public Map<String, MonitorDetail> buildMonitorDetails(ClusterDescription clusterDesc) { + Map<String, MonitorDetail> details = new LinkedHashMap<String, MonitorDetail>(); + + // add in all the endpoints + buildEndpointDetails(details); + + return details; + } + + @Override + public void buildEndpointDetails(Map<String, MonitorDetail> details) { + ServiceRecord self = yarnRegistry.getSelfRegistration(); + + List<Endpoint> externals = self.external; + for (Endpoint endpoint : externals) { + String addressType = endpoint.addressType; + if (AddressTypes.ADDRESS_URI.equals(addressType)) { + try { + List<URL> urls = RegistryTypeUtils.retrieveAddressURLs(endpoint); + if (!urls.isEmpty()) { + details.put(endpoint.api, new MonitorDetail(urls.get(0).toString(), true)); + } + } catch (InvalidRecordException | MalformedURLException ignored) { + // Ignored + } + + } + + } + } + + @Override + public void applyInitialRegistryDefinitions(URL amWebURI, + URL agentOpsURI, + URL agentStatusURI, + ServiceRecord serviceRecord) + throws IOException { + this.amWebAPI = amWebURI; + } + + /** + * {@inheritDoc} + * + * + * @return The base implementation returns the most recent containers first. 
+ */ + @Override + public ContainerReleaseSelector createContainerReleaseSelector() { + return new MostRecentContainerReleaseSelector(); + } + + @Override + public void releaseAssignedContainer(ContainerId containerId) { + // no-op + } + + @Override + public void addContainerRequest(AMRMClient.ContainerRequest req) { + // no-op + } + + @Override + public void cancelSingleRequest(AMRMClient.ContainerRequest request) { + // no-op + } + + @Override + public int cancelContainerRequests(Priority priority1, + Priority priority2, + int count) { + return 0; + } + + @Override + public void execute(List<AbstractRMOperation> operations) { + for (AbstractRMOperation operation : operations) { + operation.execute(this); + } + } + /** + * No-op implementation of this method. + */ + @Override + public void rebuildContainerDetails(List<Container> liveContainers, + String applicationId, Map<Integer, ProviderRole> providerRoles) { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java new file mode 100644 index 0000000..27d3415 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.slider.providers; + +/** + * Details about some exported information from a provider to the AM web UI. + */ +public class MonitorDetail { + + private final String value; + private final boolean isUrl; + + public MonitorDetail(String value, boolean isUrl) { + this.value = value; + this.isUrl = isUrl; + } + + public String getValue() { + return value; + } + + public boolean isUrl() { + return isUrl; + } + + public String toString() { + return "MonitorDetail[" + value + " isUrl=" + isUrl + "]"; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java new file mode 100644 index 0000000..128dd5d --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.providers; + +/** + * Placement values. + * This is nominally a bitmask, though not all values make sense + */ +public class PlacementPolicy { + + /** + * Default value: history used, anti-affinity hinted at on rebuild/flex up + */ + public static final int NONE = 0; + + /** + * Default value: history used, anti-affinity hinted at on rebuild/flex up + */ + public static final int DEFAULT = NONE; + + /** + * Strict placement: when asking for an instance for which there is + * history, mandate that it is strict + */ + public static final int STRICT = 1; + + /** + * No data locality; do not use placement history + */ + public static final int ANYWHERE = 2; + + /** + * @Deprecated: use {@link #ANYWHERE} + */ + @Deprecated + public static final int NO_DATA_LOCALITY = ANYWHERE; + + /** + * Anti-affinity is mandatory. + */ + public static final int ANTI_AFFINITY_REQUIRED = 4; + + /** + * Exclude from flexing; used internally to mark AMs. 
+ */ + public static final int EXCLUDE_FROM_FLEXING = 16; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicyOptions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicyOptions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicyOptions.java new file mode 100644 index 0000000..e61f944 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicyOptions.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.providers; + +public enum PlacementPolicyOptions { + + EXCLUDE_FROM_FLEXING, + NO_DATA_LOCALITY, + ANTI_AFFINITY_REQUIRED, +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java new file mode 100644 index 0000000..f6ff4fd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.providers; + +/** + * This is the callback triggered by the {@link ProviderCompletedCallable} + * when it generates a notification + */ +public interface ProviderCompleted { + + public void eventCallbackEvent(Object parameter); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java new file mode 100644 index 0000000..47939c9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.providers; + +import java.util.concurrent.Callable; + +public class ProviderCompletedCallable implements Callable<Object> { + + private final ProviderCompleted callback; + private final Object parameter; + + public ProviderCompletedCallable(ProviderCompleted callback, Object parameter) { + this.callback = callback; + this.parameter = parameter; + } + + @Override + public Object call() throws Exception { + callback.eventCallbackEvent(parameter); + return parameter; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org