Repository: accumulo
Updated Branches:
  refs/heads/1.8 a5ed1ba3a -> 1ab2431c9


ACCUMULO-4643 Allow iterators to yield

Implemented a mechanism to allow iterators to yield control allowing other 
scans to use the scan thread.
Added requirement for hasTop to return false after yielding.

closes #263


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1ab2431c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1ab2431c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1ab2431c

Branch: refs/heads/1.8
Commit: 1ab2431c96b64e90350ba54045a774a0f1e25404
Parents: a5ed1ba
Author: Ivan Bella <i...@bella.name>
Authored: Thu May 25 14:22:24 2017 -0400
Committer: Ivan Bella <i...@bella.name>
Committed: Thu Jul 20 11:19:52 2017 -0400

----------------------------------------------------------------------
 .../core/iterators/SortedKeyValueIterator.java  |   3 +-
 .../accumulo/core/iterators/YieldCallback.java  |  57 +++++++
 .../iterators/YieldingKeyValueIterator.java     |  36 +++++
 .../system/SourceSwitchingIterator.java         |  49 +++++-
 .../system/SourceSwitchingIteratorTest.java     | 112 +++++++++++++
 .../testcases/YieldingTestCase.java             |  87 ++++++++++
 .../apache/accumulo/tserver/TabletServer.java   |   4 +
 .../Metrics2TabletServerScanMetrics.java        |   5 +-
 .../metrics/TabletServerScanMetricsKeys.java    |   1 +
 .../apache/accumulo/tserver/tablet/Scanner.java |   2 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  |  77 +++++++--
 .../apache/accumulo/test/YieldScannersIT.java   | 161 +++++++++++++++++++
 .../test/functional/YieldingIterator.java       | 129 +++++++++++++++
 13 files changed, 702 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
 
b/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
index ce5ef24..38158ff 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
@@ -50,7 +50,8 @@ public interface SortedKeyValueIterator<K extends 
WritableComparable<?>,V extend
   void init(SortedKeyValueIterator<K,V> source, Map<String,String> options, 
IteratorEnvironment env) throws IOException;
 
   /**
-   * Returns true if the iterator has more elements.
+   * Returns true if the iterator has more elements. Note that if this 
iterator has yielded (@see enableYielding(YieldCallback)), this this method 
must return
+   * false.
    *
    * @return <tt>true</tt> if the iterator has more elements.
    * @exception IllegalStateException

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/core/src/main/java/org/apache/accumulo/core/iterators/YieldCallback.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/YieldCallback.java 
b/core/src/main/java/org/apache/accumulo/core/iterators/YieldCallback.java
new file mode 100644
index 0000000..3d151cf
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/YieldCallback.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators;
+
+/**
+ * This callback handles the state of yielding within an iterator
+ */
+public class YieldCallback<K> {
+  private K key;
+
+  /**
+   * Called by the iterator when a next or seek call yields control.
+   *
+   * @param key
+   *          the key position at which the iterator yielded.
+   */
+  public void yield(K key) {
+    this.key = key;
+  }
+
+  /**
+   * Called by the client to see if the iterator yielded
+   *
+   * @return true if iterator yielded control
+   */
+  public boolean hasYielded() {
+    return (this.key != null);
+  }
+
+  /**
+   * Called by the client to get the yield position used as the start key 
(non-inclusive) of the range in a subsequent seek call when the iterator is 
rebuilt.
+   * This will also reset the state returned by hasYielded.
+   *
+   * @return <tt>K</tt> The key position
+   */
+  public K getPositionAndReset() {
+    try {
+      return this.key;
+    } finally {
+      this.key = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/core/src/main/java/org/apache/accumulo/core/iterators/YieldingKeyValueIterator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/YieldingKeyValueIterator.java
 
b/core/src/main/java/org/apache/accumulo/core/iterators/YieldingKeyValueIterator.java
new file mode 100644
index 0000000..76f8f31
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/iterators/YieldingKeyValueIterator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * An iterator that supports iterating over key and value pairs, and supports 
yielding on a next or seek call.
+ */
+public interface YieldingKeyValueIterator<K extends WritableComparable<?>,V 
extends Writable> extends SortedKeyValueIterator<K,V> {
+
+  /**
+   * Allows implementations to preempt further iteration of this iterator in 
the current RPC. Implementations can use the yield method on the callback to
+   * instruct the caller to cease collecting more results within this RPC. An 
implementation would only need to implement this mechanism if a next or seek 
call
+   * has been taking so long as to starve out other scans within the same 
thread pool. Most iterators do not need to implement this method. The yield 
method on
+   * the callback accepts a Key which will be used as the start key 
(non-inclusive) on the seek call in the next RPC. This feature is not supported 
for isolated
+   * scans.
+   */
+  void enableYielding(YieldCallback<K> callback);
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
 
b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
index 098aa63..9cb3bd7 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.base.Optional;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
@@ -30,13 +31,15 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.YieldCallback;
+import org.apache.accumulo.core.iterators.YieldingKeyValueIterator;
 
 /**
  * A SortedKeyValueIterator which presents a view over some section of data, 
regardless of whether or not it is backed by memory (InMemoryMap) or an RFile
  * (InMemoryMap that was minor compacted to a file). Clients reading from a 
table that has data in memory should not see interruption in their scan when 
that
  * data is minor compacted. This iterator is designed to manage this behind 
the scene.
  */
-public class SourceSwitchingIterator implements InterruptibleIterator {
+public class SourceSwitchingIterator implements InterruptibleIterator, 
YieldingKeyValueIterator<Key,Value> {
 
   public interface DataSource {
     boolean isCurrent();
@@ -53,6 +56,8 @@ public class SourceSwitchingIterator implements 
InterruptibleIterator {
   private DataSource source;
   private SortedKeyValueIterator<Key,Value> iter;
 
+  private Optional<YieldCallback<Key>> yield = Optional.absent();
+
   private Key key;
   private Value val;
 
@@ -113,6 +118,18 @@ public class SourceSwitchingIterator implements 
InterruptibleIterator {
   }
 
   @Override
+  public void enableYielding(YieldCallback<Key> yield) {
+    this.yield = Optional.of(yield);
+
+    // if we require row isolation, then we cannot support yielding in the 
middle.
+    if (!onlySwitchAfterRow) {
+      if (iter != null && iter instanceof YieldingKeyValueIterator) {
+        ((YieldingKeyValueIterator<Key,Value>) iter).enableYielding(yield);
+      }
+    }
+  }
+
+  @Override
   public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options, IteratorEnvironment env) throws IOException {
     throw new UnsupportedOperationException();
   }
@@ -126,14 +143,23 @@ public class SourceSwitchingIterator implements 
InterruptibleIterator {
 
   private void readNext(boolean initialSeek) throws IOException {
 
+    // we need to check here if we were yielded in case the source was 
switched out and re-seeked by someone else (minor compaction/InMemoryMap)
+    boolean yielded = (yield.isPresent() && yield.get().hasYielded());
+
     // check of initialSeek second is intentional so that it does not short
     // circuit the call to switchSource
-    boolean seekNeeded = (!onlySwitchAfterRow && switchSource()) || 
initialSeek;
+    boolean seekNeeded = yielded || (!onlySwitchAfterRow && switchSource()) || 
initialSeek;
 
     if (seekNeeded)
       if (initialSeek)
         iter.seek(range, columnFamilies, inclusive);
-      else
+      else if (yielded) {
+        Key yieldPosition = yield.get().getPositionAndReset();
+        if (!range.contains(yieldPosition)) {
+          throw new IOException("Underlying iterator yielded to a position 
outside of its range: " + yieldPosition + " not in " + range);
+        }
+        iter.seek(new Range(yieldPosition, false, range.getEndKey(), 
range.isEndKeyInclusive()), columnFamilies, inclusive);
+      } else
         iter.seek(new Range(key, false, range.getEndKey(), 
range.isEndKeyInclusive()), columnFamilies, inclusive);
     else {
       iter.next();
@@ -144,6 +170,10 @@ public class SourceSwitchingIterator implements 
InterruptibleIterator {
     }
 
     if (iter.hasTop()) {
+      if (yield.isPresent() && yield.get().hasYielded()) {
+        throw new IOException("Coding error: hasTop returned true but has 
yielded at " + yield.get().getPositionAndReset());
+      }
+
       Key nextKey = iter.getTopKey();
       Value nextVal = iter.getTopValue();
 
@@ -163,6 +193,11 @@ public class SourceSwitchingIterator implements 
InterruptibleIterator {
     if (!source.isCurrent()) {
       source = source.getNewDataSource();
       iter = source.iterator();
+      if (!onlySwitchAfterRow && yield.isPresent()) {
+        if (iter instanceof YieldingKeyValueIterator) {
+          ((YieldingKeyValueIterator<Key,Value>) 
iter).enableYielding(yield.get());
+        }
+      }
       return true;
     }
 
@@ -176,8 +211,14 @@ public class SourceSwitchingIterator implements 
InterruptibleIterator {
       this.inclusive = inclusive;
       this.columnFamilies = columnFamilies;
 
-      if (iter == null)
+      if (iter == null) {
         iter = source.iterator();
+        if (!onlySwitchAfterRow && yield.isPresent()) {
+          if (iter instanceof YieldingKeyValueIterator) {
+            ((YieldingKeyValueIterator<Key,Value>) 
iter).enableYielding(yield.get());
+          }
+        }
+      }
 
       readNext(true);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/core/src/test/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIteratorTest.java
 
b/core/src/test/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIteratorTest.java
index e6ca6d3..037ee7e 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIteratorTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIteratorTest.java
@@ -16,21 +16,28 @@
  */
 package org.apache.accumulo.core.iterators.system;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.base.Optional;
 import junit.framework.TestCase;
 
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.SortedMapIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.iterators.YieldCallback;
+import org.apache.accumulo.core.iterators.YieldingKeyValueIterator;
 import 
org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
 import org.apache.hadoop.io.Text;
 
@@ -273,4 +280,109 @@ public class SourceSwitchingIteratorTest extends TestCase 
{
     } catch (IterationInterruptedException iie) {}
 
   }
+
+  private Range yield(Range r, SourceSwitchingIterator ssi, YieldCallback<Key> 
yield) throws IOException {
+    while (yield.hasYielded()) {
+      Key yieldPosition = yield.getPositionAndReset();
+      if (!r.contains(yieldPosition)) {
+        throw new IOException("Underlying iterator yielded to a position 
outside of its range: " + yieldPosition + " not in " + r);
+      }
+      r = new Range(yieldPosition, false, (Key) null, r.isEndKeyInclusive());
+      ssi.seek(r, new ArrayList<ByteSequence>(), false);
+    }
+    return r;
+  }
+
+  public void testYield() throws Exception {
+    TreeMap<Key,Value> tm1 = new TreeMap<>();
+    put(tm1, "r1", "cf1", "cq1", 5, "v1");
+    put(tm1, "r1", "cf1", "cq3", 5, "v2");
+    put(tm1, "r2", "cf1", "cq1", 5, "v3");
+
+    SortedMapIterator smi = new SortedMapIterator(tm1);
+    YieldingIterator ymi = new YieldingIterator(smi);
+    TestDataSource tds = new TestDataSource(ymi);
+    SourceSwitchingIterator ssi = new SourceSwitchingIterator(tds);
+
+    YieldCallback<Key> yield = new YieldCallback<>();
+    ssi.enableYielding(yield);
+
+    Range r = new Range();
+    ssi.seek(r, new ArrayList<ByteSequence>(), false);
+    r = yield(r, ssi, yield);
+    testAndCallNext(ssi, "r1", "cf1", "cq1", 5, "v1", true);
+    r = yield(r, ssi, yield);
+    testAndCallNext(ssi, "r1", "cf1", "cq3", 5, "v2", true);
+    r = yield(r, ssi, yield);
+    testAndCallNext(ssi, "r2", "cf1", "cq1", 5, "v3", true);
+    r = yield(r, ssi, yield);
+    assertFalse(ssi.hasTop());
+  }
+
+  /**
+   * This iterator which implements yielding will yield after every other next 
and every other seek call.
+   */
+  private final AtomicBoolean yieldNextKey = new AtomicBoolean(false);
+  private final AtomicBoolean yieldSeekKey = new AtomicBoolean(false);
+
+  public class YieldingIterator extends WrappingIterator implements 
YieldingKeyValueIterator<Key,Value> {
+    private Optional<YieldCallback<Key>> yield = Optional.absent();
+
+    public YieldingIterator(SortedKeyValueIterator<Key,Value> source) {
+      setSource(source);
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) 
{
+      return new YieldingIterator(getSource().deepCopy(env));
+    }
+
+    @Override
+    public boolean hasTop() {
+      return (!(yield.isPresent() && yield.get().hasYielded()) && 
super.hasTop());
+    }
+
+    @Override
+    public void next() throws IOException {
+      boolean yielded = false;
+
+      // yield on every other next call.
+      yieldNextKey.set(!yieldNextKey.get());
+      if (yield.isPresent() && yieldNextKey.get()) {
+        yielded = true;
+        // since we are not actually skipping keys underneath, simply use the 
key following the top key as the yield key
+        
yield.get().yield(getTopKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME));
+      }
+
+      // if not yielding, then simply pass on the next call
+      if (!yielded) {
+        super.next();
+      }
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive) throws IOException {
+      boolean yielded = false;
+
+      if (!range.isStartKeyInclusive()) {
+        // yield on every other seek call.
+        yieldSeekKey.set(!yieldSeekKey.get());
+        if (yield.isPresent() && yieldSeekKey.get()) {
+          yielded = true;
+          // since we are not actually skipping keys underneath, simply use 
the key following the range start key
+          
yield.get().yield(range.getStartKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME));
+        }
+      }
+
+      // if not yielding, then simply pass on the call to the source
+      if (!yielded) {
+        super.seek(range, columnFamilies, inclusive);
+      }
+    }
+
+    @Override
+    public void enableYielding(YieldCallback<Key> yield) {
+      this.yield = Optional.of(yield);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/iterator-test-harness/src/main/java/org/apache/accumulo/iteratortest/testcases/YieldingTestCase.java
----------------------------------------------------------------------
diff --git 
a/iterator-test-harness/src/main/java/org/apache/accumulo/iteratortest/testcases/YieldingTestCase.java
 
b/iterator-test-harness/src/main/java/org/apache/accumulo/iteratortest/testcases/YieldingTestCase.java
new file mode 100644
index 0000000..f9de207
--- /dev/null
+++ 
b/iterator-test-harness/src/main/java/org/apache/accumulo/iteratortest/testcases/YieldingTestCase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.iteratortest.testcases;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.YieldCallback;
+import org.apache.accumulo.core.iterators.YieldingKeyValueIterator;
+import org.apache.accumulo.iteratortest.IteratorTestInput;
+import org.apache.accumulo.iteratortest.IteratorTestOutput;
+import org.apache.accumulo.iteratortest.IteratorTestUtil;
+import org.apache.accumulo.iteratortest.environments.SimpleIteratorEnvironment;
+
+import java.io.IOException;
+import java.util.TreeMap;
+
+/**
+ * Test case that verifies that an iterator works correctly with the yielding 
api. Note that most iterators do nothing in terms of yielding in which case this
+ * merely tests that the iterator produces the correct output. If however the 
iterator does override the yielding api, then this ensures that it works 
correctly
+ * iff the iterator actually decides to yield. Nothing can force an iterator 
to yield without knowing something about the internals of the iterator being
+ * tested.
+ */
+public class YieldingTestCase extends OutputVerifyingTestCase {
+
+  @Override
+  public IteratorTestOutput test(IteratorTestInput testInput) {
+    final SortedKeyValueIterator<Key,Value> skvi = 
IteratorTestUtil.instantiateIterator(testInput);
+    final SortedKeyValueIterator<Key,Value> source = 
IteratorTestUtil.createSource(testInput);
+
+    try {
+      skvi.init(source, testInput.getIteratorOptions(), new 
SimpleIteratorEnvironment());
+
+      YieldCallback<Key> yield = new YieldCallback<>();
+      if (skvi instanceof YieldingKeyValueIterator) {
+        ((YieldingKeyValueIterator<Key,Value>) skvi).enableYielding(yield);
+      }
+
+      skvi.seek(testInput.getRange(), testInput.getFamilies(), 
testInput.isInclusive());
+      return new IteratorTestOutput(consume(testInput, skvi, yield));
+    } catch (IOException e) {
+      return new IteratorTestOutput(e);
+    }
+  }
+
+  TreeMap<Key,Value> consume(IteratorTestInput testInput, 
SortedKeyValueIterator<Key,Value> skvi, YieldCallback<Key> yield) throws 
IOException {
+    TreeMap<Key,Value> data = new TreeMap<>();
+    Key lastKey = null;
+    while (yield.hasYielded() || skvi.hasTop()) {
+      if (yield.hasYielded()) {
+        Range r = testInput.getRange();
+        Key yieldPosition = yield.getPositionAndReset();
+        if (!r.contains(yieldPosition)) {
+          throw new IOException("Underlying iterator yielded to a position 
outside of its range: " + yieldPosition + " not in " + r);
+        }
+        if (skvi.hasTop()) {
+          throw new IOException("Underlying iterator reports having a top, but 
has yielded: " + yieldPosition);
+        }
+        if (lastKey != null && yieldPosition.compareTo(lastKey) <= 0) {
+          throw new IOException("Underlying iterator yielded at a position 
that is not past the last key returned");
+        }
+        skvi.seek(new Range(yieldPosition, false, r.getEndKey(), 
r.isEndKeyInclusive()), testInput.getFamilies(), testInput.isInclusive());
+      } else {
+        // Make sure to copy the K-V
+        data.put(new Key(skvi.getTopKey()), new Value(skvi.getTopValue()));
+        skvi.next();
+      }
+    }
+    return data;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index a937235..6ead6b8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -282,6 +282,10 @@ public class TabletServer extends AccumuloServerContext 
implements Runnable {
   private final Metrics scanMetrics;
   private final Metrics mincMetrics;
 
+  public Metrics getScanMetrics() {
+    return scanMetrics;
+  }
+
   public Metrics getMinCMetrics() {
     return mincMetrics;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerScanMetrics.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerScanMetrics.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerScanMetrics.java
index 0c72cb5..d2fc871 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerScanMetrics.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerScanMetrics.java
@@ -33,7 +33,7 @@ public class Metrics2TabletServerScanMetrics implements 
Metrics, MetricsSource,
 
   private final MetricsSystem system;
   private final MetricsRegistry registry;
-  private final MutableStat scans, resultsPerScan;
+  private final MutableStat scans, resultsPerScan, yields;
 
   // Use TabletServerMetricsFactory
   Metrics2TabletServerScanMetrics(MetricsSystem system) {
@@ -42,6 +42,7 @@ public class Metrics2TabletServerScanMetrics implements 
Metrics, MetricsSource,
 
     scans = registry.newStat(SCAN, "Scans", "Ops", "Count", true);
     resultsPerScan = registry.newStat(RESULT_SIZE, "Results per scan", "Ops", 
"Count", true);
+    yields = registry.newStat(YIELD, "Yields", "Ops", "Count", true);
   }
 
   @Override
@@ -50,6 +51,8 @@ public class Metrics2TabletServerScanMetrics implements 
Metrics, MetricsSource,
       scans.add(value);
     } else if (RESULT_SIZE.equals(name)) {
       resultsPerScan.add(value);
+    } else if (YIELD.equals(name)) {
+      yields.add(value);
     } else {
       throw new RuntimeException("Could not find metric to update for name " + 
name);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetricsKeys.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetricsKeys.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetricsKeys.java
index 7d33a84..8c9ab34 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetricsKeys.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetricsKeys.java
@@ -23,5 +23,6 @@ public interface TabletServerScanMetricsKeys {
 
   String SCAN = "scan";
   String RESULT_SIZE = "result";
+  String YIELD = "yield";
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
index 15526d7..bf3ba86 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
@@ -95,7 +95,7 @@ public class Scanner {
         iter = new SourceSwitchingIterator(dataSource, false);
       }
 
-      results = tablet.nextBatch(iter, range, options.getNum(), 
options.getColumnSet(), options.getBatchTimeOut());
+      results = tablet.nextBatch(iter, range, options.getNum(), 
options.getColumnSet(), options.getBatchTimeOut(), options.isIsolated());
 
       if (results.getResults() == null) {
         range = null;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 6637521..9278a36 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -75,6 +75,8 @@ import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.YieldCallback;
+import org.apache.accumulo.core.iterators.YieldingKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
 import org.apache.accumulo.core.master.thrift.BulkImportState;
 import org.apache.accumulo.core.master.thrift.TabletLoadState;
@@ -138,6 +140,7 @@ import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.MutationReceiver;
 import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
 import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
 import 
org.apache.accumulo.tserver.tablet.Compactor.CompactionCanceledException;
 import org.apache.accumulo.tserver.tablet.Compactor.CompactionEnv;
 import org.apache.commons.codec.DecoderException;
@@ -546,11 +549,17 @@ public class Tablet implements TabletCommitter {
       batchTimeOut = 0;
     }
 
+    // determine if the iterator supported yielding
+    YieldCallback<Key> yield = new YieldCallback<>();
+    if (mmfi instanceof YieldingKeyValueIterator)
+      ((YieldingKeyValueIterator<Key,Value>) mmfi).enableYielding(yield);
+    boolean yielded = false;
+
     for (Range range : ranges) {
 
       boolean timesUp = batchTimeOut > 0 && System.nanoTime() > returnTime;
 
-      if (exceededMemoryUsage || tabletClosed || timesUp) {
+      if (exceededMemoryUsage || tabletClosed || timesUp || yielded) {
         lookupResult.unfinishedRanges.add(range);
         continue;
       }
@@ -564,6 +573,9 @@ public class Tablet implements TabletCommitter {
           mmfi.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
 
         while (mmfi.hasTop()) {
+          if (yield.hasYielded()) {
+            throw new IOException("Coding error: hasTop returned true but has 
yielded at " + yield.getPositionAndReset());
+          }
           Key key = mmfi.getTopKey();
 
           KVEntry kve = new KVEntry(key, mmfi.getTopValue());
@@ -584,6 +596,23 @@ public class Tablet implements TabletCommitter {
           mmfi.next();
         }
 
+        if (yield.hasYielded()) {
+          yielded = true;
+          Key yieldPosition = yield.getPositionAndReset();
+          if (!range.contains(yieldPosition)) {
+            throw new IOException("Underlying iterator yielded to a position 
outside of its range: " + yieldPosition + " not in " + range);
+          }
+          if (!results.isEmpty() && 
yieldPosition.compareTo(results.get(results.size() - 1).getKey()) <= 0) {
+            throw new IOException("Underlying iterator yielded to a position 
that does not follow the last key returned: " + yieldPosition + " <= "
+                + results.get(results.size() - 1).getKey());
+          }
+          addUnfinishedRange(lookupResult, range, yieldPosition, false);
+
+          log.debug("Scan yield detected at position " + yieldPosition);
+          Metrics scanMetrics = getTabletServer().getScanMetrics();
+          if (scanMetrics.isEnabled())
+            scanMetrics.add(TabletServerScanMetrics.YIELD, 1);
+        }
       } catch (TooManyFilesException tmfe) {
         // treat this as a closed tablet, and let the client retry
         log.warn("Tablet " + getExtent() + " has too many files, batch lookup 
can not run");
@@ -696,7 +725,7 @@ public class Tablet implements TabletCommitter {
     }
   }
 
-  Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int 
num, Set<Column> columns, long batchTimeOut) throws IOException {
+  Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int 
num, Set<Column> columns, long batchTimeOut, boolean isolated) throws 
IOException {
 
     // log.info("In nextBatch..");
 
@@ -713,18 +742,27 @@ public class Tablet implements TabletCommitter {
 
     long maxResultsSize = 
tableConfiguration.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
 
+    Key continueKey = null;
+    boolean skipContinueKey = false;
+
+    YieldCallback<Key> yield = new YieldCallback<>();
+
+    // we cannot yield if we are in isolation mode
+    if (!isolated) {
+      if (iter instanceof YieldingKeyValueIterator)
+        ((YieldingKeyValueIterator<Key,Value>) iter).enableYielding(yield);
+    }
+
     if (columns.size() == 0) {
       iter.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
     } else {
       iter.seek(range, LocalityGroupUtil.families(columns), true);
     }
 
-    Key continueKey = null;
-    boolean skipContinueKey = false;
-
-    boolean endOfTabletReached = false;
     while (iter.hasTop()) {
-
+      if (yield.hasYielded()) {
+        throw new IOException("Coding error: hasTop returned true but has 
yielded at " + yield.getPositionAndReset());
+      }
       value = iter.getTopValue();
       key = iter.getTopKey();
 
@@ -744,17 +782,28 @@ public class Tablet implements TabletCommitter {
       iter.next();
     }
 
-    if (iter.hasTop() == false) {
-      endOfTabletReached = true;
-    }
+    if (yield.hasYielded()) {
+      continueKey = new Key(yield.getPositionAndReset());
+      skipContinueKey = true;
+      if (!range.contains(continueKey)) {
+        throw new IOException("Underlying iterator yielded to a position 
outside of its range: " + continueKey + " not in " + range);
+      }
+      if (!results.isEmpty() && 
continueKey.compareTo(results.get(results.size() - 1).getKey()) <= 0) {
+        throw new IOException("Underlying iterator yielded to a position that 
does not follow the last key returned: " + continueKey + " <= "
+            + results.get(results.size() - 1).getKey());
+      }
 
-    if (endOfTabletReached) {
+      log.debug("Scan yield detected at position " + continueKey);
+      Metrics scanMetrics = getTabletServer().getScanMetrics();
+      if (scanMetrics.isEnabled())
+        scanMetrics.add(TabletServerScanMetrics.YIELD, 1);
+    } else if (iter.hasTop() == false) {
+      // end of tablet has been reached
       continueKey = null;
+      if (results.size() == 0)
+        results = null;
     }
 
-    if (endOfTabletReached && results.size() == 0)
-      results = null;
-
     return new Batch(skipContinueKey, results, continueKey, resultBytes);
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java 
b/test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java
new file mode 100644
index 0000000..4d86dd3
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.YieldingIterator;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// ACCUMULO-4643
+public class YieldScannersIT extends AccumuloClusterHarness {
+  Logger log = LoggerFactory.getLogger(YieldScannersIT.class);
+  private static final char START_ROW = 'a';
+
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    cfg.setNumTservers(1);
+  }
+
+  @Test
+  public void testScan() throws Exception {
+    // make a table
+    final String tableName = getUniqueNames(1)[0];
+    final Connector conn = getConnector();
+    conn.tableOperations().create(tableName);
+    final BatchWriter writer = conn.createBatchWriter(tableName, new 
BatchWriterConfig());
+    for (int i = 0; i < 10; i++) {
+      byte[] row = new byte[] {(byte) (START_ROW + i)};
+      Mutation m = new Mutation(new Text(row));
+      m.put(new Text(), new Text(), new Value());
+      writer.addMutation(m);
+    }
+    writer.flush();
+    writer.close();
+
+    log.info("Creating scanner");
+    // make a scanner for a table with 10 keys
+    final Scanner scanner = conn.createScanner(tableName, 
Authorizations.EMPTY);
+    final IteratorSetting cfg = new IteratorSetting(100, 
YieldingIterator.class);
+    scanner.addScanIterator(cfg);
+
+    log.info("iterating");
+    Iterator<Map.Entry<Key,Value>> it = scanner.iterator();
+    int keyCount = 0;
+    int yieldNextCount = 0;
+    int yieldSeekCount = 0;
+    while (it.hasNext()) {
+      Map.Entry<Key,Value> next = it.next();
+      log.info(Integer.toString(keyCount) + ": Got key " + next.getKey() + " 
with value " + next.getValue());
+
+      // verify we got the expected key
+      char expected = (char) (START_ROW + keyCount);
+      Assert.assertEquals("Unexpected row", Character.toString(expected), 
next.getKey().getRow().toString());
+
+      // determine whether we yielded on a next and seek
+      if ((keyCount & 1) != 0) {
+        yieldNextCount++;
+        yieldSeekCount++;
+      }
+      String[] value = StringUtils.split(next.getValue().toString(), ',');
+      Assert.assertEquals("Unexpected yield next count", 
Integer.toString(yieldNextCount), value[0]);
+      Assert.assertEquals("Unexpected yield seek count", 
Integer.toString(yieldSeekCount), value[1]);
+      Assert.assertEquals("Unexpected rebuild count", 
Integer.toString(yieldNextCount + yieldSeekCount), value[2]);
+
+      keyCount++;
+    }
+    Assert.assertEquals("Did not get the expected number of results", 10, 
keyCount);
+  }
+
+  @Test
+  public void testBatchScan() throws Exception {
+    // make a table
+    final String tableName = getUniqueNames(1)[0];
+    final Connector conn = getConnector();
+    conn.tableOperations().create(tableName);
+    final BatchWriter writer = conn.createBatchWriter(tableName, new 
BatchWriterConfig());
+    for (int i = 0; i < 10; i++) {
+      byte[] row = new byte[] {(byte) (START_ROW + i)};
+      Mutation m = new Mutation(new Text(row));
+      m.put(new Text(), new Text(), new Value());
+      writer.addMutation(m);
+    }
+    writer.flush();
+    writer.close();
+
+    log.info("Creating batch scanner");
+    // make a scanner for a table with 10 keys
+    final BatchScanner scanner = conn.createBatchScanner(tableName, 
Authorizations.EMPTY, 1);
+    final IteratorSetting cfg = new IteratorSetting(100, 
YieldingIterator.class);
+    scanner.addScanIterator(cfg);
+    scanner.setRanges(Collections.singleton(new Range()));
+
+    log.info("iterating");
+    Iterator<Map.Entry<Key,Value>> it = scanner.iterator();
+    int keyCount = 0;
+    int yieldNextCount = 0;
+    int yieldSeekCount = 0;
+    while (it.hasNext()) {
+      Map.Entry<Key,Value> next = it.next();
+      log.info(Integer.toString(keyCount) + ": Got key " + next.getKey() + " 
with value " + next.getValue());
+
+      // verify we got the expected key
+      char expected = (char) (START_ROW + keyCount);
+      Assert.assertEquals("Unexpected row", Character.toString(expected), 
next.getKey().getRow().toString());
+
+      // determine whether we yielded on a next and seek
+      if ((keyCount & 1) != 0) {
+        yieldNextCount++;
+        yieldSeekCount++;
+      }
+      String[] value = StringUtils.split(next.getValue().toString(), ',');
+      Assert.assertEquals("Unexpected yield next count", 
Integer.toString(yieldNextCount), value[0]);
+      Assert.assertEquals("Unexpected yield seek count", 
Integer.toString(yieldSeekCount), value[1]);
+      Assert.assertEquals("Unexpected rebuild count", 
Integer.toString(yieldNextCount + yieldSeekCount), value[2]);
+
+      keyCount++;
+    }
+    Assert.assertEquals("Did not get the expected number of results", 10, 
keyCount);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/test/src/main/java/org/apache/accumulo/test/functional/YieldingIterator.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/YieldingIterator.java 
b/test/src/main/java/org/apache/accumulo/test/functional/YieldingIterator.java
new file mode 100644
index 0000000..4ac325d
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/YieldingIterator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Optional;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.iterators.YieldCallback;
+import org.apache.accumulo.core.iterators.YieldingKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This iterator which implements yielding will yield after every other next 
and every other seek call.
+ */
+public class YieldingIterator extends WrappingIterator implements 
YieldingKeyValueIterator<Key,Value> {
+  private static final Logger log = 
LoggerFactory.getLogger(YieldingIterator.class);
+  private static final AtomicInteger yieldNexts = new AtomicInteger(0);
+  private static final AtomicInteger yieldSeeks = new AtomicInteger(0);
+  private static final AtomicInteger rebuilds = new AtomicInteger(0);
+
+  private static final AtomicBoolean yieldNextKey = new AtomicBoolean(false);
+  private static final AtomicBoolean yieldSeekKey = new AtomicBoolean(false);
+
+  private Optional<YieldCallback<Key>> yield = Optional.absent();
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    YieldingIterator it = new YieldingIterator();
+    it.setSource(getSource().deepCopy(env));
+    return it;
+  }
+
+  @Override
+  public boolean hasTop() {
+    return (!(yield.isPresent() && yield.get().hasYielded()) && 
super.hasTop());
+  }
+
+  @Override
+  public void next() throws IOException {
+    log.info("start YieldingIterator.next: " + getTopValue());
+    boolean yielded = false;
+
+    // yield on every other next call.
+    yieldNextKey.set(!yieldNextKey.get());
+    if (yield.isPresent() && yieldNextKey.get()) {
+      yielded = true;
+      yieldNexts.incrementAndGet();
+      // since we are not actually skipping keys underneath, simply use the 
key following the top key as the yield key
+      
yield.get().yield(getTopKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME));
+      log.info("end YieldingIterator.next: yielded at " + getTopKey());
+    }
+
+    // if not yielding, then simply pass on the next call
+    if (!yielded) {
+      super.next();
+      log.info("end YieldingIterator.next: " + (hasTop() ? getTopKey() + " " + 
getTopValue() : "no top"));
+    }
+  }
+
+  /**
+   * The top value will encode the current state of the yields, seeks, and 
rebuilds for use by the YieldScannersIT tests.
+   *
+   * @return a top value of the form {yieldNexts},{yieldSeeks},{rebuilds}
+   */
+  @Override
+  public Value getTopValue() {
+    String value = Integer.toString(yieldNexts.get()) + ',' + 
Integer.toString(yieldSeeks.get()) + ',' + Integer.toString(rebuilds.get());
+    return new Value(value);
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive) throws IOException {
+    log.info("start YieldingIterator.seek: " + getTopValue() + " with range " 
+ range);
+    boolean yielded = false;
+
+    if (!range.isStartKeyInclusive()) {
+      rebuilds.incrementAndGet();
+
+      // yield on every other seek call.
+      yieldSeekKey.set(!yieldSeekKey.get());
+      if (yield.isPresent() && yieldSeekKey.get()) {
+        yielded = true;
+        yieldSeeks.incrementAndGet();
+        // since we are not actually skipping keys underneath, simply use the 
key following the range start key
+        
yield.get().yield(range.getStartKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME));
+        log.info("end YieldingIterator.next: yielded at " + 
range.getStartKey());
+      }
+    }
+
+    // if not yielding, then simply pass on the call to the source
+    if (!yielded) {
+      super.seek(range, columnFamilies, inclusive);
+      log.info("end YieldingIterator.seek: " + (hasTop() ? getTopKey() + " " + 
getTopValue() : "no top"));
+    }
+  }
+
+  @Override
+  public void enableYielding(YieldCallback<Key> yield) {
+    this.yield = Optional.of(yield);
+  }
+}

Reply via email to