TAJO-811: add simple fifo scheduler support. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4a747a0f Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4a747a0f Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4a747a0f Branch: refs/heads/window_function Commit: 4a747a0f8756f046173ab2eaa15dc2b03bd78379 Parents: 9350a80 Author: jinossy <[email protected]> Authored: Fri May 16 14:43:46 2014 +0900 Committer: jinossy <[email protected]> Committed: Fri May 16 14:43:46 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/client/TajoAdmin.java | 58 +++----- .../tajo/master/TajoMasterClientService.java | 5 +- .../master/querymaster/QueryInProgress.java | 14 +- .../master/querymaster/QueryJobManager.java | 82 +++++++++-- .../tajo/scheduler/QuerySchedulingInfo.java | 55 +++++++ .../org/apache/tajo/scheduler/Scheduler.java | 41 ++++++ .../tajo/scheduler/SchedulingAlgorithms.java | 47 ++++++ .../tajo/scheduler/SimpleFifoScheduler.java | 147 +++++++++++++++++++ .../src/main/resources/webapps/admin/query.jsp | 10 +- .../tajo/scheduler/TestFifoScheduler.java | 110 ++++++++++++++ 11 files changed, 509 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 222f02e..41ac271 100644 --- a/CHANGES +++ b/CHANGES @@ -15,6 +15,8 @@ Release 0.9.0 - unreleased IMPROVEMENT + TAJO-811: add simple fifo scheduler support. (jinho) + TAJO-801: Multiple distinct should be supported. (Hyoungjun Kim via hyunsik) TAJO-807: Implement Round(numeric, int) function. http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java index 9a0478c..25b91a4 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java @@ -21,7 +21,7 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; import org.apache.commons.cli.*; import org.apache.commons.lang.StringUtils; -import org.apache.tajo.TajoProtos.QueryState; +import org.apache.tajo.QueryId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo; import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo; @@ -46,13 +46,9 @@ public class TajoAdmin { } final static String line5 = "-----"; - final static String line7 = "-------"; final static String line10 = "----------"; final static String line12 = "------------"; - final static String line15 = "---------------"; - final static String line20 = "--------------------"; final static String line25 = "-------------------------"; - final static String line30 = "------------------------------"; final static String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; static { @@ -84,18 +80,6 @@ public class TajoAdmin { formatter.printHelp( "admin [options]", options ); } - private String getQueryState(QueryState state) { - String stateStr = "FAILED"; - - if (TajoClient.isQueryRunnning(state)) { - stateStr = "RUNNING"; - } else if (state == QueryState.QUERY_SUCCEEDED) { - stateStr = "SUCCEED"; - } - - return stateStr; - } - public void runCommand(String[] args) throws Exception { CommandLineParser parser = new PosixParser(); CommandLine cmd = parser.parse(options, args); @@ -195,13 +179,13 @@ public class TajoAdmin { writer.write("\n"); writer.write("Started Time: " + df.format(queryInfo.getStartTime())); writer.write("\n"); - String state = getQueryState(queryInfo.getState()); - writer.write("Query State: " + state); + + writer.write("Query State: " + queryInfo.getState().name()); writer.write("\n"); long end = queryInfo.getFinishTime(); long start = queryInfo.getStartTime(); String executionTime = decimalF.format((end-start) / 1000) + " sec"; - if (state.equals("RUNNING") == false) { + if (!TajoClient.isQueryRunnning(queryInfo.getState())) { writer.write("Finished Time: " + df.format(queryInfo.getFinishTime())); writer.write("\n"); } @@ -385,24 +369,28 @@ public class TajoAdmin { ServiceException, SQLException { List<BriefQueryInfo> queryList = tajoClient.getRunningQueryList(); SimpleDateFormat df = new SimpleDateFormat(DATE_FORMAT); - String fmt = "%1$-20s %2$-7s %3$-20s %4$-30s%n"; - String line = String.format(fmt, "QueryId", "State", - "StartTime", "Query"); - writer.write(line); - line = String.format(fmt, line20, line7, line20, line30); - writer.write(line); + StringBuilder builder = new StringBuilder(); - for (BriefQueryInfo queryInfo : queryList) { - String queryId = String.format("q_%s_%04d", - queryInfo.getQueryId().getId(), - queryInfo.getQueryId().getSeq()); - String state = getQueryState(queryInfo.getState()); - String startTime = df.format(queryInfo.getStartTime()); + /* print title */ + builder.append(StringUtils.rightPad("QueryId", 21)); + builder.append(StringUtils.rightPad("State", 20)); + builder.append(StringUtils.rightPad("StartTime", 20)); + builder.append(StringUtils.rightPad("Query", 30)).append("\n"); - String sql = StringUtils.abbreviate(queryInfo.getQuery(), 30); - line = String.format(fmt, queryId, state, startTime, sql); - writer.write(line); + builder.append(StringUtils.rightPad(StringUtils.repeat("-", 20), 21)); + builder.append(StringUtils.rightPad(StringUtils.repeat("-", 19), 20)); + builder.append(StringUtils.rightPad(StringUtils.repeat("-", 19), 20)); + builder.append(StringUtils.rightPad(StringUtils.repeat("-", 29), 30)).append("\n"); + writer.write(builder.toString()); + + builder = new StringBuilder(); + for (BriefQueryInfo queryInfo : queryList) { + builder.append(StringUtils.rightPad(new QueryId(queryInfo.getQueryId()).toString(), 21)); + builder.append(StringUtils.rightPad(queryInfo.getState().name(), 20)); + builder.append(StringUtils.rightPad(df.format(queryInfo.getStartTime()), 20)); + builder.append(StringUtils.abbreviate(queryInfo.getQuery(), 30)).append("\n"); } + writer.write(builder.toString()); } public void processKill(Writer writer, String queryIdStr) http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index f6ad085..97f59ef 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -354,9 +354,8 @@ public class TajoMasterClientService extends AbstractService { context.getSessionManager().touch(request.getSessionId().getId()); GetQueryListResponse.Builder builder= GetQueryListResponse.newBuilder(); - Collection<QueryInProgress> queries - = context.getQueryJobManager().getRunningQueries(); - + Collection<QueryInProgress> queries = new ArrayList<QueryInProgress>(context.getQueryJobManager().getSubmittedQueries()); + queries.addAll(context.getQueryJobManager().getRunningQueries()); BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder(); for (QueryInProgress queryInProgress : queries) { http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java index dac2d4c..e561a4c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java @@ -101,8 +101,10 @@ public class QueryInProgress extends CompositeService { super.init(conf); } - public void kill() { - queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get()); + public synchronized void kill() { + if(queryMasterRpcClient != null){ + queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get()); + } } @Override @@ -202,10 +204,6 @@ public class QueryInProgress extends CompositeService { } } - public QueryMasterProtocolService getQueryMasterRpcClient() { - return queryMasterRpcClient; - } - private void connectQueryMaster() throws Exception { InetSocketAddress addr = NetUtils.createSocketAddrForHost( queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort()); @@ -260,6 +258,10 @@ public class QueryInProgress extends CompositeService { return this.queryInfo; } + public boolean isStarted() { + return this.querySubmitted.get(); + } + private void heartbeat(QueryInfo queryInfo) { LOG.info("Received QueryMaster heartbeat:" + queryInfo); this.queryInfo.setQueryState(queryInfo.getQueryState()); http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java index bc5fcad..66db9d6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java @@ -26,11 +26,13 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.TajoProtos; import org.apache.tajo.engine.planner.logical.LogicalRootNode; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.session.Session; +import org.apache.tajo.scheduler.SimpleFifoScheduler; import java.util.Collection; import java.util.Collections; @@ -45,6 +47,10 @@ public class QueryJobManager extends CompositeService { private AsyncDispatcher dispatcher; + private SimpleFifoScheduler scheduler; + + private final Map<QueryId, QueryInProgress> submittedQueries = new HashMap<QueryId, QueryInProgress>(); + private final Map<QueryId, QueryInProgress> runningQueries = new HashMap<QueryId, QueryInProgress>(); private final Map<QueryId, QueryInProgress> finishedQueries = new HashMap<QueryId, QueryInProgress>(); @@ -61,6 +67,8 @@ public class QueryJobManager extends CompositeService { addService(this.dispatcher); this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler()); + + this.scheduler = new SimpleFifoScheduler(this); } catch (Exception e) { catchException(null, e); } @@ -75,11 +83,13 @@ public class QueryJobManager extends CompositeService { eachQueryInProgress.stop(); } } + this.scheduler.stop(); super.stop(); } @Override public void start() { + this.scheduler.start(); super.start(); } @@ -87,6 +97,10 @@ public class QueryJobManager extends CompositeService { return dispatcher.getEventHandler(); } + public Collection<QueryInProgress> getSubmittedQueries() { + return Collections.unmodifiableCollection(submittedQueries.values()); + } + public Collection<QueryInProgress> getRunningQueries() { return Collections.unmodifiableCollection(runningQueries.values()); } @@ -102,40 +116,75 @@ public class QueryJobManager extends CompositeService { QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql, jsonExpr, plan); - synchronized(runningQueries) { - runningQueries.put(queryId, queryInProgress); + synchronized (submittedQueries) { + queryInProgress.getQueryInfo().setQueryMaster(""); + submittedQueries.put(queryInProgress.getQueryId(), queryInProgress); + } + + scheduler.addQuery(queryInProgress); + return queryInProgress.getQueryInfo(); + } + + public QueryInfo startQueryJob(QueryId queryId) throws Exception { + + QueryInProgress queryInProgress; + + synchronized (submittedQueries) { + queryInProgress = submittedQueries.remove(queryId); + } + + synchronized (runningQueries) { + runningQueries.put(queryInProgress.getQueryId(), queryInProgress); } addService(queryInProgress); queryInProgress.init(getConfig()); queryInProgress.start(); - if(!queryInProgress.startQueryMaster()) { - return null; + if (!queryInProgress.startQueryMaster()) { + stopQuery(queryId); } return queryInProgress.getQueryInfo(); } + public TajoMaster.MasterContext getMasterContext() { + return masterContext; + } + class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> { @Override public void handle(QueryJobEvent event) { - QueryInProgress queryInProgress = null; - synchronized(runningQueries) { - queryInProgress = runningQueries.get(event.getQueryInfo().getQueryId()); - if(queryInProgress == null) { - LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]"); - return; + QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId()); + if(queryInProgress == null) { + LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]"); + return; + } + if(queryInProgress.isStarted()){ + queryInProgress.getEventHandler().handle(event); + } else { + if(event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL){ + scheduler.removeQuery(queryInProgress.getQueryId()); + queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); + + stopQuery(queryInProgress.getQueryId()); } } - queryInProgress.getEventHandler().handle(event); } } public QueryInProgress getQueryInProgress(QueryId queryId) { - synchronized(runningQueries) { - return runningQueries.get(queryId); + QueryInProgress queryInProgress; + synchronized (submittedQueries) { + queryInProgress = submittedQueries.get(queryId); } + + if (queryInProgress == null) { + synchronized (runningQueries) { + queryInProgress = runningQueries.get(queryId); + } + } + return queryInProgress; } public QueryInProgress getFinishedQuery(QueryId queryId) { @@ -149,8 +198,15 @@ public class QueryJobManager extends CompositeService { QueryInProgress queryInProgress = getQueryInProgress(queryId); if(queryInProgress != null) { queryInProgress.stop(); + synchronized(submittedQueries) { + submittedQueries.remove(queryId); + } + synchronized(runningQueries) { runningQueries.remove(queryId); + } + + synchronized(finishedQueries) { finishedQueries.put(queryId, queryInProgress); } } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java new file mode 100644 index 0000000..d9932bd --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.scheduler; + +import com.google.common.base.Objects; +import org.apache.tajo.QueryId; + +public class QuerySchedulingInfo { + private QueryId queryId; + private Integer priority; + private Long startTime; + + public QuerySchedulingInfo(QueryId queryId, Integer priority, Long startTime) { + this.queryId = queryId; + this.priority = priority; + this.startTime = startTime; + } + + public QueryId getQueryId() { + return queryId; + } + + public Integer getPriority() { + return priority; + } + + public Long getStartTime() { + return startTime; + } + + public String getName() { + return queryId.getId(); + } + + @Override + public int hashCode() { + return Objects.hashCode(startTime, getName(), priority); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java new file mode 100644 index 0000000..d74280c --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.scheduler; + +import org.apache.tajo.QueryId; +import org.apache.tajo.master.querymaster.QueryInProgress; + +import java.util.List; + +public interface Scheduler { + + public Mode getMode(); + + public String getName(); + + public boolean addQuery(QueryInProgress resource); + + public boolean removeQuery(QueryId queryId); + + public List<QueryInProgress> getRunningQueries(); + + public enum Mode { + FIFO + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java new file mode 100644 index 0000000..9c9b16d --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.scheduler; + +import java.util.Comparator; + +/** + * Utility class containing scheduling algorithms used in the scheduler. + */ + +public class SchedulingAlgorithms { + /** + * Compare Schedulables in order of priority and then submission time, as in + * the default FIFO scheduler in Tajo. + */ + public static class FifoComparator implements Comparator<QuerySchedulingInfo> { + @Override + public int compare(QuerySchedulingInfo q1, QuerySchedulingInfo q2) { + int res = q1.getPriority().compareTo(q2.getPriority()); + if (res == 0) { + res = (int) Math.signum(q1.getStartTime() - q2.getStartTime()); + } + if (res == 0) { + // In the rare case where jobs were submitted at the exact same time, + // compare them by name (which will be the QueryId) to get a deterministic ordering + res = q1.getName().compareTo(q2.getName()); + } + return res; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java new file mode 100644 index 0000000..87968a5 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.QueryId; +import org.apache.tajo.master.querymaster.QueryInProgress; +import org.apache.tajo.master.querymaster.QueryJobManager; + +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SimpleFifoScheduler implements Scheduler { + private static final Log LOG = LogFactory.getLog(SimpleFifoScheduler.class.getName()); + private LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>(); + private final Thread queryProcessor; + private static AtomicBoolean stopped = new AtomicBoolean(); + private QueryJobManager manager; + private Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator(); + + public SimpleFifoScheduler(QueryJobManager manager) { + this.manager = manager; + this.queryProcessor = new Thread(new QueryProcessor()); + this.queryProcessor.setName("Query Processor"); + } + + @Override + public Mode getMode() { + return Mode.FIFO; + } + + @Override + public String getName() { + return manager.getName(); + } + + @Override + public boolean addQuery(QueryInProgress queryInProgress) { + int qSize = pool.size(); + if (qSize != 0 && qSize % 100 == 0) { + LOG.info("Size of Fifo queue is " + qSize); + } + + QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime()); + boolean result = pool.add(querySchedulingInfo); + if (getRunningQueries().size() == 0) wakeupProcessor(); + return result; + } + + @Override + public boolean removeQuery(QueryId queryId) { + return pool.remove(getQueryByQueryId(queryId)); + } + + public QuerySchedulingInfo getQueryByQueryId(QueryId queryId) { + for (QuerySchedulingInfo querySchedulingInfo : pool) { + if (querySchedulingInfo.getQueryId().equals(queryId)) { + return querySchedulingInfo; + } + } + return null; + } + + @Override + public List<QueryInProgress> getRunningQueries() { + return new ArrayList<QueryInProgress>(manager.getRunningQueries()); + } + + public void start() { + queryProcessor.start(); + } + + public void stop() { + if (stopped.getAndSet(true)) { + return; + } + pool.clear(); + synchronized (queryProcessor) { + queryProcessor.interrupt(); + } + } + + private QuerySchedulingInfo pollScheduledQuery() { + if (pool.size() > 1) { + Collections.sort(pool, COMPARATOR); + } + return pool.poll(); + } + + private void wakeupProcessor() { + synchronized (queryProcessor) { + queryProcessor.notifyAll(); + } + } + + private final class QueryProcessor implements Runnable { + @Override + public void run() { + + QuerySchedulingInfo query; + + while (!stopped.get() && !Thread.currentThread().isInterrupted()) { + query = null; + if (getRunningQueries().size() == 0) { + query = pollScheduledQuery(); + } + + if (query != null) { + try { + manager.startQueryJob(query.getQueryId()); + } catch (Throwable t) { + LOG.fatal("Exception during query startup:", t); + manager.stopQuery(query.getQueryId()); + } + } + + synchronized (queryProcessor) { + try { + queryProcessor.wait(500); + } catch (InterruptedException e) { + if (stopped.get()) { + break; + } + LOG.warn("Exception during shutdown: ", e); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/resources/webapps/admin/query.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp index 6f15a0e..4e8d7b0 100644 --- a/tajo-core/src/main/resources/webapps/admin/query.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query.jsp @@ -26,16 +26,16 @@ <%@ page import="org.apache.tajo.util.StringUtils" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="java.text.SimpleDateFormat" %> -<%@ page import="java.util.Collection" %> -<%@ page import="java.util.HashMap" %> -<%@ page import="java.util.List" %> -<%@ page import="java.util.Map" %> +<%@ page import="java.util.*" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); List<QueryInProgress> runningQueries = - JSPUtil.sortQueryInProgress(master.getContext().getQueryJobManager().getRunningQueries(), true); + new ArrayList<QueryInProgress>(master.getContext().getQueryJobManager().getSubmittedQueries()); + + runningQueries.addAll(master.getContext().getQueryJobManager().getRunningQueries()); + JSPUtil.sortQueryInProgress(runningQueries, true); List<QueryInProgress> finishedQueries = JSPUtil.sortQueryInProgress(master.getContext().getQueryJobManager().getFinishedQueries(), true); http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java new file mode 100644 index 0000000..76f22d0 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.scheduler; + +import com.google.protobuf.ServiceException; +import org.apache.tajo.*; +import org.apache.tajo.client.TajoClient; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.ClientProtos; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.sql.ResultSet; + +import static org.junit.Assert.*; + +@Category(IntegrationTest.class) +public class TestFifoScheduler { + private static TajoTestingCluster cluster; + private static TajoConf conf; + private static TajoClient client; + + @BeforeClass + public static void setUp() throws Exception { + cluster = TpchTestBase.getInstance().getTestingCluster(); + conf = cluster.getConfiguration(); + client = new TajoClient(conf); + } + + @AfterClass + public static void tearDown() throws Exception { + client.close(); + } + + @Test + public final void testKillScheduledQuery() throws IOException, ServiceException, InterruptedException { + ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(2) from lineitem"); + ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(2) from lineitem"); + Thread.sleep(1000); + QueryId queryId = new QueryId(res.getQueryId()); + QueryId queryId2 = new QueryId(res2.getQueryId()); + assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState()); + + client.killQuery(queryId2); + assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId2).getState()); + client.killQuery(queryId); + assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId).getState()); + } + + @Test + public final void testForwardedQuery() throws IOException, ServiceException, InterruptedException { + ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(2) from lineitem"); + ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select * from lineitem limit 1"); + + Thread.sleep(1000); + assertFalse(res2.getIsForwarded()); + QueryId queryId2 = new QueryId(res2.getQueryId()); + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState()); + ResultSet resSet = TajoClient.createResultSet(client, res2); + assertNotNull(resSet); + + QueryId queryId = new QueryId(res.getQueryId()); + assertEquals(TajoProtos.QueryState.QUERY_RUNNING, client.getQueryStatus(queryId).getState()); + client.killQuery(queryId); + } + + @Test + public final void testScheduledQuery() throws IOException, ServiceException, InterruptedException { + ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(2) from lineitem"); + ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(2) from lineitem"); + ClientProtos.SubmitQueryResponse res3 = client.executeQuery("select sleep(2) from lineitem"); + ClientProtos.SubmitQueryResponse res4 = client.executeQuery("select sleep(2) from lineitem"); + + Thread.sleep(1000); + + QueryId queryId = new QueryId(res.getQueryId()); + QueryId queryId2 = new QueryId(res2.getQueryId()); + QueryId queryId3 = new QueryId(res3.getQueryId()); + QueryId queryId4 = new QueryId(res4.getQueryId()); + assertEquals(TajoProtos.QueryState.QUERY_RUNNING, client.getQueryStatus(queryId).getState()); + + assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState()); + assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState()); + assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId4).getState()); + + client.killQuery(queryId2); + client.killQuery(queryId3); + client.killQuery(queryId4); + client.killQuery(queryId); + } +}
