This is an automated email from the ASF dual-hosted git repository. kgyrtkirk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
commit e97ff5b3df4258fa83a15507000d7e42c2aac8f4 Author: Zoltan Haindrich <k...@rxd.hu> AuthorDate: Mon Feb 17 12:07:18 2020 +0000 HIVE-22873: Make it possible to identify which hs2 instance executed a scheduled query (Zoltan Haindrich reviewed by Miklos Gergely) Signed-off-by: Zoltan Haindrich <k...@rxd.hu> --- .../scheduled/ScheduledQueryExecutionContext.java | 11 ++++++++ .../scheduled/ScheduledQueryExecutionService.java | 25 ++++++++++------- .../hive/ql/schq/TestScheduledQueryService.java | 31 +++++++++++++--------- 3 files changed, 45 insertions(+), 22 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java index 9decb8c..1bb24ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.scheduled; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -33,6 +35,7 @@ public class ScheduledQueryExecutionContext { public final ExecutorService executor; public final IScheduledQueryMaintenanceService schedulerService; public final HiveConf conf; + public final String executorHostName; public ScheduledQueryExecutionContext( ExecutorService executor, @@ -41,6 +44,14 @@ public class ScheduledQueryExecutionContext { this.executor = executor; this.conf = conf; this.schedulerService = service; + try { + this.executorHostName = InetAddress.getLocalHost().getHostName(); + if (executorHostName == null) { + throw new RuntimeException("Hostname is null; Can't function without a valid hostname!"); + } + } catch (UnknownHostException e) { + throw new RuntimeException("Can't function without a valid hostname!", e); + } } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java index 06cfe3f..9a6237c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java @@ -51,23 +51,27 @@ public class ScheduledQueryExecutionService implements Closeable { private ScheduledQueryExecutor worker; private AtomicInteger forcedScheduleCheckCounter = new AtomicInteger(); - public static ScheduledQueryExecutionService startScheduledQueryExecutorService(HiveConf conf0) { + public static ScheduledQueryExecutionService startScheduledQueryExecutorService(HiveConf inputConf) { + HiveConf conf = new HiveConf(inputConf); + MetastoreBasedScheduledQueryService qService = new MetastoreBasedScheduledQueryService(conf); + ExecutorService executor = Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build()); + ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService); + return startScheduledQueryExecutorService(ctx); + } + + public static ScheduledQueryExecutionService startScheduledQueryExecutorService(ScheduledQueryExecutionContext ctx) { synchronized (ScheduledQueryExecutionService.class) { if (INSTANCE != null) { throw new IllegalStateException( "There is already a ScheduledQueryExecutionService in service; check it and close it explicitly if neccessary"); } - HiveConf conf = new HiveConf(conf0); - MetastoreBasedScheduledQueryService qService = new MetastoreBasedScheduledQueryService(conf); - ExecutorService executor = Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build()); - ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService); INSTANCE = new ScheduledQueryExecutionService(ctx); return INSTANCE; } } - public ScheduledQueryExecutionService(ScheduledQueryExecutionContext ctx) { + private ScheduledQueryExecutionService(ScheduledQueryExecutionContext ctx) { context = ctx; ctx.executor.submit(worker = new ScheduledQueryExecutor()); ctx.executor.submit(new ProgressReporter()); @@ -138,7 +142,7 @@ public class ScheduledQueryExecutionService implements Closeable { reportQueryProgress(); try ( IDriver driver = DriverFactory.newDriver(DriverFactory.getNewQueryState(conf), null)) { - info.setExecutorQueryId(driver.getQueryState().getQueryId()); + info.setExecutorQueryId(buildExecutorQueryId(driver)); reportQueryProgress(); driver.run(q.getQuery()); info.setState(QueryState.FINISHED); @@ -153,11 +157,14 @@ public class ScheduledQueryExecutionService implements Closeable { } catch (Throwable e) { } } - reportQueryProgress(); } } + private String buildExecutorQueryId(IDriver driver) { + return String.format("%s/%s", context.executorHostName, driver.getQueryState().getQueryId()); + } + private String lockNameFor(ScheduledQueryKey scheduleKey) { return String.format("scheduled_query_%s_%s", scheduleKey.getClusterNamespace(), scheduleKey.getScheduleName()); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java b/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java index 9a7b423..a8fe0c3 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java @@ -17,15 +17,13 @@ */ package org.apache.hadoop.hive.ql.schq; -import static org.junit.Assert.assertEquals; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.QueryState; import org.apache.hadoop.hive.metastore.api.ScheduledQueryKey; @@ -100,17 +98,19 @@ public class TestScheduledQueryService { return res.size(); } - // Use notify/wait on this object to indicate when the scheduled query has finished executing. - static Object notifier = new Object(); public static class MockScheduledQueryService implements IScheduledQueryMaintenanceService { + // Use notify/wait on this object to indicate when the scheduled query has finished executing. + Object notifier = new Object(); + int id = 0; private String stmt; + ScheduledQueryProgressInfo lastProgressInfo; public MockScheduledQueryService(String string) { stmt = string; } - + @Override public ScheduledQueryPollResponse scheduledQueryPoll() { @@ -129,6 +129,7 @@ public class TestScheduledQueryService { public void scheduledQueryProgress(ScheduledQueryProgressInfo info) { System.out.printf("%d, state: %s, error: %s", info.getScheduledExecutionId(), info.getState(), info.getErrorMessage()); + lastProgressInfo = info; if (info.getState() == QueryState.FINISHED || info.getState() == QueryState.FAILED) { // Query is done, notify any waiters synchronized (notifier) { @@ -152,17 +153,21 @@ public class TestScheduledQueryService { HiveConf conf = env_setup.getTestCtx().hiveConf; MockScheduledQueryService qService = new MockScheduledQueryService("insert into tu values(1),(2),(3),(4),(5)"); ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService); - ScheduledQueryExecutionService sQ = new ScheduledQueryExecutionService(ctx); + try (ScheduledQueryExecutionService sQ = ScheduledQueryExecutionService.startScheduledQueryExecutorService(ctx)) { - executor.shutdown(); - // Wait for the scheduled query to finish. Hopefully 30 seconds should be more than enough. - SessionState.getConsole().logInfo("Waiting for query execution to finish ..."); - synchronized (notifier) { - notifier.wait(30000); + executor.shutdown(); + // Wait for the scheduled query to finish. Hopefully 30 seconds should be more than enough. + SessionState.getConsole().logInfo("Waiting for query execution to finish ..."); + synchronized (qService.notifier) { + qService.notifier.wait(30000); + } + SessionState.getConsole().logInfo("Done waiting for query execution!"); } - SessionState.getConsole().logInfo("Done waiting for query execution!"); executor.shutdownNow(); + assertThat(qService.lastProgressInfo.isSetExecutorQueryId(), is(true)); + assertThat(qService.lastProgressInfo.getExecutorQueryId(), + Matchers.containsString(ctx.executorHostName + "/")); int nr = getNumRowsReturned(driver, "select 1 from tu"); assertThat(nr, Matchers.equalTo(5));