Merge branch '1.6' into 1.7

Conflicts:
    server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f6ba154f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f6ba154f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f6ba154f

Branch: refs/heads/master
Commit: f6ba154f2d919961b092e50c89f94efc7f7763be
Parents: caef59e d5e26b5
Author: Christopher Tubbs <ctubb...@apache.org>
Authored: Wed May 27 16:28:43 2015 -0400
Committer: Christopher Tubbs <ctubb...@apache.org>
Committed: Wed May 27 16:28:43 2015 -0400

----------------------------------------------------------------------
 .../src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6ba154f/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
index abf3d1a,0000000..6d5adce
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
@@@ -1,147 -1,0 +1,147 @@@
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.scan; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.tserver.TabletServer; + +public abstract class ScanTask<T> implements RunnableFuture<T> { + + protected final TabletServer server; + protected AtomicBoolean interruptFlag; + protected ArrayBlockingQueue<Object> resultQueue; + protected AtomicInteger state; + protected AtomicReference<ScanRunState> runState; + + private static final int INITIAL = 1; + private static final int ADDED = 2; + private static final int CANCELED = 3; + + ScanTask(TabletServer server) { + this.server = server; + interruptFlag = new AtomicBoolean(false); + runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED); + state = new AtomicInteger(INITIAL); + resultQueue = new ArrayBlockingQueue<Object>(1); + } + + protected void addResult(Object o) { + if (state.compareAndSet(INITIAL, ADDED)) + resultQueue.add(o); + else if (state.get() == ADDED) + throw new IllegalStateException("Tried to add more than one result"); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (!mayInterruptIfRunning) + throw new IllegalArgumentException("Cancel will 
always attempt to interupt running next batch task"); + + if (state.get() == CANCELED) + return true; + + if (state.compareAndSet(INITIAL, CANCELED)) { + interruptFlag.set(true); + resultQueue = null; + return true; + } + + return false; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + + ArrayBlockingQueue<Object> localRQ = resultQueue; + + if (isCancelled()) + throw new CancellationException(); + + if (localRQ == null) { + int st = state.get(); + String stateStr; + switch (st) { + case ADDED: + stateStr = "ADDED"; + break; + case CANCELED: + stateStr = "CANCELED"; + break; + case INITIAL: + stateStr = "INITIAL"; + break; + default: - stateStr = "UNKONWN"; ++ stateStr = "UNKNOWN"; + break; + } + throw new IllegalStateException("Tried to get result twice [state=" + stateStr + "(" + st + ")]"); + } + + Object r = localRQ.poll(timeout, unit); + + // could have been canceled while waiting + if (isCancelled()) { + if (r != null) + throw new IllegalStateException("Nothing should have been added when in canceled state"); + + throw new CancellationException(); + } + + if (r == null) + throw new TimeoutException(); + + // make this method stop working now that something is being + // returned + resultQueue = null; + + if (r instanceof Throwable) + throw new ExecutionException((Throwable) r); + + @SuppressWarnings("unchecked") + T rAsT = (T) r; + return rAsT; + } + + @Override + public boolean isCancelled() { + return state.get() == CANCELED; + } + + @Override + public boolean isDone() { + return runState.get().equals(ScanRunState.FINISHED); + } + + public ScanRunState getScanRunState() { + return runState.get(); + } + +}