Merge branch '1.6' into 1.7

Conflicts:
        server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f6ba154f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f6ba154f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f6ba154f

Branch: refs/heads/master
Commit: f6ba154f2d919961b092e50c89f94efc7f7763be
Parents: caef59e d5e26b5
Author: Christopher Tubbs <ctubb...@apache.org>
Authored: Wed May 27 16:28:43 2015 -0400
Committer: Christopher Tubbs <ctubb...@apache.org>
Committed: Wed May 27 16:28:43 2015 -0400

----------------------------------------------------------------------
 .../src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6ba154f/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
index abf3d1a,0000000..6d5adce
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
@@@ -1,147 -1,0 +1,147 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.scan;
 +
 +import java.util.concurrent.ArrayBlockingQueue;
 +import java.util.concurrent.CancellationException;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.RunnableFuture;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import org.apache.accumulo.tserver.TabletServer;
 +
 +public abstract class ScanTask<T> implements RunnableFuture<T> {
 +
 +  protected final TabletServer server;
 +  protected AtomicBoolean interruptFlag;
 +  protected ArrayBlockingQueue<Object> resultQueue;
 +  protected AtomicInteger state;
 +  protected AtomicReference<ScanRunState> runState;
 +
 +  private static final int INITIAL = 1;
 +  private static final int ADDED = 2;
 +  private static final int CANCELED = 3;
 +
 +  ScanTask(TabletServer server) {
 +    this.server = server;
 +    interruptFlag = new AtomicBoolean(false);
 +    runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
 +    state = new AtomicInteger(INITIAL);
 +    resultQueue = new ArrayBlockingQueue<Object>(1);
 +  }
 +
 +  protected void addResult(Object o) {
 +    if (state.compareAndSet(INITIAL, ADDED))
 +      resultQueue.add(o);
 +    else if (state.get() == ADDED)
 +      throw new IllegalStateException("Tried to add more than one result");
 +  }
 +
 +  @Override
 +  public boolean cancel(boolean mayInterruptIfRunning) {
 +    if (!mayInterruptIfRunning)
 +      throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
 +
 +    if (state.get() == CANCELED)
 +      return true;
 +
 +    if (state.compareAndSet(INITIAL, CANCELED)) {
 +      interruptFlag.set(true);
 +      resultQueue = null;
 +      return true;
 +    }
 +
 +    return false;
 +  }
 +
 +  @Override
 +  public T get() throws InterruptedException, ExecutionException {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
 +
 +    ArrayBlockingQueue<Object> localRQ = resultQueue;
 +
 +    if (isCancelled())
 +      throw new CancellationException();
 +
 +    if (localRQ == null) {
 +      int st = state.get();
 +      String stateStr;
 +      switch (st) {
 +        case ADDED:
 +          stateStr = "ADDED";
 +          break;
 +        case CANCELED:
 +          stateStr = "CANCELED";
 +          break;
 +        case INITIAL:
 +          stateStr = "INITIAL";
 +          break;
 +        default:
-           stateStr = "UNKONWN";
++          stateStr = "UNKNOWN";
 +          break;
 +      }
 +      throw new IllegalStateException("Tried to get result twice [state=" + stateStr + "(" + st + ")]");
 +    }
 +
 +    Object r = localRQ.poll(timeout, unit);
 +
 +    // could have been canceled while waiting
 +    if (isCancelled()) {
 +      if (r != null)
 +        throw new IllegalStateException("Nothing should have been added when in canceled state");
 +
 +      throw new CancellationException();
 +    }
 +
 +    if (r == null)
 +      throw new TimeoutException();
 +
 +    // make this method stop working now that something is being
 +    // returned
 +    resultQueue = null;
 +
 +    if (r instanceof Throwable)
 +      throw new ExecutionException((Throwable) r);
 +
 +    @SuppressWarnings("unchecked")
 +    T rAsT = (T) r;
 +    return rAsT;
 +  }
 +
 +  @Override
 +  public boolean isCancelled() {
 +    return state.get() == CANCELED;
 +  }
 +
 +  @Override
 +  public boolean isDone() {
 +    return runState.get().equals(ScanRunState.FINISHED);
 +  }
 +
 +  public ScanRunState getScanRunState() {
 +    return runState.get();
 +  }
 +
 +}

Reply via email to