// http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/NameValuePair.java
// ----------------------------------------------------------------------
// diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/NameValuePair.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/NameValuePair.java
// new file mode 100644
// index 0000000..5872ab9
// --- /dev/null
// +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/NameValuePair.java
// @@ -0,0 +1,46 @@
// +/*
// + * Licensed to the Apache Software Foundation (ASF) under one
// + * or more contributor license agreements. See the NOTICE file
// + * distributed with this work for additional information
// + * regarding copyright ownership. The ASF licenses this file
// + * to you under the Apache License, Version 2.0 (the
// + * "License"); you may not use this file except in compliance
// + * with the License. You may obtain a copy of the License at
// + *
// + * http://www.apache.org/licenses/LICENSE-2.0
// + *
// + * Unless required by applicable law or agreed to in writing, software
// + * distributed under the License is distributed on an "AS IS" BASIS,
// + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// + * See the License for the specific language governing permissions and
// + * limitations under the License.
// + */
// +package org.apache.drill.yarn.core;
// +

/**
 * Pairs a name with an arbitrary value. Both references are fixed at
 * construction time; the value may be {@code null}, which is rendered as
 * {@code <unset>} by {@link #getQuotedValue()}.
 */
public class NameValuePair {
  private final String name;
  private final Object value;

  /**
   * @param name the name half of the pair
   * @param value the value half of the pair; may be {@code null}
   */
  public NameValuePair(String name, Object value) {
    this.name = name;
    this.value = value;
  }

  /** @return the name given at construction */
  public String getName() {
    return name;
  }

  /** @return the value given at construction, possibly {@code null} */
  public Object getValue() {
    return value;
  }

  /**
   * Renders the value for display.
   *
   * @return {@code <unset>} for a {@code null} value, the value wrapped in
   *         double quotes when it is a {@link String}, otherwise the value's
   *         {@code toString()} result
   */
  public String getQuotedValue() {
    if (value == null) {
      return "<unset>";
    }
    if (value instanceof String) {
      return "\"" + value + "\"";
    }
    return value.toString();
  }
}
// http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnClientException.java
// ----------------------------------------------------------------------
// diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnClientException.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnClientException.java
// new file mode 100644
// index 0000000..62dd468
// --- /dev/null
// +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnClientException.java
// @@ -0,0 +1,30 @@
// +/*
// + * Licensed to the Apache Software Foundation (ASF) under one
// + * or more contributor license agreements. See the NOTICE file
// + * distributed with this work for additional information
// + * regarding copyright ownership. The ASF licenses this file
// + * to you under the Apache License, Version 2.0 (the
// + * "License"); you may not use this file except in compliance
// + * with the License. You may obtain a copy of the License at
// + *
// + * http://www.apache.org/licenses/LICENSE-2.0
// + *
// + * Unless required by applicable law or agreed to in writing, software
// + * distributed under the License is distributed on an "AS IS" BASIS,
// + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// + * See the License for the specific language governing permissions and
// + * limitations under the License.
// + */
// +package org.apache.drill.yarn.core;
// +

/**
 * Checked exception that carries a descriptive message and, optionally, the
 * underlying failure that triggered it.
 */
public class YarnClientException extends Exception {
  private static final long serialVersionUID = -1411110715738266578L;

  /**
   * @param msg description of the failure
   */
  public YarnClientException(String msg) {
    super(msg);
  }

  /**
   * @param msg description of the failure
   * @param e the exception that caused it; retained as the cause
   */
  public YarnClientException(String msg, Exception e) {
    super(msg, e);
  }

  /**
   * Variant that accepts any {@link Throwable} as the cause, so that errors
   * which are not {@link Exception}s can be wrapped without losing them.
   *
   * @param msg description of the failure
   * @param cause the throwable that caused it; retained as the cause
   */
  public YarnClientException(String msg, Throwable cause) {
    super(msg, cause);
  }
}

// http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java
// ----------------------------------------------------------------------
// diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java
// new file mode 100644
// index 0000000..8905ce3
// --- /dev/null
// +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java
// @@ -0,0 +1,207 @@
// +/*
// + * Licensed to the Apache Software Foundation (ASF) under one
// + * or more contributor license agreements. See the NOTICE file
// + * distributed with this work for additional information
// + * regarding copyright ownership. The ASF licenses this file
// + * to you under the Apache License, Version 2.0 (the
// + * "License"); you may not use this file except in compliance
// + * with the License. You may obtain a copy of the License at
// + *
// + * http://www.apache.org/licenses/LICENSE-2.0
// + *
// + * Unless required by applicable law or agreed to in writing, software
// + * distributed under the License is distributed on an "AS IS" BASIS,
// + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// + * See the License for the specific language governing permissions and
// + * limitations under the License.
+ */ +package org.apache.drill.yarn.core; + +import java.io.IOException; + +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; + +/** + * YARN resource manager client implementation for Drill. Provides a wrapper + * around the YARN client interface to the Resource Manager. Used by the client + * app to start the Drill application master. + * <p> + * Based on + * <a href="https://github.com/hortonworks/simple-yarn-app">simple-yarn-app</a> + */ + +public class YarnRMClient { + private YarnConfiguration conf; + private YarnClient yarnClient; + + /** + * Application ID. Semantics are such that each session of Drill-on-YARN works + * with no more than one application ID. 
+ */ + + private ApplicationId appId; + private YarnClientApplication app; + + public YarnRMClient() { + this(new YarnConfiguration()); + } + + public YarnRMClient(ApplicationId appId) { + this(); + this.appId = appId; + } + + public YarnRMClient(YarnConfiguration conf) { + this.conf = conf; + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + } + + public GetNewApplicationResponse createAppMaster() + throws YarnClientException { + // Create application via yarnClient + // Response is a new application ID along with cluster capacity info + + try { + app = yarnClient.createApplication(); + } catch (YarnException | IOException e) { + throw new YarnClientException("Create application failed", e); + } + GetNewApplicationResponse response = app.getNewApplicationResponse(); + appId = response.getApplicationId(); + return response; + } + + public void submitAppMaster(AppSpec spec) throws YarnClientException { + if (app == null) { + throw new IllegalStateException("call createAppMaster( ) first"); + } + + ApplicationSubmissionContext appContext; + try { + appContext = spec.createAppLaunchContext(conf, app); + } catch (IOException e) { + throw new YarnClientException("Create app launch context failed", e); + } + + // Submit application + try { + yarnClient.submitApplication(appContext); + } catch (YarnException | IOException e) { + throw new YarnClientException("Submit application failed", e); + } + } + + public ApplicationId getAppId() { + return appId; + } + + public ApplicationReport getAppReport() throws YarnClientException { + try { + return yarnClient.getApplicationReport(appId); + } catch (YarnException | IOException e) { + throw new YarnClientException("Get application report failed", e); + } + } + + /** + * Waits for the application to start. This version is somewhat informal, the + * intended use is when debugging unmanaged applications. 
+ * + * @throws YarnClientException + */ + public ApplicationAttemptId waitForStart() throws YarnClientException { + ApplicationReport appReport; + YarnApplicationState appState; + ApplicationAttemptId attemptId; + for (;;) { + appReport = getAppReport(); + appState = appReport.getYarnApplicationState(); + attemptId = appReport.getCurrentApplicationAttemptId(); + if (appState != YarnApplicationState.NEW + && appState != YarnApplicationState.NEW_SAVING + && appState != YarnApplicationState.SUBMITTED) { + break; + } + System.out.println("App State: " + appState); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Should never occur. + } + } + if (appState != YarnApplicationState.ACCEPTED) { + throw new YarnClientException( + "Application start failed with status " + appState); + } + + return attemptId; + } + + /** + * Wait for the application to enter one of the completion states. This is an + * informal implementation useful for testing. + * + * @throws YarnClientException + */ + + public void waitForCompletion() throws YarnClientException { + ApplicationReport appReport; + YarnApplicationState appState; + for (;;) { + appReport = getAppReport(); + appState = appReport.getYarnApplicationState(); + if (appState == YarnApplicationState.FINISHED + || appState == YarnApplicationState.KILLED + || appState == YarnApplicationState.FAILED) { + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // Should never occur. + } + } + + System.out.println("Application " + appId + " finished with" + " state " + + appState + " at " + appReport.getFinishTime()); + } + + public Token<AMRMTokenIdentifier> getAMRMToken() throws YarnClientException { + try { + return yarnClient.getAMRMToken(appId); + } catch (YarnException | IOException e) { + throw new YarnClientException("Get AM/RM token failed", e); + } + } + + /** + * Return standard class path entries from the YARN application class path. 
+ */ + + public String[] getYarnAppClassPath() { + return conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH); + } + + public void killApplication() throws YarnClientException { + try { + yarnClient.killApplication(appId); + } catch (YarnException | IOException e) { + throw new YarnClientException( + "Kill failed for application: " + appId.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/package-info.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/package-info.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/package-info.java new file mode 100644 index 0000000..aaa0fff --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Holds functionality common to the Drill-on-YARN client and Application Master (AM). + * Includes configuration, utilities, and wrappers around various YARN data classes. 
+ */ + +package org.apache.drill.yarn.core; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/package-info.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/package-info.java b/drill-yarn/src/main/java/org/apache/drill/yarn/package-info.java new file mode 100644 index 0000000..170dfa8 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/package-info.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Hosts Apache Drill under Apache Hadoop YARN. Consists of two main + * components as required by YARN: a client application which uses YARN to + * start the Drill cluster, and an Application Master (AM) which manages + * the cluster. The AM in turn starts, manages and stops drillbits. + * <p> + * Much of the functionality is simply plumbing to get YARN to do what is + * needed. The core of the AM is a "cluster controller" which starts, + * monitors and stops Drillbits, tracking their state transitions through + * the several lifecycle stages that result. 
+ * <p> + * Note about logs here: Drill-on-YARN is a YARN application and so it + * uses the same logging system used by the YARN code. This is different + * than that used by Drill. + */ + +package org.apache.drill.yarn; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/AMRegistry.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/AMRegistry.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/AMRegistry.java new file mode 100644 index 0000000..9cc95e5 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/AMRegistry.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.zk; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException.NodeExistsException; + +/** + * Register this App Master in ZK to prevent duplicates. 
+ * <p> + * Possible enhancement is to put the registry in some well-known location, such + * as /drill-am, + */ +public class AMRegistry { + private static final String AM_REGISTRY = "/drill-on-yarn"; + + private ZKClusterCoordinator zkCoord; + @SuppressWarnings("unused") + private String amHost; + @SuppressWarnings("unused") + private int amPort; + @SuppressWarnings("unused") + private String amAppId; + + private String zkRoot; + + private String clusterId; + + public AMRegistry(ZKClusterCoordinator zkCoord) { + this.zkCoord = zkCoord; + } + + public void useLocalRegistry(String zkRoot, String clusterId) { + this.zkRoot = zkRoot; + this.clusterId = clusterId; + } + + /** + * Register this AM as an ephemeral znode in ZK. The structure of ZK is as + * follows: + * + * <pre> + * /drill + * . <cluster-id> + * . . <Drillbit GUID> (Value is Proto-encoded drillbit info) + * . drill-on-yarn + * . . <cluster-id> (value: amHost:port) + * </pre> + * <p> + * The structure acknowledges that the cluster-id znode may be renamed, and + * there may be multiple cluster IDs for a single drill root node. (Odd, but + * supported.) To address this, we put the AM registrations in their own + * (persistent) znode: drill-on-yarn. Each is keyed by the cluster ID (so we + * can find it), and holds the host name, HTTP port and Application ID of the + * AM. + * <p> + * When the AM starts, it atomically checks and sets the AM registration. If + * another AM already is running, then this AM will fail, displaying a log + * error message with the host, port and (most importantly) app ID so the user + * can locate the problem. + * + * @throws ZKRuntimeException + */ + + public void register(String amHost, int amPort, String amAppId) + throws ZKRuntimeException { + this.amHost = amHost; + this.amPort = amPort; + this.amAppId = amAppId; + try { + + // The znode to hold AMs may or may not exist. Create it if missing. 
+ + try { + zkCoord.getCurator().create().withMode(CreateMode.PERSISTENT) + .forPath(AM_REGISTRY, new byte[0]); + } catch (NodeExistsException e) { + // OK + } + + // Try to create the AM registration. + + String amPath = AM_REGISTRY + "/" + clusterId; + String content = amHost + ":" + Integer.toString(amPort) + ":" + amAppId; + try { + zkCoord.getCurator().create().withMode(CreateMode.EPHEMERAL) + .forPath(amPath, content.getBytes("UTF-8")); + } catch (NodeExistsException e) { + + // ZK says that a node exists, which means that another AM is already + // running. + // Display an error, handling the case where the AM just disappeared, + // the + // registration is badly formatted, etc. + + byte data[] = zkCoord.getCurator().getData().forPath(amPath); + String existing; + if (data == null) { + existing = "Unknown"; + } else { + String packed = new String(data, "UTF-8"); + String unpacked[] = packed.split(":"); + if (unpacked.length < 3) { + existing = packed; + } else { + existing = unpacked[0] + ", port: " + unpacked[1] + + ", Application ID: " + unpacked[2]; + } + } + + // Die with a clear (we hope!) error message. + + throw new ZKRuntimeException( + "FAILED! An Application Master already exists for " + zkRoot + "/" + + clusterId + " on host: " + existing); + } + } catch (ZKRuntimeException e) { + + // Something bad happened with ZK. + + throw e; + } catch (Exception e) { + + // Something bad happened with ZK. 
+ + throw new ZKRuntimeException("Failed to create AM registration node", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinator.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinator.java new file mode 100644 index 0000000..7c5f5f3 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinator.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.yarn.zk; + +import static com.google.common.base.Throwables.propagate; +import static com.google.common.collect.Collections2.transform; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryNTimes; +import org.apache.curator.x.discovery.ServiceCache; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.details.ServiceCacheListener; +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.DistributedSemaphore; +import org.apache.drill.exec.coord.DrillServiceInstanceHelper; +import org.apache.drill.exec.coord.store.CachingTransientStoreFactory; +import org.apache.drill.exec.coord.store.TransientStore; +import org.apache.drill.exec.coord.store.TransientStoreConfig; +import org.apache.drill.exec.coord.store.TransientStoreFactory; +import org.apache.drill.exec.coord.zk.ZKRegistrationHandle; +import org.apache.drill.exec.coord.zk.ZkDistributedSemaphore; +import org.apache.drill.exec.coord.zk.ZkEphemeralStore; +import org.apache.drill.exec.coord.zk.ZkTransientStoreFactory; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State; + 
+import com.google.common.base.Function; + +/** + * Manages cluster coordination utilizing zookeeper. + * <p> + * This is a clone of the Drill class + * org.apache.drill.exec.coord.zk.ZKClusterCoordinator with a number of + * modifications: + * <ul> + * <li>Removed dependency on the Drill config system. That system uses Google's + * Guava library version 18, which conflicts with the earlier versions used by + * YARN and Hadoop, which resulted in runtime undefined method exceptions.</li> + * <li>Instead of getting config information out of the Drill config, the + * parameters are instead passed directly.</li> + * <li>Adds support for the drillbits registered event which was neither needed + * nor implemented by Drill.</li> + * <li>Use the YARN logging system instead of Drill's.</li> + * </ul> + * <p> + * This class should be replaced by the Drill version if/when the Guava + * conflicts can be resolved (and when registered Drillbit notifications are + * added to the Drill version.) + */ + +public class ZKClusterCoordinator extends ClusterCoordinator { + + protected static final Log logger = LogFactory + .getLog(ZKClusterCoordinator.class); + + private CuratorFramework curator; + private ServiceDiscovery<DrillbitEndpoint> discovery; + private volatile Collection<DrillbitEndpoint> endpoints = Collections + .emptyList(); + private final String serviceName; + private final CountDownLatch initialConnection = new CountDownLatch(1); + private final TransientStoreFactory factory; + private ServiceCache<DrillbitEndpoint> serviceCache; + + public ZKClusterCoordinator(String connect, String zkRoot, String clusterId, + int retryCount, int retryDelayMs, int connectTimeoutMs) + throws IOException { + logger.debug("ZK connect: " + connect + ", zkRoot: " + zkRoot + + ", clusterId: " + clusterId); + + this.serviceName = clusterId; + RetryPolicy rp = new RetryNTimes(retryCount, retryDelayMs); + curator = CuratorFrameworkFactory.builder().namespace(zkRoot) + 
.connectionTimeoutMs(connectTimeoutMs).retryPolicy(rp) + .connectString(connect).build(); + curator.getConnectionStateListenable() + .addListener(new InitialConnectionListener()); + curator.start(); + discovery = newDiscovery(); + factory = CachingTransientStoreFactory + .of(new ZkTransientStoreFactory(curator)); + } + + public CuratorFramework getCurator() { + return curator; + } + + @Override + public void start(long millisToWait) throws Exception { + logger.debug("Starting ZKClusterCoordination."); + discovery.start(); + + if (millisToWait != 0) { + boolean success = this.initialConnection.await(millisToWait, + TimeUnit.MILLISECONDS); + if (!success) { + throw new IOException(String.format( + "Failure to connect to the zookeeper cluster service within the allotted time of %d milliseconds.", + millisToWait)); + } + } else { + this.initialConnection.await(); + } + + serviceCache = discovery.serviceCacheBuilder().name(serviceName).build(); + serviceCache.addListener(new EndpointListener()); + serviceCache.start(); + updateEndpoints(); + } + + private class InitialConnectionListener implements ConnectionStateListener { + + @Override + public void stateChanged(CuratorFramework client, + ConnectionState newState) { + if (newState == ConnectionState.CONNECTED) { + initialConnection.countDown(); + client.getConnectionStateListenable().removeListener(this); + } + } + + } + + private class EndpointListener implements ServiceCacheListener { + @Override + public void stateChanged(CuratorFramework client, + ConnectionState newState) { + } + + @Override + public void cacheChanged() { + logger.debug("Got cache changed --> updating endpoints"); + updateEndpoints(); + } + } + + @Override + public void close() throws Exception { + // discovery attempts to close its caches(ie serviceCache) already. however, + // being good citizens we make sure to + // explicitly close serviceCache. 
Not only that we make sure to close + // serviceCache before discovery to prevent + // double releasing and disallowing jvm to spit bothering warnings. simply + // put, we are great! + AutoCloseables.close(serviceCache, discovery, curator, factory); + } + + @Override + public RegistrationHandle register(DrillbitEndpoint data) { + try { + ServiceInstance<DrillbitEndpoint> serviceInstance = newServiceInstance( + data); + discovery.registerService(serviceInstance); + return new ZKRegistrationHandle(serviceInstance.getId(), data); + } catch (Exception e) { + throw propagate(e); + } + } + + @Override + public void unregister(RegistrationHandle handle) { + if (!(handle instanceof ZKRegistrationHandle)) { + throw new UnsupportedOperationException( + "Unknown handle type: " + handle.getClass().getName()); + } + + // when Drillbit is unregistered, clean all the listeners registered in CC. + this.listeners.clear(); + + ZKRegistrationHandle h = (ZKRegistrationHandle) handle; + try { + ServiceInstance<DrillbitEndpoint> serviceInstance = ServiceInstance + .<DrillbitEndpoint> builder().address("").port(0).id(h.id) + .name(serviceName).build(); + discovery.unregisterService(serviceInstance); + } catch (Exception e) { + propagate(e); + } + } + + @Override + public Collection<DrillbitEndpoint> getAvailableEndpoints() { + return this.endpoints; + } + + @Override + public DistributedSemaphore getSemaphore(String name, int maximumLeases) { + return new ZkDistributedSemaphore(curator, "/semaphore/" + name, + maximumLeases); + } + + @Override + public <V> TransientStore<V> getOrCreateTransientStore( + final TransientStoreConfig<V> config) { + final ZkEphemeralStore<V> store = (ZkEphemeralStore<V>) factory + .getOrCreateStore(config); + return store; + } + + private synchronized void updateEndpoints() { + try { + Collection<DrillbitEndpoint> newDrillbitSet = transform( + discovery.queryForInstances(serviceName), + new Function<ServiceInstance<DrillbitEndpoint>, DrillbitEndpoint>() { + 
@Override + public DrillbitEndpoint apply( + ServiceInstance<DrillbitEndpoint> input) { + return input.getPayload(); + } + }); + + // set of newly dead bits : original bits - new set of active bits. + Set<DrillbitEndpoint> unregisteredBits = new HashSet<>(endpoints); + unregisteredBits.removeAll(newDrillbitSet); + + // Set of newly live bits : new set of active bits - original bits. + Set<DrillbitEndpoint> registeredBits = new HashSet<>(newDrillbitSet); + registeredBits.removeAll(endpoints); + + endpoints = newDrillbitSet; + + if (logger.isDebugEnabled()) { + StringBuilder builder = new StringBuilder(); + builder.append("Active drillbit set changed. Now includes "); + builder.append(newDrillbitSet.size()); + builder.append(" total bits."); + if (!newDrillbitSet.isEmpty()) { + builder.append(" New active drillbits: \n"); + } + for (DrillbitEndpoint bit : newDrillbitSet) { + builder.append('\t'); + builder.append(bit.getAddress()); + builder.append(':'); + builder.append(bit.getUserPort()); + builder.append(':'); + builder.append(bit.getControlPort()); + builder.append(':'); + builder.append(bit.getDataPort()); + builder.append('\n'); + } + logger.debug(builder.toString()); + } + + // Notify the drillbit listener for newly unregistered bits. + if (!(unregisteredBits.isEmpty())) { + drillbitUnregistered(unregisteredBits); + } + // Notify the drillbit listener for newly registered bits. 
+ if (!(registeredBits.isEmpty())) { + drillbitRegistered(registeredBits); + } + + } catch (Exception e) { + logger.error("Failure while update Drillbit service location cache.", e); + } + } + + protected ServiceInstance<DrillbitEndpoint> newServiceInstance( + DrillbitEndpoint endpoint) throws Exception { + return ServiceInstance.<DrillbitEndpoint> builder().name(serviceName) + .payload(endpoint).build(); + } + + protected ServiceDiscovery<DrillbitEndpoint> newDiscovery() { + return ServiceDiscoveryBuilder.builder(DrillbitEndpoint.class).basePath("/") + .client(curator).serializer(DrillServiceInstanceHelper.SERIALIZER) + .build(); + } + + @Override + public Collection<DrillbitEndpoint> getOnlineEndPoints() { + + // Not used in DoY + + throw new UnsupportedOperationException(); + } + + @Override + public RegistrationHandle update(RegistrationHandle handle, State state) { + + // Not used in DoY + + throw new UnsupportedOperationException(); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinatorDriver.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinatorDriver.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinatorDriver.java new file mode 100644 index 0000000..3f83ff2 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinatorDriver.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.zk; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.work.foreman.DrillbitStatusListener; +import org.apache.drill.yarn.appMaster.AMRegistrar; + +/** + * Driver class for the ZooKeeper cluster coordinator. Provides defaults for + * most options, but allows customizing each. Provides a {@link #build()} method + * to create <i>and start</i> the ZK service. Obtains the initial set of + * Drillbits (which should be empty for a YARN-defined cluster) which can be + * retrieved after building. + * <p> + * Maintains the ZK connection and monitors for disconnect. This class simply + * detects a disconnect timeout, it does not send a disconnect event itself to + * avoid creating a timer thread just for this purpose. Instead, the caller can + * poll {@link #hasFailed()}. + * <p> + * Defaults match those in Drill. (Actual Drill defaults are not yet used due to + * code incompatibility issues.) 
+ */ + +public class ZKClusterCoordinatorDriver implements AMRegistrar { + private static final Pattern ZK_COMPLEX_STRING = Pattern + .compile("(^.*?)/(.*)/([^/]*)$"); + + // Defaults are taken from java-exec's drill-module.conf + + private String connect = "localhost:2181"; + private String clusterId = "drillbits1"; + private String zkRoot = "drill"; + private int retryCount = 7200; + private int connectTimeoutMs = 5_000; + private int retryDelayMs = 500; + + // Default timeout before we declare that ZK is down: 2 minutes. + + private int failureTimeoutMs = 120_000; + + // Maximum ZK startup wait defaults to 30 seconds. It is only 10 seconds + // in the Drill implementation. + + private int maxStartWaitMs = 30_000; + + // Expected ports used to match ZK registries with + // containers. ZK lists the ports as part of its key, we have to anticipate + // these values in order to match. + + private int userPort = 31010; + private int controlPort = 31011; + private int dataPort = 31012; + + private List<DrillbitEndpoint> initialEndpoints; + private ConnectionStateListener stateListener = new ConnectionStateListener() { + + @Override + public void stateChanged(CuratorFramework client, + ConnectionState newState) { + ZKClusterCoordinatorDriver.this.stateChanged(newState); + } + }; + + private ZKClusterCoordinator zkCoord; + + private long connectionLostTime; + + private AMRegistry amRegistry; + + public ZKClusterCoordinatorDriver() { + } + + /** + * Specify connect string in the form: host:/zkRoot/clusterId + * + * @param connect + * @return + * @throws ZKConfigException + */ + public ZKClusterCoordinatorDriver setConnect(String connect) + throws ZKConfigException { + + // check if this is a complex zk string. If so, parse into components. 
+ Matcher m = ZK_COMPLEX_STRING.matcher(connect); + if (!m.matches()) { + throw new ZKConfigException("Bad connect string: " + connect); + } + this.connect = m.group(1); + zkRoot = m.group(2); + clusterId = m.group(3); + return this; + } + + public ZKClusterCoordinatorDriver setConnect(String connect, String zkRoot, + String clusterId) { + this.connect = connect; + this.zkRoot = zkRoot; + this.clusterId = clusterId; + return this; + } + + public ZKClusterCoordinatorDriver setRetryCount(int n) { + retryCount = n; + return this; + } + + public ZKClusterCoordinatorDriver setConnectTimeoutMs(int ms) { + connectTimeoutMs = ms; + return this; + } + + public ZKClusterCoordinatorDriver setRetryDelayMs(int ms) { + retryDelayMs = ms; + return this; + } + + public ZKClusterCoordinatorDriver setMaxStartWaitMs(int ms) { + maxStartWaitMs = ms; + return this; + } + + public ZKClusterCoordinatorDriver setFailureTimoutMs(int ms) { + failureTimeoutMs = ms; + return this; + } + + public ZKClusterCoordinatorDriver setPorts(int userPort, int controlPort, + int dataPort) { + this.userPort = userPort; + this.controlPort = controlPort; + this.dataPort = dataPort; + return this; + } + + /** + * Builds and starts the ZooKeeper cluster coordinator, translating any errors + * that occur. After this call, the listener will start receiving messages. 
+ * + * @return + * @throws ZKRuntimeException + * if ZK startup fails + */ + public ZKClusterCoordinatorDriver build() throws ZKRuntimeException { + try { + zkCoord = new ZKClusterCoordinator(connect, zkRoot, clusterId, retryCount, + retryDelayMs, connectTimeoutMs); + } catch (IOException e) { + throw new ZKRuntimeException( + "Failed to initialize the ZooKeeper cluster coordination", e); + } + try { + zkCoord.start(maxStartWaitMs); + } catch (Exception e) { + throw new ZKRuntimeException( + "Failed to start the ZooKeeper cluster coordination after " + + maxStartWaitMs + " ms.", + e); + } + initialEndpoints = new ArrayList<>(zkCoord.getAvailableEndpoints()); + zkCoord.getCurator().getConnectionStateListenable() + .addListener(stateListener); + amRegistry = new AMRegistry(zkCoord); + amRegistry.useLocalRegistry(zkRoot, clusterId); + return this; + } + + public void addDrillbitListener(DrillbitStatusListener listener) { + zkCoord.addDrillbitStatusListener(listener); + } + + public void removeDrillbitListener(DrillbitStatusListener listener) { + zkCoord.removeDrillbitStatusListener(listener); + } + + /** + * Returns the set of Drillbits registered at the time of the {@link #build()} + * call. Should be empty for a cluster managed by YARN. + * + * @return + */ + + public List<DrillbitEndpoint> getInitialEndpoints() { + return initialEndpoints; + } + + /** + * Convenience method to convert a Drillbit to a string. Note that ZK does not + * advertise the HTTP port, so it does not appear in the generated string. 
+ * + * @param bit + * @return + */ + + public static String asString(DrillbitEndpoint bit) { + return formatKey(bit.getAddress(), bit.getUserPort(), bit.getControlPort(), + bit.getDataPort()); + } + + public String toKey(String host) { + return formatKey(host, userPort, controlPort, dataPort); + } + + public static String formatKey(String host, int userPort, int controlPort, + int dataPort) { + StringBuilder buf = new StringBuilder(); + buf.append(host).append(":").append(userPort).append(':') + .append(controlPort).append(':').append(dataPort); + return buf.toString(); + } + + /** + * Translate ZK connection events into a connected/disconnected state along + * with the time of the first disconnect not followed by a connect. + * + * @param newState + */ + + protected void stateChanged(ConnectionState newState) { + switch (newState) { + case CONNECTED: + case READ_ONLY: + case RECONNECTED: + if (connectionLostTime != 0) { + ZKClusterCoordinator.logger.info("ZK connection regained"); + } + connectionLostTime = 0; + break; + case LOST: + case SUSPENDED: + if (connectionLostTime == 0) { + ZKClusterCoordinator.logger.info("ZK connection lost"); + connectionLostTime = System.currentTimeMillis(); + } + break; + } + } + + /** + * Reports our best guess as to whether ZK has failed. We assume ZK has failed + * if we received a connection lost notification without a subsequent connect + * notification, and we received the disconnect notification log enough ago + * that we assume that a timeout has occurred. 
+ * + * @return + */ + + public boolean hasFailed() { + if (connectionLostTime == 0) { + return false; + } + return System.currentTimeMillis() - connectionLostTime > failureTimeoutMs; + } + + public long getLostConnectionDurationMs() { + if (connectionLostTime == 0) { + return 0; + } + return System.currentTimeMillis() - connectionLostTime; + } + + public void close() { + if (zkCoord == null) { + return; + } + zkCoord.getCurator().getConnectionStateListenable() + .removeListener(stateListener); + try { + zkCoord.close(); + } catch (Exception e) { + ZKClusterCoordinator.logger.error("Error occurred on ZK close, ignored", + e); + } + zkCoord = null; + } + + @Override + public void register(String amHost, int amPort, String appId) + throws AMRegistrationException { + try { + amRegistry.register(amHost, amPort, appId); + } catch (ZKRuntimeException e) { + throw new AMRegistrationException(e); + } + } + + @Override + public void deregister() { + // Nothing to do: ZK does it for us. + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKConfigException.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKConfigException.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKConfigException.java new file mode 100644 index 0000000..700d84b --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKConfigException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Checked exception that carries a description of a ZooKeeper configuration
 * problem.
 */
public class ZKConfigException extends Exception {

  private static final long serialVersionUID = 1L;

  /**
   * Create the exception with a description of the configuration problem.
   *
   * @param msg
   *          the detail message, available from {@link #getMessage()}
   */
  public ZKConfigException(String msg) {
    super(msg);
  }
}
+ */ +package org.apache.drill.yarn.zk; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.work.foreman.DrillbitStatusListener; +import org.apache.drill.yarn.appMaster.AMWrapperException; +import org.apache.drill.yarn.appMaster.EventContext; +import org.apache.drill.yarn.appMaster.Pollable; +import org.apache.drill.yarn.appMaster.RegistryHandler; +import org.apache.drill.yarn.appMaster.Task; +import org.apache.drill.yarn.appMaster.TaskLifecycleListener; + +/** + * AM-specific implementation of a Drillbit registry backed by ZooKeeper. + * Listens to ZK events for registering a Drillbit and deregistering. Alerts the + * Cluster Controller of these events. + * <p> + * Locking strategy: Receives events from both ZK and the cluster controller, + * both of which must be synchronized. To prevent deadlocks, this class NEVER + * calls into the cluster controller while holding a lock. This prevents the + * following: + * <p> + * ClusterController --> ZKRegistry (OK) <br> + * ZK --> ZKRegistry (OK) <br> + * ZK --> ZKRegistry --> Cluster Controller (bad) + * <p> + * In the case of registration, ZK calls the registry which must alert the + * cluster controller. Cluster controller alerting is handled outside the ZK + * update critical section. + * <p> + * Because ZK events are occur relatively infrequently, any deadlock will occur + * once in a blue moon, which will make it very hard to reproduce. So, extreme + * caution is needed at design time to prevent the problem. + */ + +public class ZKRegistry + implements TaskLifecycleListener, DrillbitStatusListener, Pollable { + /** + * State of each Drillbit that we've discovered through ZK or launched via the + * AM. 
The tracker is where we combine the ZK information with AM to correlate + * overall Drillbit health. + */ + + protected static class DrillbitTracker { + /** + * A Drillbit can be in one of four states. + */ + + public enum State { + + /** + * An unmanaged Drillbit is one that has announced itself via ZK, but + * which the AM didn't launch (or has not yet received confirmation from + * YARN that it was launched.) In the normal state, this state either does + * not occur (YARN reports the task launch before the Drillbit registers + * in ZK) or is transient (if the Drillbit registers in ZK before YARN + * gets around to telling the AM that the Drillbit was launched.) A + * Drillbit that stays in the unregistered state is likely one launched + * outside the AM: either launched manually or (possibly), one left from a + * previous, failed AM run (though YARN is supposed to kill left-over + * child processes in that case.) + */ + + UNMANAGED, + + /** + * A new Drillbit is one that the AM has launched, but that has not yet + * registered itself with ZK. This is normally a transient state that + * occurs as ZK registration catches up with the YARN launch notification. + * If a Drillbit says in this state, then something is seriously wrong + * (perhaps a mis-configuration). The cluster controller will patiently + * wait a while, then decide bad things are happening and will ask YARN to + * kill the Drillbit, then will retry a time or two, after which it will + * throw up its hands, blacklist the node, and wait for the admin to sort + * things out. + */ + + NEW, + + /** + * Normal operating state: the AM launched the Drillbit, which then + * dutifully registered itself in ZK. Nothing to see here, move along. + */ + + REGISTERED, + + /** + * The Drillbit was working just fine, but its registration has dropped + * out of ZK for a reason best left to the cluster controller to + * determine. Perhaps the controller has decided to kill the Drillbit. 
+ * Perhaps the Drillbit became unresponsive (in which case the controller + * will kill it and retry) or has died (in which case YARN will alert the + * AM that the process exited.) + */ + + DEREGISTERED + } + + /** + * The common key used between tasks and ZK registrations. The key is of the + * form:<br> + * + * <pre> + * host:port:port:port + * </pre> + */ + + protected final String key; + + /** + * ZK tracking state. + * + * @see {@link State} + */ + + protected State state; + + /** + * For Drillbits started by the AM, the task object for this Drillbit. + */ + + protected Task task; + + /** + * For Drillbits discovered through ZK, the Drill endpoint for the Drillbit. + */ + + protected DrillbitEndpoint endpoint; + + public DrillbitTracker(String key, DrillbitEndpoint endpoint) { + this.key = key; + this.state = DrillbitTracker.State.UNMANAGED; + this.endpoint = endpoint; + } + + public DrillbitTracker(String key, Task task) { + this.key = key; + this.task = task; + state = DrillbitTracker.State.NEW; + } + + /** + * Mark that a YARN-managed task has become registered in ZK. This indicates + * that the task has come online. Tell the task to update its state to + * record that the task is, in fact, registered in ZK. This indicates a + * normal, healthy task. + * + * @param tracker + */ + + private void becomeRegistered() { + state = DrillbitTracker.State.REGISTERED; + } + + /** + * Mark that a YARN-managed Drillbit has dropped out of ZK. + * + * @param registryHandler + */ + + public void becomeUnregistered() { + assert state == DrillbitTracker.State.REGISTERED; + state = DrillbitTracker.State.DEREGISTERED; + endpoint = null; + } + } + + public static final String CONTROLLER_PROPERTY = "zk"; + + public static final int UPDATE_PERIOD_MS = 20_000; + + public static final String ENDPOINT_PROPERTY = "endpoint"; + + private static final Log LOG = LogFactory.getLog(ZKRegistry.class); + + /** + * Map of host:port:port:port keys to tracking objects. 
Identifies the + * Drillbits discovered from ZK, started by the controller, or (ideally) both. + */ + + private Map<String, DrillbitTracker> registry = new HashMap<>(); + + /** + * Interface to Drill's cluster coordinator. + */ + + private ZKClusterCoordinatorDriver zkDriver; + + /** + * Drill's cluster coordinator (or, at least, Drill-on-YARN's version of it. + */ + + private RegistryHandler registryHandler; + + /** + * Last check of ZK status. + */ + + private long lastUpdateTime; + + public ZKRegistry(ZKClusterCoordinatorDriver zkDriver) { + this.zkDriver = zkDriver; + } + + /** + * Called during AM startup to initialize ZK. Checks if any Drillbits are + * already running. These are "unmanaged" because the AM could not have + * started them (since they predate the AM.) + */ + + public void start(RegistryHandler controller) { + this.registryHandler = controller; + try { + zkDriver.build(); + } catch (ZKRuntimeException e) { + LOG.error("Failed to start ZK monitoring", e); + throw new AMWrapperException("Failed to start ZK monitoring", e); + } + for (DrillbitEndpoint dbe : zkDriver.getInitialEndpoints()) { + String key = toKey(dbe); + registry.put(key, new DrillbitTracker(key, dbe)); + + // Blacklist the host for each unmanaged drillbit. + + controller.reserveHost(dbe.getAddress()); + + LOG.warn("Host " + dbe.getAddress() + + " already running a Drillbit outside of YARN."); + } + zkDriver.addDrillbitListener(this); + } + + /** + * Convert a Drillbit endpoint to a string key used in the (zk-->task) map. + * Note that the string format here must match the one used in + * {@link #toKey(Task)} to map a task to string key. + * + * @param dbe + * the Drillbit endpoint from ZK + * @return a string key for this object + */ + + private String toKey(DrillbitEndpoint dbe) { + return ZKClusterCoordinatorDriver.asString(dbe); + } + + /** + * Convert a task to a string key used in the (zk-->task) map. 
Note that the + * string format here must match the one used in + * {@link #toKey(DrillbitEndpoint)} to map a drillbit endpoint to string key. + * + * @param task + * the task tracked by the cluster controller + * @return a string key for this object + */ + + private String toKey(Task task) { + return zkDriver.toKey(task.getHostName()); + } + + // private String toKey(Container container) { + // return zkDriver.toKey(container.getNodeId().getHost()); + // } + + public static class AckEvent { + Task task; + DrillbitEndpoint endpoint; + + public AckEvent(Task task, DrillbitEndpoint endpoint) { + this.task = task; + this.endpoint = endpoint; + } + } + + /** + * Callback from ZK to indicate that one or more drillbits have become + * registered. We handle registrations in a critical section, then alert the + * cluster controller outside the critical section. + */ + + @Override + public void drillbitRegistered(Set<DrillbitEndpoint> registeredDrillbits) { + List<AckEvent> updates = registerDrillbits(registeredDrillbits); + for (AckEvent event : updates) { + if (event.task == null) { + registryHandler.reserveHost(event.endpoint.getAddress()); + } else { + registryHandler.startAck(event.task, ENDPOINT_PROPERTY, event.endpoint); + } + } + } + + private synchronized List<AckEvent> registerDrillbits( + Set<DrillbitEndpoint> registeredDrillbits) { + List<AckEvent> events = new ArrayList<>(); + for (DrillbitEndpoint dbe : registeredDrillbits) { + AckEvent event = drillbitRegistered(dbe); + if (event != null) { + events.add(event); + } + } + return events; + } + + /** + * Called when a drillbit has become registered. There are two cases. Either + * this is a normal registration of a previously-started task, or this is an + * unmanaged drillbit for which we have no matching task. 
+ */ + + private AckEvent drillbitRegistered(DrillbitEndpoint dbe) { + String key = toKey(dbe); + DrillbitTracker tracker = registry.get(key); + if (tracker == null) { + // Unmanaged drillbit case + + LOG.info("Registration of unmanaged drillbit: " + key); + tracker = new DrillbitTracker(key, dbe); + registry.put(key, tracker); + return new AckEvent(null, dbe); + } + + // Managed drillbit case. Might be we lost, then regained + // ZK connection. + + if (tracker.state == DrillbitTracker.State.REGISTERED) { + LOG.info("Re-registration of known drillbit: " + key); + return null; + } + + // Otherwise, the Drillbit has just registered with ZK. + // Or, if the ZK connection was lost and regained, the + // state changes from DEREGISTERED --> REGISTERED + + LOG.info("Drillbit registered: " + key + ", task: " + tracker.task.toString() ); + tracker.endpoint = dbe; + tracker.becomeRegistered(); + return new AckEvent(tracker.task, dbe); + } + + /** + * Callback from ZK to indicate that one or more drillbits have become + * deregistered from ZK. We handle the deregistrations in a critical section, + * but updates to the cluster controller outside of a critical section. + */ + + @Override + public void drillbitUnregistered( + Set<DrillbitEndpoint> unregisteredDrillbits) { + List<AckEvent> updates = unregisterDrillbits(unregisteredDrillbits); + for (AckEvent event : updates) { + registryHandler.completionAck(event.task, ENDPOINT_PROPERTY); + } + } + + private synchronized List<AckEvent> unregisterDrillbits( + Set<DrillbitEndpoint> unregisteredDrillbits) { + List<AckEvent> events = new ArrayList<>(); + for (DrillbitEndpoint dbe : unregisteredDrillbits) { + AckEvent event = drillbitUnregistered(dbe); + if (event != null) { + events.add(event); + } + } + return events; + } + + /** + * Handle the case that a drillbit becomes unregistered. There are three + * cases. + * <ol> + * <li>The deregistration is for a drillbit that is not in the registry table. 
+ * Indicates a code error.</li> + * <li>The drillbit is unmanaged. This occurs for drillbits started and + * stopped outside of YARN.</li> + * <li>Normal case of deregistration of a YARN-managed drillbit. Inform the + * controller of this event.</li> + * </ol> + * + * @param dbe + */ + + private AckEvent drillbitUnregistered(DrillbitEndpoint dbe) { + String key = toKey(dbe); + DrillbitTracker tracker = registry.get(key); + assert tracker != null; + if (tracker == null) { + // Something is terribly wrong. + // Have seen this when a user kills the Drillbit just after it starts. Evidently, the + // Drillbit registers with ZK just before it is killed, but before DoY hears about + // the registration. + + LOG.error("Internal error - Unexpected drillbit unregistration: " + key); + return null; + } + if (tracker.state == DrillbitTracker.State.UNMANAGED) { + // Unmanaged drillbit + + assert tracker.task == null; + LOG.info("Unmanaged drillbit unregistered: " + key); + registry.remove(key); + registryHandler.releaseHost(dbe.getAddress()); + return null; + } + LOG.info("Drillbit unregistered: " + key + ", task: " + tracker.task.toString() ); + tracker.becomeUnregistered(); + return new AckEvent(tracker.task, dbe); + } + + /** + * Listen for selected YARN task state changes. Called from within the cluster + * controller's critical section. + */ + + @Override + public synchronized void stateChange(Event event, EventContext context) { + switch (event) { + case ALLOCATED: + taskCreated(context.task); + break; + case ENDED: + taskEnded(context.task); + break; + default: + break; + } + } + + /** + * Indicates that the cluster controller has created a task that we expect to + * be monitored by ZK. We handle two cases: the normal case in which we later + * receive a ZK notification. And, the unusual case in which we've already + * received the ZK notification and we now match that notification with this + * task. 
(The second case could occur if latency causes us to receive the ZK + * notification before we learn from the NM that the task is alive.) + * + * @param task + */ + + private void taskCreated(Task task) { + String key = toKey(task); + DrillbitTracker tracker = registry.get(key); + if (tracker == null) { + // Normal case: no ZK registration yet. + + registry.put(key, new DrillbitTracker(key, task)); + } else if (tracker.state == DrillbitTracker.State.UNMANAGED) { + // Unusual case: ZK registration came first. + + LOG.info("Unmanaged drillbit became managed: " + key); + tracker.task = task; + tracker.becomeRegistered(); + + // Note: safe to call this here as we are already in the controller + // critical section. + + registryHandler.startAck(task, ENDPOINT_PROPERTY, tracker.endpoint); + } else { + LOG.error(task.getLabel() + " - Drillbit registry in wrong state " + + tracker.state + " for new task: " + key); + } + } + + /** + * Report whether the given task is still registered in ZK. Called while + * waiting for a deregistration event to catch possible cases where the + * messages is lost. The message should never be lost, but we've seen + * cases where tasks hang in this state. This is a potential work-around. + * + * @param task + * @return + */ + + public synchronized boolean isRegistered(Task task) { + String key = toKey(task); + DrillbitTracker tracker = registry.get(key); + if (tracker==null) { + return false; + } + return tracker.state == DrillbitTracker.State.REGISTERED; + } + + /** + * Mark that a task (YARN container) has ended. Updates the (zk --> task) + * registry by removing the task. The cluster controller state machine + * monitors ZK and does not end the task until the ZK registration for that + * task drops. As a result, the entry here should be in the deregistered state + * or something is seriously wrong. 
+ * + * @param task + */ + + private void taskEnded(Task task) { + + // If the task has no host name then the task is being cancelled before + // a YARN container was allocated. Just ignore such a case. + + if (task.getHostName() == null) { + return; + } + String key = toKey(task); + DrillbitTracker tracker = registry.get(key); + assert tracker != null; + assert tracker.state == DrillbitTracker.State.DEREGISTERED; + registry.remove(key); + } + + /** + * Periodically check ZK status. If the ZK connection has timed out, something + * is very seriously wrong. Shut the whole Drill cluster down since Drill + * cannot operate without ZooKeeper. + * <p> + * This method should not be synchronized. It checks only the ZK state, not + * internal state. Further, if we do reconnect to ZK, then a ZK thread may + * attempt to update this registry, which will acquire a synchronization lock. + */ + + @Override + public void tick(long curTime) { + if (lastUpdateTime + UPDATE_PERIOD_MS < curTime) { + return; + } + + lastUpdateTime = curTime; + if (zkDriver.hasFailed()) { + int secs = (int) ((zkDriver.getLostConnectionDurationMs() + 500) / 1000); + LOG.error( + "ZooKeeper connection lost, failing after " + secs + " seconds."); + registryHandler.registryDown(); + } + } + + public void finish(RegistryHandler handler) { + zkDriver.removeDrillbitListener(this); + zkDriver.close(); + } + + public synchronized List<String> listUnmanagedDrillits() { + List<String> drillbits = new ArrayList<>(); + for (DrillbitTracker item : registry.values()) { + if (item.state == DrillbitTracker.State.UNMANAGED) { + drillbits.add(item.key); + } + } + return drillbits; + } + + /** + * Get the current registry for testing. Why for testing? Because this is + * unsynchronized. In production code, the map may change out from under you. 
+ * + * @return + */ + + protected Map<String, DrillbitTracker> getRegistryForTesting() { + return registry; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRuntimeException.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRuntimeException.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRuntimeException.java new file mode 100644 index 0000000..4e1b115 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRuntimeException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.yarn.zk; + +/** + * Exception type of the Drill-on-YARN ZooKeeper package; presumably raised for + * ZooKeeper-related failures (inferred from the name and package). + * <p> + * NOTE(review): despite the "Runtime" in its name, this class extends + * {@link Exception}, so it is a checked exception. + */ +public class ZKRuntimeException extends Exception { + private static final long serialVersionUID = 1L; + + /** + * @param msg description of the failure + * @param e underlying cause, passed through to {@link Exception} + */ + public ZKRuntimeException(String msg, Exception e) { + super(msg, e); + } + + /** + * @param msg description of the failure + */ + public ZKRuntimeException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/package-info.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/package-info.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/package-info.java new file mode 100644 index 0000000..14bb427 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/package-info.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Interface between the Application Master and ZooKeeper. Classes here manage two + * registrations: Drillbits and the AM itself. + * <p> + * Drillbit registration is used to confirm that Drillbits have indeed come online. + * If Drillbits fail to come online, then the AM concludes that something went wrong. 
+ * If Drillbits drop offline unexpectedly, the AM concludes that the Drillbit is sick + * and restarts it. + * <p> + * The AM registry prevents two AMs from attempting to manage the same cluster. + */ + +package org.apache.drill.yarn.zk; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/config.ftl ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/resources/drill-am/config.ftl b/drill-yarn/src/main/resources/drill-am/config.ftl new file mode 100644 index 0000000..8405c1f --- /dev/null +++ b/drill-yarn/src/main/resources/drill-am/config.ftl @@ -0,0 +1,41 @@ +<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
--> + +<#include "*/generic.ftl"> +<#macro page_head> +</#macro> + +<#macro page_body> + <h4>Fully Resolved Configuration Settings</h4> + <p> + + <table class="table table-hover" style="width: auto;"> + <tr> + <th>Configuration Key</th> + <th>Value</th> + </tr> + <#list model as pair> + <tr> + <td>${pair.getName()}</td> + <td>${pair.getQuotedValue()}</td> + </tr> + </#list> + </table> + <p> + To modify these values: + <ol> + <li>Edit <code>$DRILL_SITE/drill-on-yarn.conf</code> (for the drill.yarn settings),</li> + <li>Edit <code>$DRILL_SITE/drill-override.conf</code> (for the drill.exec settings).</li> + <li>Restart your Drill cluster using the Drill-on-YARN client.</li> + </ol> +</#macro> + +<@page_html/> http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/confirm.ftl ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/resources/drill-am/confirm.ftl b/drill-yarn/src/main/resources/drill-am/confirm.ftl new file mode 100644 index 0000000..515293d --- /dev/null +++ b/drill-yarn/src/main/resources/drill-am/confirm.ftl @@ -0,0 +1,70 @@ +<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
--> + +<#include "*/generic.ftl"> +<#macro page_head> +</#macro> + +<#macro page_body> + <h4><#if model.getType( ) == "STOPPED"> + Stop Drill Cluster + <#else> + Resize Drill Cluster + </#if></h4> + <p> + + <#if model.getType( ) == "RESIZED"> + <div class="alert alert-success"> + <strong>Success!</strong> Cluster resizing to ${model.getValue( )} nodes. + </div> + <#elseif model.getType( ) == "CANCELLED"> + <div class="alert alert-info"> + <strong>Success!</strong> Drillbit ${model.getValue( )} was cancelled. + </div> + <#elseif model.getType( ) == "NULL_RESIZE"> + <div class="alert alert-info"> + <strong>Note:</strong> The new size of ${model.getValue( )} is the + same as the current cluster size. + </div> + <#elseif model.getType( ) == "INVALID_RESIZE"> + <div class="alert alert-danger"> + <strong>Error!</strong> Invalid cluster resize level: ${model.getValue( )}. + Please <a href="/manage">try again</a>. + </div> + <#elseif model.getType( ) == "INVALID_GROW"> + <div class="alert alert-danger"> + <strong>Error!</strong> Invalid cluster grow amount: ${model.getValue( )}. + Please <a href="/manage">try again</a>. + </div> + <#elseif model.getType( ) == "INVALID_SHRINK"> + <div class="alert alert-danger"> + <strong>Error!</strong> Invalid cluster shrink amount: ${model.getValue( )}. + Please <a href="/manage">try again</a>. + </div> + <#elseif model.getType( ) == "INVALID_TASK"> + <div class="alert alert-danger"> + <strong>Error!</strong> Invalid Drillbit ID: ${model.getValue( )}. + Perhaps the Drillbit has already stopped. + </div> + <#elseif model.getType( ) == "STOPPED"> + <div class="alert alert alert-success"> + <strong>Success!</strong> Cluster is shutting down. + </div> + Pages on this site will be unavailable until the cluster restarts. + </#if> + <#if model.getType( ) == "CANCELLED"> + Return to the <a href="/drillbits">Drillbits page</a>. + <#elseif model.getType( ) != "STOPPED"> + Return to the <a href="/manage">Management page</a>. 
+ </#if> +</#macro> + +<@page_html/> http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/generic.ftl ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/resources/drill-am/generic.ftl b/drill-yarn/src/main/resources/drill-am/generic.ftl new file mode 100644 index 0000000..b76a917 --- /dev/null +++ b/drill-yarn/src/main/resources/drill-am/generic.ftl @@ -0,0 +1,78 @@ +<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + +<#-- Adapted from the Drill generic.ftl, adjusted for use in the AM. 
--> + +<#macro page_head> +</#macro> + +<#macro page_body> +</#macro> + +<#macro page_html> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + + <title>Apache Drill - Application Master</title> + <link rel="shortcut icon" href="/static/img/drill.ico"> + + <link href="/static/css/bootstrap.min.css" rel="stylesheet"> + <link href="/drill-am/static/css/drill-am.css" rel="stylesheet"> + + <script src="/static/js/jquery.min.js"></script> + <script src="/static/js/bootstrap.min.js"></script> + + <!-- HTML5 shim and Respond.js IE8 support of HTML5 elements and media queries --> + <!--[if lt IE 9]> + <script src="/static/js/html5shiv.js"></script> + <script src="/static/js/1.4.2/respond.min.js"></script> + <![endif]--> + + <@page_head/> + </head> + <body role="document"> + <div class="navbar navbar-inverse navbar-fixed-top" role="navigation"> + <div class="container-fluid"> + <div class="navbar-header"> + <button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse"> + <span class="sr-only">Toggle navigation</span> + <span class="icon-bar"></span> + <span class="icon-bar"></span> + <span class="icon-bar"></span> + </button> + <a class="navbar-brand" href="/">Apache Drill</a> + </div> + <div class="navbar-collapse collapse"> + <ul class="nav navbar-nav"> + <li><a href="/config">Configuration</a></li> + <li><a href="/drillbits">Drillbits</a></li> + <li><a href="/manage">Manage</a></li> + <li><a href="/history">History</a></li> + </ul> + <ul class="nav navbar-nav navbar-right"> + <li><a href="${docsLink}">Documentation</a> + <#if showLogin == true > + <li><a href="/login">Log In</a> + </#if> + <#if showLogout == true > + <li><a href="/logout">Log Out (${loggedInUserName})</a> + </#if> + </ul> + </div> + </div> + </div> + + <div class="container-fluid drill-am" role="main"> + <h3>YARN Application Master – ${clusterName}</h3> + <@page_body/> + </div> + </body> + </html> +</#macro> 
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/history.ftl ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/resources/drill-am/history.ftl b/drill-yarn/src/main/resources/drill-am/history.ftl new file mode 100644 index 0000000..c588d06 --- /dev/null +++ b/drill-yarn/src/main/resources/drill-am/history.ftl @@ -0,0 +1,59 @@ +<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
--> + +<#include "*/generic.ftl"> +<#macro page_head> + <meta http-equiv="refresh" content="${refreshSecs}" > +</#macro> + +<#macro page_body> + <h4>Drillbit History</h4> + <p> + + <div class="table-responsive"> + <table class="table table-hover"> + <tr> + <th>ID</th> + <th>Try</th> + <th>Pool</th> + <th>Host</th> + <th>Container</th> + <th>Memory (MB)</th> + <th>Virtual Cores</th> + <th>Start Time</th> + <th>End Time</th> + <th>Disposition</th> + </tr> + <#assign count=0> + <#list model as task> + <#assign count=count+1> + <tr> + <td><b>${task.getTaskId( )}</b></td> + <td>${task.getTryCount( )}</td> + <td>${task.getGroupName( )}</td> + <td><#if task.hasContainer( )><a href="${task.getNmLink( )}">${task.getHost( )}</a> + <#else> </#if></td> + <td>${task.getContainerId()}</td> + <td>${task.getMemory( )}</td> + <td>${task.getVcores( )}</td> + <td>${task.getStartTime( )}</td> + <td>${task.getEndTime( )}</td> + <td>${task.getDisposition( )}</td> + </tr> + </#list> + </table> + <#if count == 0> + No drillbits have completed. + </#if> + </div> +</#macro> + +<@page_html/> http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/index.ftl ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/resources/drill-am/index.ftl b/drill-yarn/src/main/resources/drill-am/index.ftl new file mode 100644 index 0000000..18d6ab5 --- /dev/null +++ b/drill-yarn/src/main/resources/drill-am/index.ftl @@ -0,0 +1,128 @@ +<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. 
You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + +<#include "*/generic.ftl"> +<#macro page_head> + <meta http-equiv="refresh" content="${model.getRefreshSecs( )}" > +</#macro> + +<#macro page_body> + <h4>Drill Cluster Status</h4> + + <table class="table table-hover" style="width: auto;"> + <tr> + <td>YARN Application ID:</td> + <td><a href="${model.getRmAppLink( )}" data-toggle="tooltip" title="YARN Resource Manager page for this application"> + ${model.getAppId( )}</a></td> + </tr> + <tr> + <td>YARN Resource Manager:</td> + <td><#if model.getRmLink()?? > <#-- Occurs early in startup before app is fully registered. --> + <a href="${model.getRmLink( )}" data-toggle="tooltip" title="YARN Resource Manager page for this container"> + ${model.getRmHost( )}</a> + <#else>Unavailable + </#if></td> + </tr> + <tr> + <td>YARN Node Manager for AM:</td> + <td><#if model.getNmLink()?? > <#-- Occurs early in startup before app is fully registered. 
--> + <a href="${model.getNmLink( )}" data-toggle="tooltip" title="YARN Node Manager"> + ${model.getNmHost( )}</a> | + <a href="${model.getNmAppLink( )}" data-toggle="tooltip" title="YARN Node Manager page for this application">App info</a> + <#else>Unavailable + </#if></td> + </tr> + <tr> + <td>ZooKeeper Hosts:</td> + <td><span data-toggle="tooltip" title="ZooKeeper connection string."> + ${model.getZkConnectionStr( )}</span></td> + </tr> + <tr> + <td>ZooKeeper Root:</td> + <td><span data-toggle="tooltip" title="ZooKeeper Drill root."> + ${model.getZkRoot( )}</span></td> + </tr> + <tr> + <td>ZooKeeper Cluster ID:</td> + <td><span data-toggle="tooltip" title="ZooKeeper Drill cluster-id."> + ${model.getZkClusterId( )}</span></td> + </tr> + <tr> + <td>State:</td> + <td><span data-toggle="tooltip" title="${model.getStateHint( )}"> + ${model.getState( )}</span></td> + </tr> + <tr> + <td>Target Drillbit Count:</td> + <td>${model.getTargetCount( )}</td> + </tr> + <tr> + <td>Live Drillbit Count:</td> + <td>${model.getLiveCount( )}</td> + </tr> + <#if model.getUnmanagedCount( ) gt 0 > + <tr> + <td style="color: red">Unmanaged Drillbit Count:</td> + <td>${model.getUnmanagedCount( )}</td> + </tr> + </#if> + <#if model.getBlacklistCount( ) gt 0 > + <tr> + <td style="color: red">Blacklisted Node Count:</td> + <td>${model.getBlacklistCount( )}</td> + </tr> + </#if> + <tr> + <td>Total Drill Virtual Cores:</td> + <td>${model.getDrillTotalVcores( )}</td> + </tr> + <tr> + <td>Total Drill Memory (MB):</td> + <td>${model.getDrillTotalMemory( )}</td> + </tr> + <#if model.supportsDiskResource( ) > + <tr> + <td>Total Drill Disks:</td> + <td>${model.getDrillTotalDisks( )}</td> + </tr> + </#if> + </table> + <table class="table table-hover" style="width: auto;"> + <tr> + <th>Group</th> + <th>Type</th> + <th>Target Drillbit Count</th> + <th>Total Drillbits</th> + <th>Live Drillbits</th> + <th>Memory per Drillbit (MB)</th> + <th>VCores per Drillbit</th> + <#if model.supportsDiskResource( ) > 
+ <th>Disks per Drillbit</th> + </#if> + </tr> + <#list model.getGroups( ) as group> + <tr> + <td>${group.getName( )}</td> + <td>${group.getType( )}</td> + <td>${group.getTargetCount( )}</td> + <td>${group.getTaskCount( )}</td> + <td>${group.getLiveCount( )}</td> + <td>${group.getMemory( )}</td> + <td>${group.getVcores( )}</td> + <#if model.supportsDiskResource( ) > + <td>${group.getDisks( )}</td> + </#if> + </tr> + </#list> + </table> +</#macro> + +<@page_html/> http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/login.ftl ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/resources/drill-am/login.ftl b/drill-yarn/src/main/resources/drill-am/login.ftl new file mode 100644 index 0000000..036229e --- /dev/null +++ b/drill-yarn/src/main/resources/drill-am/login.ftl @@ -0,0 +1,35 @@ +<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
--> + +<#include "*/generic.ftl"> +<#macro page_head> +</#macro> + +<#macro page_body> + <div align="center" class="table-responsive"> + <form role="form" name="input" action="/j_security_check" method="POST"> + <fieldset> + <div class="form-group"> + <img src="/drill-am/static/img/apache-drill-logo.png" alt="Apache Drill Logo"> + <#if model??> + <div class="alert alert-danger"> + <strong>Error</strong> ${model} + </div> + </#if> + <p><input type="text" size="30" name="j_username" placeholder="Username"></p> + <p><input type="password" size="30" name="j_password" placeholder="Password"></p> + <p><button type="submit" class="btn btn-primary">Log In</button></p> + </div> + </fieldset> + </form> + </div> +</#macro> +<@page_html/> \ No newline at end of file