ayushtkn commented on code in PR #6343: URL: https://github.com/apache/hive/pull/6343#discussion_r3005816424
########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public interface ExternalSessionsRegistry { + Logger LOG = LoggerFactory.getLogger(ExternalSessionsRegistry.class); + + /** + * Returns application of id of the external session. + * @return application id + * @throws Exception in case of any exceptions + */ + String getSession() throws Exception; + + /** + * Returns external session back to registry. 
+ * @param appId application id + */ + void returnSession(String appId); + + /** + * Closes the external session registry + */ + void close(); + + Map<String, ExternalSessionsRegistry> INSTANCES = new HashMap<>(); + + static ExternalSessionsRegistry getClient(final Configuration conf) { + ExternalSessionsRegistry registry; + synchronized (INSTANCES) { + // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE after Tez 1.0.0 is released. + String namespace = conf.get("tez.am.registry.namespace"); + // HS2 would need to know about all coordinators running on all compute groups for a given compute (namespace) + // Setting this config to false in client, will make registry client listen on paths under @compute instead of + // @compute/compute-group + // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_ENABLE_COMPUTE_GROUPS after Tez 1.0.0 is released. + conf.setBoolean("tez.am.registry.enable.compute.groups", false); + registry = INSTANCES.get(namespace); + String clazz = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_REGISTRY_CLASS); Review Comment: this should be inside the if block ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public interface ExternalSessionsRegistry { + Logger LOG = LoggerFactory.getLogger(ExternalSessionsRegistry.class); + + /** + * Returns application of id of the external session. + * @return application id + * @throws Exception in case of any exceptions + */ + String getSession() throws Exception; + + /** + * Returns external session back to registry. + * @param appId application id + */ + void returnSession(String appId); + + /** + * Closes the external session registry + */ + void close(); + + Map<String, ExternalSessionsRegistry> INSTANCES = new HashMap<>(); Review Comment: I am not able to catch the lifecycle of this? Who closes the instances & cleans this up? ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AbstractTriggerValidator.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public abstract class AbstractTriggerValidator { + private ScheduledExecutorService scheduledExecutorService = null; + abstract Runnable getTriggerValidatorRunnable(); + + void startTriggerValidator(long triggerValidationIntervalMs) { + if (scheduledExecutorService == null) { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build()); + Runnable triggerValidatorRunnable = getTriggerValidatorRunnable(); + scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs, + triggerValidationIntervalMs, TimeUnit.MILLISECONDS); + TezSessionPoolSession.LOG.info("Started trigger validator with interval: {} ms", triggerValidationIntervalMs); Review Comment: is there a reason for using the Logger from `TezSessionPoolSession`? 
########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java: ########## @@ -417,7 +420,13 @@ TriggerValidatorRunnable getTriggerValidatorRunnable() { } protected TezSessionPoolSession createSession(String sessionId, HiveConf conf) { - return new TezSessionPoolSession(sessionId, this, expirationTracker, conf); + TezSessionState base; + if (useExternalSessions) { + base = new TezExternalSessionState(sessionId, conf); + } else { + base = new TezSessionState(sessionId, conf); + } Review Comment: this part seems duped with WorkloadManager, maybe we can refactor to avoid duplication ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.mapreduce.hadoop.MRHelpers; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; + +/** + * A {@code TezSession} implementation that represents externally managed Tez sessions. + * <p> + * Unlike {@code TezSessionState}, these sessions are not created or owned by HiveServer2. + * Instead, HiveServer2 connects to an already existing Tez session. + * </p> + * + * <h3>Lifecycle</h3> + * <ol> + * <li>An instance of {@code TezExternalSessionState} is created.</li> + * <li>An artificial application ID is acquired from a registry. This does not + * correspond to a real YARN application, as the session is unmanaged.</li> + * <li>A {@code TezClient} is instantiated but not started (unlike in + * {@code TezSessionState}), allowing the rest of the Hive codebase to + * interact with it transparently.</li> + * </ol> + * + * <p> + * This abstraction enables Hive components to interact with external Tez + * sessions using the same interfaces as internally managed sessions. 
+ * </p> + */ +public class TezExternalSessionState extends TezSessionState { + private static final Object DEFAULT_CONF_CREATE_LOCK = new Object(); + private static TezConfiguration defaultTezConfiguration; + + private String externalAppId; + private boolean isDestroying = false; + private final ExternalSessionsRegistry registry; + + public TezExternalSessionState(String sessionId, HiveConf conf) { + super(sessionId, conf); + this.registry = ExternalSessionsRegistry.getClient(conf); + synchronized (DEFAULT_CONF_CREATE_LOCK) { + if (defaultTezConfiguration == null) { + defaultTezConfiguration = createDefaultTezConfig(); + } + } + } + + @Override + public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) { + /* + * No-op implementation. + * External Tez sessions are not backed by a YARN application and therefore + * do not manage or localize resources. As a result, there are no local + * resources to ensure for this session type. + */ + } + + @Override + protected void openInternal(String[] additionalFilesNotFromConf, + boolean isAsync, LogHelper console, HiveResources resources) + throws IOException, TezException { + initQueueAndUser(); + + boolean llapMode = isLlapMode(); + + Map<String, String> amEnv = new HashMap<>(); + MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv); + + TezConfiguration tezConfig = new TezConfiguration(defaultTezConfiguration); + setupSessionAcls(tezConfig, conf); + ServicePluginsDescriptor spd = createServicePluginDescriptor(llapMode, tezConfig); + Credentials llapCredentials = createLlapCredentials(llapMode, tezConfig); + + final TezClient session = TezClient.newBuilder("HIVE-" + getSessionId(), tezConfig) + .setIsSession(true) + .setCredentials(llapCredentials).setServicePluginDescriptor(spd) + .build(); + + LOG.info("Opening new External Tez Session (id: {})", getSessionId()); + TezJobMonitor.initShutdownHook(); + + // External sessions doesn't support async mode (getClient should be much cheaper than open, + // 
and the async mode is anyway only used for CLI). + if (isAsync) { + LOG.info("Ignoring the async argument for an external session {}", getSessionId()); + } + try { + externalAppId = registry.getSession(); + } catch (TezException | IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } + + session.getClient(ApplicationId.fromString(externalAppId)); + LOG.info("Started an external session; client name {}, app ID {}", session.getClientName(), externalAppId); + setTezClient(session); + } + + @Override + public void close(boolean keepDagFilesDir) throws Exception { + // We never close external sessions that don't have errors. + if (externalAppId != null) { + registry.returnSession(externalAppId); Review Comment: if this throws any `RuntimeException`, `externalAppId` won't even be nullified, which would lead to resource leaks, I believe ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java: ########## @@ -103,7 +103,13 @@ public interface SessionObjectFactory<SessionType> { this.deltaRemaining = new AtomicInteger(initialSize); - final int threadCount = HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS); + int threadCount = 1; + if (!HiveConf.getBoolVar(initConf, ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS)) { + // Don't use multiple threads for external sessions. + threadCount = Math.min(initialSize, + HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS)); Review Comment: there is a line below ``` if (initialSize == 0) return; // May be resized later. ``` which means `initialSize` can be resized, so do we need to do this `Math.min`? ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.mapreduce.hadoop.MRHelpers; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; + +/** + * A {@code TezSession} implementation that represents externally managed Tez sessions. + * <p> + * Unlike {@code TezSessionState}, these sessions are not created or owned by HiveServer2. + * Instead, HiveServer2 connects to an already existing Tez session. + * </p> + * + * <h3>Lifecycle</h3> + * <ol> + * <li>An instance of {@code TezExternalSessionState} is created.</li> + * <li>An artificial application ID is acquired from a registry. 
This does not + * correspond to a real YARN application, as the session is unmanaged.</li> + * <li>A {@code TezClient} is instantiated but not started (unlike in + * {@code TezSessionState}), allowing the rest of the Hive codebase to + * interact with it transparently.</li> + * </ol> + * + * <p> + * This abstraction enables Hive components to interact with external Tez + * sessions using the same interfaces as internally managed sessions. + * </p> + */ +public class TezExternalSessionState extends TezSessionState { + private static final Object DEFAULT_CONF_CREATE_LOCK = new Object(); + private static TezConfiguration defaultTezConfiguration; + + private String externalAppId; + private boolean isDestroying = false; + private final ExternalSessionsRegistry registry; + + public TezExternalSessionState(String sessionId, HiveConf conf) { + super(sessionId, conf); + this.registry = ExternalSessionsRegistry.getClient(conf); + synchronized (DEFAULT_CONF_CREATE_LOCK) { + if (defaultTezConfiguration == null) { + defaultTezConfiguration = createDefaultTezConfig(); + } + } + } + + @Override + public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) { + /* + * No-op implementation. + * External Tez sessions are not backed by a YARN application and therefore + * do not manage or localize resources. As a result, there are no local + * resources to ensure for this session type. + */ + } + + @Override + protected void openInternal(String[] additionalFilesNotFromConf, + boolean isAsync, LogHelper console, HiveResources resources) + throws IOException, TezException { + initQueueAndUser(); + + boolean llapMode = isLlapMode(); + + Map<String, String> amEnv = new HashMap<>(); + MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv); Review Comment: who uses this `amEnv`? And if we are passing an empty map, why is this method being called at all? 
########## ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezExternalSessionsRegistryClient.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.Test; + +public class TestTezExternalSessionsRegistryClient { + @Test + public void testDummyExternalSessionsRegistry() { + HiveConf conf = new HiveConf(); + conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_REGISTRY_CLASS, + DummyExternalSessionsRegistry.class.getName()); + // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE after Tez 1.0.0 is released. + conf.set("tez.am.registry.namespace", "dummy"); + ExternalSessionsRegistry externalSessionsRegistry = ExternalSessionsRegistry.getClient(conf); + assertTrue(externalSessionsRegistry instanceof DummyExternalSessionsRegistry); + } + + @Test + public void testTezExternalSessionsRegistry() { + HiveConf conf = new HiveConf(); + // TODO: change this to TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM after Tez 1.0.0 is released. 
+ conf.set("tez.am.zookeeper.quorum", "test-quorum"); + // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE after Tez 1.0.0 is released. + conf.set("tez.am.registry.namespace", "tez"); + conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_REGISTRY_CLASS, + ZookeeperExternalSessionsRegistryClient.class.getName()); + ExternalSessionsRegistry externalSessionsRegistry = ExternalSessionsRegistry.getClient(conf); + assertTrue(externalSessionsRegistry instanceof ZookeeperExternalSessionsRegistryClient); Review Comment: should we be more strict ``` assertEquals(ZookeeperExternalSessionsRegistryClient.class, externalSessionsRegistry.getClass()); ``` ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.mapreduce.hadoop.MRHelpers; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; + +/** + * A {@code TezSession} implementation that represents externally managed Tez sessions. + * <p> + * Unlike {@code TezSessionState}, these sessions are not created or owned by HiveServer2. + * Instead, HiveServer2 connects to an already existing Tez session. + * </p> + * + * <h3>Lifecycle</h3> + * <ol> + * <li>An instance of {@code TezExternalSessionState} is created.</li> + * <li>An artificial application ID is acquired from a registry. This does not + * correspond to a real YARN application, as the session is unmanaged.</li> + * <li>A {@code TezClient} is instantiated but not started (unlike in + * {@code TezSessionState}), allowing the rest of the Hive codebase to + * interact with it transparently.</li> + * </ol> + * + * <p> + * This abstraction enables Hive components to interact with external Tez + * sessions using the same interfaces as internally managed sessions. 
+ * </p> + */ +public class TezExternalSessionState extends TezSessionState { + private static final Object DEFAULT_CONF_CREATE_LOCK = new Object(); + private static TezConfiguration defaultTezConfiguration; + + private String externalAppId; + private boolean isDestroying = false; + private final ExternalSessionsRegistry registry; + + public TezExternalSessionState(String sessionId, HiveConf conf) { + super(sessionId, conf); + this.registry = ExternalSessionsRegistry.getClient(conf); + synchronized (DEFAULT_CONF_CREATE_LOCK) { + if (defaultTezConfiguration == null) { + defaultTezConfiguration = createDefaultTezConfig(); + } + } + } + + @Override + public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) { + /* + * No-op implementation. + * External Tez sessions are not backed by a YARN application and therefore + * do not manage or localize resources. As a result, there are no local + * resources to ensure for this session type. + */ + } + + @Override + protected void openInternal(String[] additionalFilesNotFromConf, + boolean isAsync, LogHelper console, HiveResources resources) + throws IOException, TezException { + initQueueAndUser(); + + boolean llapMode = isLlapMode(); + + Map<String, String> amEnv = new HashMap<>(); + MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv); + + TezConfiguration tezConfig = new TezConfiguration(defaultTezConfiguration); + setupSessionAcls(tezConfig, conf); + ServicePluginsDescriptor spd = createServicePluginDescriptor(llapMode, tezConfig); + Credentials llapCredentials = createLlapCredentials(llapMode, tezConfig); + + final TezClient session = TezClient.newBuilder("HIVE-" + getSessionId(), tezConfig) Review Comment: Can we change the name of the variable, This is bit confusing with Hive Session. maybe sessionClient or sessionTezClient? 
########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public interface ExternalSessionsRegistry { + Logger LOG = LoggerFactory.getLogger(ExternalSessionsRegistry.class); + + /** + * Returns application of id of the external session. + * @return application id + * @throws Exception in case of any exceptions + */ + String getSession() throws Exception; + + /** + * Returns external session back to registry. 
+ * @param appId application id + */ + void returnSession(String appId); + + /** + * Closes the external session registry + */ + void close(); + + Map<String, ExternalSessionsRegistry> INSTANCES = new HashMap<>(); + + static ExternalSessionsRegistry getClient(final Configuration conf) { + ExternalSessionsRegistry registry; + synchronized (INSTANCES) { + // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE after Tez 1.0.0 is released. + String namespace = conf.get("tez.am.registry.namespace"); + // HS2 would need to know about all coordinators running on all compute groups for a given compute (namespace) + // Setting this config to false in client, will make registry client listen on paths under @compute instead of + // @compute/compute-group + // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_ENABLE_COMPUTE_GROUPS after Tez 1.0.0 is released. + conf.setBoolean("tez.am.registry.enable.compute.groups", false); Review Comment: We are mutating the config provided by the client? Means the client config object would get changed here? ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.mapreduce.hadoop.MRHelpers; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; + +/** + * A {@code TezSession} implementation that represents externally managed Tez sessions. + * <p> + * Unlike {@code TezSessionState}, these sessions are not created or owned by HiveServer2. + * Instead, HiveServer2 connects to an already existing Tez session. + * </p> + * + * <h3>Lifecycle</h3> + * <ol> + * <li>An instance of {@code TezExternalSessionState} is created.</li> + * <li>An artificial application ID is acquired from a registry. This does not + * correspond to a real YARN application, as the session is unmanaged.</li> + * <li>A {@code TezClient} is instantiated but not started (unlike in + * {@code TezSessionState}), allowing the rest of the Hive codebase to + * interact with it transparently.</li> + * </ol> + * + * <p> + * This abstraction enables Hive components to interact with external Tez + * sessions using the same interfaces as internally managed sessions. 
+ * </p> + */ +public class TezExternalSessionState extends TezSessionState { + private static final Object DEFAULT_CONF_CREATE_LOCK = new Object(); + private static TezConfiguration defaultTezConfiguration; + + private String externalAppId; + private boolean isDestroying = false; Review Comment: this should also be volatile I believe ########## common/src/java/org/apache/hadoop/hive/conf/HiveConf.java: ########## @@ -3998,6 +3998,15 @@ public static enum ConfVars { "true", new StringSet("true", "false", "ignore"), "Whether Tez session pool should allow submitting queries to custom queues. The options\n" + "are true, false (error out), ignore (accept the query but ignore the queue setting)."), + HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS("hive.server2.use.external.sessions", false, Review Comment: should be ``hive.server2.tez.use.external.sessions`` ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.mapreduce.hadoop.MRHelpers; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; + +/** + * A {@code TezSession} implementation that represents externally managed Tez sessions. + * <p> + * Unlike {@code TezSessionState}, these sessions are not created or owned by HiveServer2. + * Instead, HiveServer2 connects to an already existing Tez session. + * </p> + * + * <h3>Lifecycle</h3> + * <ol> + * <li>An instance of {@code TezExternalSessionState} is created.</li> + * <li>An artificial application ID is acquired from a registry. This does not + * correspond to a real YARN application, as the session is unmanaged.</li> + * <li>A {@code TezClient} is instantiated but not started (unlike in + * {@code TezSessionState}), allowing the rest of the Hive codebase to + * interact with it transparently.</li> + * </ol> + * + * <p> + * This abstraction enables Hive components to interact with external Tez + * sessions using the same interfaces as internally managed sessions. 
+ * </p> + */ +public class TezExternalSessionState extends TezSessionState { + private static final Object DEFAULT_CONF_CREATE_LOCK = new Object(); + private static TezConfiguration defaultTezConfiguration; Review Comment: I believe this should be `volatile`, ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public interface ExternalSessionsRegistry { + Logger LOG = LoggerFactory.getLogger(ExternalSessionsRegistry.class); + + /** + * Returns application of id of the external session. + * @return application id + * @throws Exception in case of any exceptions + */ + String getSession() throws Exception; + + /** + * Returns external session back to registry. 
+ * @param appId application id + */ + void returnSession(String appId); + + /** + * Closes the external session registry + */ + void close(); + + Map<String, ExternalSessionsRegistry> INSTANCES = new HashMap<>(); + + static ExternalSessionsRegistry getClient(final Configuration conf) { + ExternalSessionsRegistry registry; + synchronized (INSTANCES) { + // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE after Tez 1.0.0 is released. + String namespace = conf.get("tez.am.registry.namespace"); + // HS2 would need to know about all coordinators running on all compute groups for a given compute (namespace) + // Setting this config to false in client, will make registry client listen on paths under @compute instead of + // @compute/compute-group + // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_ENABLE_COMPUTE_GROUPS after Tez 1.0.0 is released. + conf.setBoolean("tez.am.registry.enable.compute.groups", false); Review Comment: Should we do this conf.get or set inside the synchronized block? Even the logging? I believe we should keep the synchronized block as thin as possible, and maybe we can avoid the double lookup using ``computeIfAbsent`` ``` conf.setBoolean("tez.am.registry.enable.compute.groups", false); String namespace = conf.get("tez.am.registry.namespace"); synchronized (INSTANCES) { // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE after Tez 1.0.0 is released. // HS2 would need to know about all coordinators running on all compute groups for a given compute (namespace) // Setting this config to false in client, will make registry client listen on paths under @compute instead of // @compute/compute-group // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_ENABLE_COMPUTE_GROUPS after Tez 1.0.0 is released.
registry = INSTANCES.computeIfAbsent(namespace, ns -> { try { String clazz = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_REGISTRY_CLASS); return JavaUtils.newInstance(JavaUtils.getClass(clazz, ExternalSessionsRegistry.class), new Class<?>[] {HiveConf.class}, new Object[] {conf}); } catch (MetaException e) { throw new RuntimeException(e); } }); } LOG.info("Returning tez external AM registry ({}) for namespace '{}'", System.identityHashCode(registry), namespace); return registry; ``` ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +// TODO: tez should provide this registry +public class ZookeeperExternalSessionsRegistryClient implements ExternalSessionsRegistry { + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperExternalSessionsRegistryClient.class); + + // TODO: internal only for now to start and reconnect to external session + private static final String ZK_PATH = "/tez_am/server"; + private final HiveConf initConf; + private final Set<String> available = new HashSet<>(); + private final Set<String> taken = new HashSet<>(); + private final Object lock = new Object(); + private final int maxAttempts; + + private CuratorCache cache; + private boolean isInitialized; + + + public ZookeeperExternalSessionsRegistryClient(final HiveConf initConf) { + this.initConf = initConf; + this.maxAttempts = HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS); + } + + private static String getApplicationId(final ChildData childData) { + return 
childData.getPath().substring(childData.getPath().lastIndexOf("/") + 1); + } + + public void close() { + if (cache != null) { + cache.close(); + } + } + + private void init() throws Exception { + String zkServer = HiveConf.getVar(initConf, ConfVars.HIVE_ZOOKEEPER_QUORUM); + String zkNamespace = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); + String effectivePath = zkNamespace + ZK_PATH; + CuratorFramework client = CuratorFrameworkFactory.newClient(zkServer, new ExponentialBackoffRetry(1000, 3)); + synchronized (lock) { + client.start(); + this.cache = CuratorCache.build(client, effectivePath); + CuratorCacheListener listener = CuratorCacheListener.builder() + .forPathChildrenCache(effectivePath, client, new ExternalSessionsPathListener()) + .build(); + cache.listenable().addListener(listener); + cache.start(); + cache.stream() + .filter(childData -> childData.getPath() != null + && childData.getPath().startsWith(effectivePath + "/")) + .forEach(childData -> available.add(getApplicationId(childData))); + LOG.info("Initial external sessions: {}", available); + isInitialized = true; + } + } + + public String getSession() throws Exception { Review Comment: nit: add override ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +// TODO: tez should provide this registry +public class ZookeeperExternalSessionsRegistryClient implements ExternalSessionsRegistry { + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperExternalSessionsRegistryClient.class); + + // TODO: internal only for now to start and reconnect to external session + private static final String ZK_PATH = "/tez_am/server"; + private final HiveConf initConf; + private final Set<String> available = new HashSet<>(); + private final Set<String> taken = new HashSet<>(); + private final Object lock = new Object(); + private final int maxAttempts; + + private 
CuratorCache cache; + private boolean isInitialized; + + + public ZookeeperExternalSessionsRegistryClient(final HiveConf initConf) { + this.initConf = initConf; + this.maxAttempts = HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS); + } + + private static String getApplicationId(final ChildData childData) { + return childData.getPath().substring(childData.getPath().lastIndexOf("/") + 1); + } + + public void close() { + if (cache != null) { + cache.close(); + } + } + + private void init() throws Exception { + String zkServer = HiveConf.getVar(initConf, ConfVars.HIVE_ZOOKEEPER_QUORUM); + String zkNamespace = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); + String effectivePath = zkNamespace + ZK_PATH; + CuratorFramework client = CuratorFrameworkFactory.newClient(zkServer, new ExponentialBackoffRetry(1000, 3)); Review Comment: who closes this client? ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.mapreduce.hadoop.MRHelpers; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; + +/** + * A {@code TezSession} implementation that represents externally managed Tez sessions. + * <p> + * Unlike {@code TezSessionState}, these sessions are not created or owned by HiveServer2. + * Instead, HiveServer2 connects to an already existing Tez session. + * </p> + * + * <h3>Lifecycle</h3> + * <ol> + * <li>An instance of {@code TezExternalSessionState} is created.</li> + * <li>An artificial application ID is acquired from a registry. This does not + * correspond to a real YARN application, as the session is unmanaged.</li> + * <li>A {@code TezClient} is instantiated but not started (unlike in + * {@code TezSessionState}), allowing the rest of the Hive codebase to + * interact with it transparently.</li> + * </ol> + * + * <p> + * This abstraction enables Hive components to interact with external Tez + * sessions using the same interfaces as internally managed sessions. 
+ * </p> + */ +public class TezExternalSessionState extends TezSessionState { + private static final Object DEFAULT_CONF_CREATE_LOCK = new Object(); + private static TezConfiguration defaultTezConfiguration; + + private String externalAppId; + private boolean isDestroying = false; + private final ExternalSessionsRegistry registry; + + public TezExternalSessionState(String sessionId, HiveConf conf) { + super(sessionId, conf); + this.registry = ExternalSessionsRegistry.getClient(conf); + synchronized (DEFAULT_CONF_CREATE_LOCK) { + if (defaultTezConfiguration == null) { + defaultTezConfiguration = createDefaultTezConfig(); + } + } + } + + @Override + public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) { + /* + * No-op implementation. + * External Tez sessions are not backed by a YARN application and therefore + * do not manage or localize resources. As a result, there are no local + * resources to ensure for this session type. + */ + } + + @Override + protected void openInternal(String[] additionalFilesNotFromConf, Review Comment: Curious: if someone calls open multiple times, what happens? We don't have any guard like isOpen() -> return or so ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java: ########## @@ -33,35 +32,33 @@ /** * Handles only Kill Action.
*/ -public class KillTriggerActionHandler implements TriggerActionHandler<TezSessionState> { +public class KillTriggerActionHandler implements TriggerActionHandler<TezSession> { private static final Logger LOG = LoggerFactory.getLogger(KillTriggerActionHandler.class); private final HiveConf conf; public KillTriggerActionHandler() { - this.conf = new HiveConf(); + this.conf = new HiveConf(); } @Override - public void applyAction(final Map<TezSessionState, Trigger> queriesViolated) { - for (Map.Entry<TezSessionState, Trigger> entry : queriesViolated.entrySet()) { - if (entry.getValue().getAction().getType() == Action.Type.KILL_QUERY) { - TezSessionState sessionState = entry.getKey(); - String queryId = sessionState.getWmContext().getQueryId(); - try { - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - DriverUtils.setUpAndStartSessionState(conf, ugi.getShortUserName()); - KillQuery killQuery = sessionState.getKillQuery(); - // if kill query is null then session might have been released to pool or closed already - if (killQuery != null) { - sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg(), - sessionState.getConf()); - } - } catch (HiveException | IOException e) { - LOG.warn("Unable to kill query {} for trigger violation", queryId); - } - } else { - throw new RuntimeException("Unsupported action: " + entry.getValue()); + public void applyAction(Map<TezSession, Trigger> queriesViolated) { + for (Map.Entry<TezSession, Trigger> entry : queriesViolated.entrySet()) { + if (entry.getValue().getAction().getType() == Action.Type.KILL_QUERY) { + TezSession sessionState = entry.getKey(); + String queryId = sessionState.getWmContext().getQueryId(); + try { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + DriverUtils.setUpAndStartSessionState(conf, ugi.getShortUserName()); + boolean wasKilled = sessionState.killQuery(entry.getValue().getViolationMsg()); + if (!wasKilled) { + LOG.info("Didn't kill the query 
{}", queryId); + } + } catch (HiveException | IOException e) { + LOG.warn("Unable to kill query {} for trigger violation", queryId); Review Comment: we should log the exception here as well ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.KillQuery; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.wm.WmContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.client.DAGStatus; + +/** + * A bogus interface that basically describes the evolved usage patterns of TezSessionStateImpl. + * Needed due to lack of multiple inheritance in Java; probably good to have, too - may make + * TezSessionState interface a little bit clearer or even encourage some future cleanup. + * + * It's implemented in two ways - core implementations (regular session, external session), + * and extra functionality implementation (pool session, WM session, etc.) that wraps an instance + * of the core implementation (i.e. use composition). With MI, each session type would just inherit + * from one of each. + */ +public interface TezSession { + final class HiveResources { + public HiveResources(Path dagResourcesDir) { + this.dagResourcesDir = dagResourcesDir; + } + /** A directory that will contain resources related to DAGs and specified in configs. */ + final Path dagResourcesDir; + final Map<String, LocalResource> additionalFilesNotFromConf = new HashMap<>(); + /** Localized resources of this session; both from conf and not from conf (above). 
+ */ + final Set<LocalResource> localizedResources = new HashSet<>(); + + @Override + public String toString() { + return dagResourcesDir + "; " + additionalFilesNotFromConf.size() + " additional files, " + + localizedResources.size() + " localized resources"; + } + } + + // Core session operations. + void open() throws IOException, TezException; + void open(HiveResources resources) throws IOException, TezException; + void open(String[] additionalFilesNotFromConf) throws IOException, TezException; + void beginOpen(String[] additionalFiles, LogHelper console) throws IOException, TezException; + void endOpen() throws InterruptedException, CancellationException; Review Comment: nit: a bunch of child classes of this class don't have the override annotation for a bunch of methods; could you double-check once? ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +// TODO: tez should provide this registry +public class ZookeeperExternalSessionsRegistryClient implements ExternalSessionsRegistry { + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperExternalSessionsRegistryClient.class); + + // TODO: internal only for now to start and reconnect to external session + private static final String ZK_PATH = "/tez_am/server"; + private final HiveConf initConf; + private final Set<String> available = new HashSet<>(); + private final Set<String> taken = new HashSet<>(); + private final Object lock = new Object(); + private final int maxAttempts; + + private CuratorCache cache; + private boolean isInitialized; + + + public ZookeeperExternalSessionsRegistryClient(final HiveConf initConf) { + this.initConf = initConf; + this.maxAttempts = HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS); + } + + private static String getApplicationId(final ChildData childData) { + return 
childData.getPath().substring(childData.getPath().lastIndexOf("/") + 1); + } + + public void close() { + if (cache != null) { + cache.close(); + } + } + + private void init() throws Exception { + String zkServer = HiveConf.getVar(initConf, ConfVars.HIVE_ZOOKEEPER_QUORUM); + String zkNamespace = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); + String effectivePath = zkNamespace + ZK_PATH; + CuratorFramework client = CuratorFrameworkFactory.newClient(zkServer, new ExponentialBackoffRetry(1000, 3)); + synchronized (lock) { + client.start(); + this.cache = CuratorCache.build(client, effectivePath); + CuratorCacheListener listener = CuratorCacheListener.builder() + .forPathChildrenCache(effectivePath, client, new ExternalSessionsPathListener()) + .build(); + cache.listenable().addListener(listener); + cache.start(); + cache.stream() + .filter(childData -> childData.getPath() != null + && childData.getPath().startsWith(effectivePath + "/")) + .forEach(childData -> available.add(getApplicationId(childData))); + LOG.info("Initial external sessions: {}", available); + isInitialized = true; + } + } + + public String getSession() throws Exception { + synchronized (lock) { + if (!isInitialized) { + init(); + } + long endTimeNs = System.nanoTime() + (1000000000L * maxAttempts); + while (available.isEmpty() && ((endTimeNs - System.nanoTime()) > 0)) { + lock.wait(1000L); + } + Iterator<String> iter = available.iterator(); + if (!iter.hasNext()) { + throw new IOException("Cannot get a session after " + maxAttempts + " attempts"); + } + String appId = iter.next(); + iter.remove(); + taken.add(appId); + return appId; + } + } + + public void returnSession(String appId) { Review Comment: add override ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +// TODO: tez should provide this registry +public class ZookeeperExternalSessionsRegistryClient implements ExternalSessionsRegistry { + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperExternalSessionsRegistryClient.class); + + // TODO: internal only for now to start and reconnect to external session + private 
static final String ZK_PATH = "/tez_am/server"; Review Comment: should this be configurable? ########## ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +// TODO: tez should provide this registry +public class ZookeeperExternalSessionsRegistryClient implements ExternalSessionsRegistry { + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperExternalSessionsRegistryClient.class); + + // TODO: internal only for now to start and reconnect to external session + private static final String ZK_PATH = "/tez_am/server"; + private final HiveConf initConf; + private final Set<String> available = new HashSet<>(); + private final Set<String> taken = new HashSet<>(); + private final Object lock = new Object(); + private final int maxAttempts; + + private CuratorCache cache; + private boolean isInitialized; + + + public ZookeeperExternalSessionsRegistryClient(final HiveConf initConf) { + this.initConf = initConf; + this.maxAttempts = HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS); + } + + private static String getApplicationId(final ChildData childData) { + return 
childData.getPath().substring(childData.getPath().lastIndexOf("/") + 1); + } + + public void close() { + if (cache != null) { + cache.close(); + } + } + + private void init() throws Exception { + String zkServer = HiveConf.getVar(initConf, ConfVars.HIVE_ZOOKEEPER_QUORUM); + String zkNamespace = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); + String effectivePath = zkNamespace + ZK_PATH; + CuratorFramework client = CuratorFrameworkFactory.newClient(zkServer, new ExponentialBackoffRetry(1000, 3)); + synchronized (lock) { + client.start(); + this.cache = CuratorCache.build(client, effectivePath); + CuratorCacheListener listener = CuratorCacheListener.builder() + .forPathChildrenCache(effectivePath, client, new ExternalSessionsPathListener()) + .build(); + cache.listenable().addListener(listener); + cache.start(); + cache.stream() + .filter(childData -> childData.getPath() != null + && childData.getPath().startsWith(effectivePath + "/")) + .forEach(childData -> available.add(getApplicationId(childData))); + LOG.info("Initial external sessions: {}", available); + isInitialized = true; + } + } + + public String getSession() throws Exception { + synchronized (lock) { + if (!isInitialized) { + init(); + } + long endTimeNs = System.nanoTime() + (1000000000L * maxAttempts); + while (available.isEmpty() && ((endTimeNs - System.nanoTime()) > 0)) { + lock.wait(1000L); + } + Iterator<String> iter = available.iterator(); + if (!iter.hasNext()) { + throw new IOException("Cannot get a session after " + maxAttempts + " attempts"); + } + String appId = iter.next(); + iter.remove(); + taken.add(appId); + return appId; + } + } + + public void returnSession(String appId) { + synchronized (lock) { + if (!isInitialized) { + throw new AssertionError("Not initialized"); Review Comment: nit should be ``` throw new IllegalStateException("Not initialized"); ``` ########## 
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +// TODO: tez should provide this registry +public class ZookeeperExternalSessionsRegistryClient 
implements ExternalSessionsRegistry { + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperExternalSessionsRegistryClient.class); + + // TODO: internal only for now to start and reconnect to external session + private static final String ZK_PATH = "/tez_am/server"; + private final HiveConf initConf; + private final Set<String> available = new HashSet<>(); + private final Set<String> taken = new HashSet<>(); + private final Object lock = new Object(); + private final int maxAttempts; + + private CuratorCache cache; + private boolean isInitialized; Review Comment: should be volatile ########## ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.junit.Test; + +/** + * Tests for {@link ZookeeperExternalSessionsRegistryClient}. + */ +public class TestZookeeperExternalSessionsRegistryClient { + + /** + * Integration-style unit test that ensures {@link ZookeeperExternalSessionsRegistryClient} + * can discover sessions from ZooKeeper and hand them out via {@link ExternalSessionsRegistry#getSession()}. + */ + @Test + public void testGetAndReturnSession() throws Exception { + TestingServer server = null; + CuratorFramework client = null; + ZookeeperExternalSessionsRegistryClient registry = null; + try { + server = new TestingServer(); + String connectString = server.getConnectString(); + + HiveConf conf = new HiveConf(); + conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns"); + conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 1); + + String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); + String effectivePath = namespace + "/tez_am/server"; Review Comment: should we normalize? ``` String effectivePath = (zkNamespace.startsWith("/") ? zkNamespace : "/" + zkNamespace) + ZK_PATH; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
