Repository: accumulo Updated Branches: refs/heads/1.8 a5ed1ba3a -> 1ab2431c9
ACCUMULO-4643 Allow iterators to yield Implemented a mechanism to allow iterators to yield control allowing other scans to use the scan thread. Added requirement for hasTop to return false after yielding. closes #263 Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1ab2431c Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1ab2431c Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1ab2431c Branch: refs/heads/1.8 Commit: 1ab2431c96b64e90350ba54045a774a0f1e25404 Parents: a5ed1ba Author: Ivan Bella <i...@bella.name> Authored: Thu May 25 14:22:24 2017 -0400 Committer: Ivan Bella <i...@bella.name> Committed: Thu Jul 20 11:19:52 2017 -0400 ---------------------------------------------------------------------- .../core/iterators/SortedKeyValueIterator.java | 3 +- .../accumulo/core/iterators/YieldCallback.java | 57 +++++++ .../iterators/YieldingKeyValueIterator.java | 36 +++++ .../system/SourceSwitchingIterator.java | 49 +++++- .../system/SourceSwitchingIteratorTest.java | 112 +++++++++++++ .../testcases/YieldingTestCase.java | 87 ++++++++++ .../apache/accumulo/tserver/TabletServer.java | 4 + .../Metrics2TabletServerScanMetrics.java | 5 +- .../metrics/TabletServerScanMetricsKeys.java | 1 + .../apache/accumulo/tserver/tablet/Scanner.java | 2 +- .../apache/accumulo/tserver/tablet/Tablet.java | 77 +++++++-- .../apache/accumulo/test/YieldScannersIT.java | 161 +++++++++++++++++++ .../test/functional/YieldingIterator.java | 129 +++++++++++++++ 13 files changed, 702 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java 
b/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java index ce5ef24..38158ff 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java @@ -50,7 +50,8 @@ public interface SortedKeyValueIterator<K extends WritableComparable<?>,V extend void init(SortedKeyValueIterator<K,V> source, Map<String,String> options, IteratorEnvironment env) throws IOException; /** - * Returns true if the iterator has more elements. + * Returns true if the iterator has more elements. Note that if this iterator has yielded (see {@link YieldingKeyValueIterator#enableYielding(YieldCallback)}), then this method must return + * false. * * @return <tt>true</tt> if the iterator has more elements. * @exception IllegalStateException http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/core/src/main/java/org/apache/accumulo/core/iterators/YieldCallback.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/YieldCallback.java b/core/src/main/java/org/apache/accumulo/core/iterators/YieldCallback.java new file mode 100644 index 0000000..3d151cf --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/iterators/YieldCallback.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.iterators; + +/** + * This callback handles the state of yielding within an iterator + */ +public class YieldCallback<K> { + private K key; + + /** + * Called by the iterator when a next or seek call yields control. + * + * @param key + * the key position at which the iterator yielded. + */ + public void yield(K key) { + this.key = key; + } + + /** + * Called by the client to see if the iterator yielded + * + * @return true if iterator yielded control + */ + public boolean hasYielded() { + return (this.key != null); + } + + /** + * Called by the client to get the yield position used as the start key (non-inclusive) of the range in a subsequent seek call when the iterator is rebuilt. + * This will also reset the state returned by hasYielded. 
+ * + * @return <tt>K</tt> The key position + */ + public K getPositionAndReset() { + try { + return this.key; + } finally { + this.key = null; + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/core/src/main/java/org/apache/accumulo/core/iterators/YieldingKeyValueIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/YieldingKeyValueIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/YieldingKeyValueIterator.java new file mode 100644 index 0000000..76f8f31 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/iterators/YieldingKeyValueIterator.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.iterators; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * An iterator that supports iterating over key and value pairs, and supports yielding on a next or seek call. 
+ */ +public interface YieldingKeyValueIterator<K extends WritableComparable<?>,V extends Writable> extends SortedKeyValueIterator<K,V> { + + /** + * Allows implementations to preempt further iteration of this iterator in the current RPC. Implementations can use the yield method on the callback to + * instruct the caller to cease collecting more results within this RPC. An implementation would only need to implement this mechanism if a next or seek call + * has been taking so long as to starve out other scans within the same thread pool. Most iterators do not need to implement this method. The yield method on + * the callback accepts a Key which will be used as the start key (non-inclusive) on the seek call in the next RPC. This feature is not supported for isolated + * scans. + */ + void enableYielding(YieldCallback<K> callback); + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java index 098aa63..9cb3bd7 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Optional; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; @@ -30,13 +31,15 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; import 
org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.YieldCallback; +import org.apache.accumulo.core.iterators.YieldingKeyValueIterator; /** * A SortedKeyValueIterator which presents a view over some section of data, regardless of whether or not it is backed by memory (InMemoryMap) or an RFile * (InMemoryMap that was minor compacted to a file). Clients reading from a table that has data in memory should not see interruption in their scan when that * data is minor compacted. This iterator is designed to manage this behind the scene. */ -public class SourceSwitchingIterator implements InterruptibleIterator { +public class SourceSwitchingIterator implements InterruptibleIterator, YieldingKeyValueIterator<Key,Value> { public interface DataSource { boolean isCurrent(); @@ -53,6 +56,8 @@ public class SourceSwitchingIterator implements InterruptibleIterator { private DataSource source; private SortedKeyValueIterator<Key,Value> iter; + private Optional<YieldCallback<Key>> yield = Optional.absent(); + private Key key; private Value val; @@ -113,6 +118,18 @@ public class SourceSwitchingIterator implements InterruptibleIterator { } @Override + public void enableYielding(YieldCallback<Key> yield) { + this.yield = Optional.of(yield); + + // if we require row isolation, then we cannot support yielding in the middle. 
+ if (!onlySwitchAfterRow) { + if (iter != null && iter instanceof YieldingKeyValueIterator) { + ((YieldingKeyValueIterator<Key,Value>) iter).enableYielding(yield); + } + } + } + + @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { throw new UnsupportedOperationException(); } @@ -126,14 +143,23 @@ public class SourceSwitchingIterator implements InterruptibleIterator { private void readNext(boolean initialSeek) throws IOException { + // we need to check here if we were yielded in case the source was switched out and re-seeked by someone else (minor compaction/InMemoryMap) + boolean yielded = (yield.isPresent() && yield.get().hasYielded()); + // check of initialSeek second is intentional so that it does not short // circuit the call to switchSource - boolean seekNeeded = (!onlySwitchAfterRow && switchSource()) || initialSeek; + boolean seekNeeded = yielded || (!onlySwitchAfterRow && switchSource()) || initialSeek; if (seekNeeded) if (initialSeek) iter.seek(range, columnFamilies, inclusive); - else + else if (yielded) { + Key yieldPosition = yield.get().getPositionAndReset(); + if (!range.contains(yieldPosition)) { + throw new IOException("Underlying iterator yielded to a position outside of its range: " + yieldPosition + " not in " + range); + } + iter.seek(new Range(yieldPosition, false, range.getEndKey(), range.isEndKeyInclusive()), columnFamilies, inclusive); + } else iter.seek(new Range(key, false, range.getEndKey(), range.isEndKeyInclusive()), columnFamilies, inclusive); else { iter.next(); @@ -144,6 +170,10 @@ public class SourceSwitchingIterator implements InterruptibleIterator { } if (iter.hasTop()) { + if (yield.isPresent() && yield.get().hasYielded()) { + throw new IOException("Coding error: hasTop returned true but has yielded at " + yield.get().getPositionAndReset()); + } + Key nextKey = iter.getTopKey(); Value nextVal = iter.getTopValue(); @@ -163,6 +193,11 @@ 
public class SourceSwitchingIterator implements InterruptibleIterator { if (!source.isCurrent()) { source = source.getNewDataSource(); iter = source.iterator(); + if (!onlySwitchAfterRow && yield.isPresent()) { + if (iter instanceof YieldingKeyValueIterator) { + ((YieldingKeyValueIterator<Key,Value>) iter).enableYielding(yield.get()); + } + } return true; } @@ -176,8 +211,14 @@ public class SourceSwitchingIterator implements InterruptibleIterator { this.inclusive = inclusive; this.columnFamilies = columnFamilies; - if (iter == null) + if (iter == null) { iter = source.iterator(); + if (!onlySwitchAfterRow && yield.isPresent()) { + if (iter instanceof YieldingKeyValueIterator) { + ((YieldingKeyValueIterator<Key,Value>) iter).enableYielding(yield.get()); + } + } + } readNext(true); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/core/src/test/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIteratorTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIteratorTest.java index e6ca6d3..037ee7e 100644 --- a/core/src/test/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIteratorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIteratorTest.java @@ -16,21 +16,28 @@ */ package org.apache.accumulo.core.iterators.system; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Optional; import junit.framework.TestCase; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import 
org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IterationInterruptedException; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.SortedMapIterator; +import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.iterators.YieldCallback; +import org.apache.accumulo.core.iterators.YieldingKeyValueIterator; import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource; import org.apache.hadoop.io.Text; @@ -273,4 +280,109 @@ public class SourceSwitchingIteratorTest extends TestCase { } catch (IterationInterruptedException iie) {} } + + private Range yield(Range r, SourceSwitchingIterator ssi, YieldCallback<Key> yield) throws IOException { + while (yield.hasYielded()) { + Key yieldPosition = yield.getPositionAndReset(); + if (!r.contains(yieldPosition)) { + throw new IOException("Underlying iterator yielded to a position outside of its range: " + yieldPosition + " not in " + r); + } + r = new Range(yieldPosition, false, (Key) null, r.isEndKeyInclusive()); + ssi.seek(r, new ArrayList<ByteSequence>(), false); + } + return r; + } + + public void testYield() throws Exception { + TreeMap<Key,Value> tm1 = new TreeMap<>(); + put(tm1, "r1", "cf1", "cq1", 5, "v1"); + put(tm1, "r1", "cf1", "cq3", 5, "v2"); + put(tm1, "r2", "cf1", "cq1", 5, "v3"); + + SortedMapIterator smi = new SortedMapIterator(tm1); + YieldingIterator ymi = new YieldingIterator(smi); + TestDataSource tds = new TestDataSource(ymi); + SourceSwitchingIterator ssi = new SourceSwitchingIterator(tds); + + YieldCallback<Key> yield = new YieldCallback<>(); + ssi.enableYielding(yield); + + Range r = new Range(); + ssi.seek(r, new ArrayList<ByteSequence>(), false); + r = yield(r, ssi, yield); + testAndCallNext(ssi, "r1", "cf1", "cq1", 5, "v1", true); + r = yield(r, ssi, yield); + testAndCallNext(ssi, "r1", "cf1", "cq3", 
5, "v2", true); + r = yield(r, ssi, yield); + testAndCallNext(ssi, "r2", "cf1", "cq1", 5, "v3", true); + r = yield(r, ssi, yield); + assertFalse(ssi.hasTop()); + } + + /** + * This iterator which implements yielding will yield after every other next and every other seek call. + */ + private final AtomicBoolean yieldNextKey = new AtomicBoolean(false); + private final AtomicBoolean yieldSeekKey = new AtomicBoolean(false); + + public class YieldingIterator extends WrappingIterator implements YieldingKeyValueIterator<Key,Value> { + private Optional<YieldCallback<Key>> yield = Optional.absent(); + + public YieldingIterator(SortedKeyValueIterator<Key,Value> source) { + setSource(source); + } + + @Override + public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { + return new YieldingIterator(getSource().deepCopy(env)); + } + + @Override + public boolean hasTop() { + return (!(yield.isPresent() && yield.get().hasYielded()) && super.hasTop()); + } + + @Override + public void next() throws IOException { + boolean yielded = false; + + // yield on every other next call. + yieldNextKey.set(!yieldNextKey.get()); + if (yield.isPresent() && yieldNextKey.get()) { + yielded = true; + // since we are not actually skipping keys underneath, simply use the key following the top key as the yield key + yield.get().yield(getTopKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME)); + } + + // if not yielding, then simply pass on the next call + if (!yielded) { + super.next(); + } + } + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + boolean yielded = false; + + if (!range.isStartKeyInclusive()) { + // yield on every other seek call. 
+ yieldSeekKey.set(!yieldSeekKey.get()); + if (yield.isPresent() && yieldSeekKey.get()) { + yielded = true; + // since we are not actually skipping keys underneath, simply use the key following the range start key + yield.get().yield(range.getStartKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME)); + } + } + + // if not yielding, then simply pass on the call to the source + if (!yielded) { + super.seek(range, columnFamilies, inclusive); + } + } + + @Override + public void enableYielding(YieldCallback<Key> yield) { + this.yield = Optional.of(yield); + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/iterator-test-harness/src/main/java/org/apache/accumulo/iteratortest/testcases/YieldingTestCase.java ---------------------------------------------------------------------- diff --git a/iterator-test-harness/src/main/java/org/apache/accumulo/iteratortest/testcases/YieldingTestCase.java b/iterator-test-harness/src/main/java/org/apache/accumulo/iteratortest/testcases/YieldingTestCase.java new file mode 100644 index 0000000..f9de207 --- /dev/null +++ b/iterator-test-harness/src/main/java/org/apache/accumulo/iteratortest/testcases/YieldingTestCase.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.iteratortest.testcases; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.YieldCallback; +import org.apache.accumulo.core.iterators.YieldingKeyValueIterator; +import org.apache.accumulo.iteratortest.IteratorTestInput; +import org.apache.accumulo.iteratortest.IteratorTestOutput; +import org.apache.accumulo.iteratortest.IteratorTestUtil; +import org.apache.accumulo.iteratortest.environments.SimpleIteratorEnvironment; + +import java.io.IOException; +import java.util.TreeMap; + +/** + * Test case that verifies that an iterator works correctly with the yielding api. Note that most iterators do nothing in terms of yielding in which case this + * merely tests that the iterator produces the correct output. If however the iterator does override the yielding api, then this ensures that it works correctly + * iff the iterator actually decides to yield. Nothing can force an iterator to yield without knowing something about the internals of the iterator being + * tested. 
+ */
+public class YieldingTestCase extends OutputVerifyingTestCase {
+
+  /**
+   * Instantiates the iterator under test on top of the source built from the test input, enables yielding on it when it implements
+   * {@link YieldingKeyValueIterator}, seeks it to the input range and collects everything it produces.
+   *
+   * @param testInput
+   *          the iterator class, options, range, column families and data to test with
+   * @return the key/value pairs produced by the iterator, or an output wrapping the IOException raised while running it
+   */
+  @Override
+  public IteratorTestOutput test(IteratorTestInput testInput) {
+    final SortedKeyValueIterator<Key,Value> skvi = IteratorTestUtil.instantiateIterator(testInput);
+    final SortedKeyValueIterator<Key,Value> source = IteratorTestUtil.createSource(testInput);
+
+    try {
+      skvi.init(source, testInput.getIteratorOptions(), new SimpleIteratorEnvironment());
+
+      // Yielding is opt-in: only iterators implementing the yielding interface are handed the callback.
+      YieldCallback<Key> yield = new YieldCallback<>();
+      if (skvi instanceof YieldingKeyValueIterator) {
+        ((YieldingKeyValueIterator<Key,Value>) skvi).enableYielding(yield);
+      }
+
+      skvi.seek(testInput.getRange(), testInput.getFamilies(), testInput.isInclusive());
+      return new IteratorTestOutput(consume(testInput, skvi, yield));
+    } catch (IOException e) {
+      return new IteratorTestOutput(e);
+    }
+  }
+
+  /**
+   * Drains the iterator, re-seeking it just past the yield position every time it yields.
+   *
+   * @param testInput
+   *          supplies the range, column families and inclusive flag used for the re-seeks
+   * @param skvi
+   *          the already-seeked iterator to drain
+   * @param yield
+   *          the callback that was registered with the iterator (never signalled if the iterator does not support yielding)
+   * @return copies of every key/value pair the iterator returned
+   * @throws IOException
+   *           if the iterator yields outside of the test range, reports a top key while yielded, yields at a position that is not past the last key
+   *           returned, or fails itself
+   */
+  TreeMap<Key,Value> consume(IteratorTestInput testInput, SortedKeyValueIterator<Key,Value> skvi, YieldCallback<Key> yield) throws IOException {
+    TreeMap<Key,Value> data = new TreeMap<>();
+    // Last key handed back by the iterator; a yield position must sort strictly after it.
+    Key lastKey = null;
+    while (yield.hasYielded() || skvi.hasTop()) {
+      if (yield.hasYielded()) {
+        Range r = testInput.getRange();
+        Key yieldPosition = yield.getPositionAndReset();
+        if (!r.contains(yieldPosition)) {
+          throw new IOException("Underlying iterator yielded to a position outside of its range: " + yieldPosition + " not in " + r);
+        }
+        if (skvi.hasTop()) {
+          throw new IOException("Underlying iterator reports having a top, but has yielded: " + yieldPosition);
+        }
+        if (lastKey != null && yieldPosition.compareTo(lastKey) <= 0) {
+          throw new IOException("Underlying iterator yielded at a position that is not past the last key returned");
+        }
+        // Resume strictly after the yield position, keeping the original end of the range.
+        skvi.seek(new Range(yieldPosition, false, r.getEndKey(), r.isEndKeyInclusive()), testInput.getFamilies(), testInput.isInclusive());
+      } else {
+        // Make sure to copy the K-V
+        Key topKey = new Key(skvi.getTopKey());
+        data.put(topKey, new Value(skvi.getTopValue()));
+        // Remember what was returned so the yield-position check above is actually exercised.
+        lastKey = topKey;
+        skvi.next();
+      }
+    }
+    return data;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index a937235..6ead6b8 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -282,6 +282,10 @@ public class TabletServer extends AccumuloServerContext implements Runnable { private final Metrics scanMetrics; private final Metrics mincMetrics; + public Metrics getScanMetrics() { + return scanMetrics; + } + public Metrics getMinCMetrics() { return mincMetrics; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerScanMetrics.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerScanMetrics.java index 0c72cb5..d2fc871 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerScanMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerScanMetrics.java @@ -33,7 +33,7 @@ public class Metrics2TabletServerScanMetrics implements Metrics, MetricsSource, private final MetricsSystem system; private final MetricsRegistry registry; - private final MutableStat scans, resultsPerScan; + private final MutableStat scans, resultsPerScan, yields; // Use TabletServerMetricsFactory Metrics2TabletServerScanMetrics(MetricsSystem system) { @@ -42,6 +42,7 @@ public class Metrics2TabletServerScanMetrics 
implements Metrics, MetricsSource, scans = registry.newStat(SCAN, "Scans", "Ops", "Count", true); resultsPerScan = registry.newStat(RESULT_SIZE, "Results per scan", "Ops", "Count", true); + yields = registry.newStat(YIELD, "Yields", "Ops", "Count", true); } @Override @@ -50,6 +51,8 @@ public class Metrics2TabletServerScanMetrics implements Metrics, MetricsSource, scans.add(value); } else if (RESULT_SIZE.equals(name)) { resultsPerScan.add(value); + } else if (YIELD.equals(name)) { + yields.add(value); } else { throw new RuntimeException("Could not find metric to update for name " + name); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetricsKeys.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetricsKeys.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetricsKeys.java index 7d33a84..8c9ab34 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetricsKeys.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetricsKeys.java @@ -23,5 +23,6 @@ public interface TabletServerScanMetricsKeys { String SCAN = "scan"; String RESULT_SIZE = "result"; + String YIELD = "yield"; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java index 15526d7..bf3ba86 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java 
@@ -95,7 +95,7 @@ public class Scanner { iter = new SourceSwitchingIterator(dataSource, false); } - results = tablet.nextBatch(iter, range, options.getNum(), options.getColumnSet(), options.getBatchTimeOut()); + results = tablet.nextBatch(iter, range, options.getNum(), options.getColumnSet(), options.getBatchTimeOut(), options.isIsolated()); if (results.getResults() == null) { range = null; http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 6637521..9278a36 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -75,6 +75,8 @@ import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.iterators.IterationInterruptedException; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.YieldCallback; +import org.apache.accumulo.core.iterators.YieldingKeyValueIterator; import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator; import org.apache.accumulo.core.master.thrift.BulkImportState; import org.apache.accumulo.core.master.thrift.TabletLoadState; @@ -138,6 +140,7 @@ import org.apache.accumulo.tserver.log.DfsLogger; import org.apache.accumulo.tserver.log.MutationReceiver; import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage; import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics; +import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; import org.apache.accumulo.tserver.tablet.Compactor.CompactionCanceledException; import 
org.apache.accumulo.tserver.tablet.Compactor.CompactionEnv; import org.apache.commons.codec.DecoderException; @@ -546,11 +549,17 @@ public class Tablet implements TabletCommitter { batchTimeOut = 0; } + // determine if the iterator supported yielding + YieldCallback<Key> yield = new YieldCallback<>(); + if (mmfi instanceof YieldingKeyValueIterator) + ((YieldingKeyValueIterator<Key,Value>) mmfi).enableYielding(yield); + boolean yielded = false; + for (Range range : ranges) { boolean timesUp = batchTimeOut > 0 && System.nanoTime() > returnTime; - if (exceededMemoryUsage || tabletClosed || timesUp) { + if (exceededMemoryUsage || tabletClosed || timesUp || yielded) { lookupResult.unfinishedRanges.add(range); continue; } @@ -564,6 +573,9 @@ public class Tablet implements TabletCommitter { mmfi.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false); while (mmfi.hasTop()) { + if (yield.hasYielded()) { + throw new IOException("Coding error: hasTop returned true but has yielded at " + yield.getPositionAndReset()); + } Key key = mmfi.getTopKey(); KVEntry kve = new KVEntry(key, mmfi.getTopValue()); @@ -584,6 +596,23 @@ public class Tablet implements TabletCommitter { mmfi.next(); } + if (yield.hasYielded()) { + yielded = true; + Key yieldPosition = yield.getPositionAndReset(); + if (!range.contains(yieldPosition)) { + throw new IOException("Underlying iterator yielded to a position outside of its range: " + yieldPosition + " not in " + range); + } + if (!results.isEmpty() && yieldPosition.compareTo(results.get(results.size() - 1).getKey()) <= 0) { + throw new IOException("Underlying iterator yielded to a position that does not follow the last key returned: " + yieldPosition + " <= " + + results.get(results.size() - 1).getKey()); + } + addUnfinishedRange(lookupResult, range, yieldPosition, false); + + log.debug("Scan yield detected at position " + yieldPosition); + Metrics scanMetrics = getTabletServer().getScanMetrics(); + if (scanMetrics.isEnabled()) + 
scanMetrics.add(TabletServerScanMetrics.YIELD, 1); + } } catch (TooManyFilesException tmfe) { // treat this as a closed tablet, and let the client retry log.warn("Tablet " + getExtent() + " has too many files, batch lookup can not run"); @@ -696,7 +725,7 @@ public class Tablet implements TabletCommitter { } } - Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, Set<Column> columns, long batchTimeOut) throws IOException { + Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, Set<Column> columns, long batchTimeOut, boolean isolated) throws IOException { // log.info("In nextBatch.."); @@ -713,18 +742,27 @@ public class Tablet implements TabletCommitter { long maxResultsSize = tableConfiguration.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM); + Key continueKey = null; + boolean skipContinueKey = false; + + YieldCallback<Key> yield = new YieldCallback<>(); + + // we cannot yield if we are in isolation mode + if (!isolated) { + if (iter instanceof YieldingKeyValueIterator) + ((YieldingKeyValueIterator<Key,Value>) iter).enableYielding(yield); + } + if (columns.size() == 0) { iter.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false); } else { iter.seek(range, LocalityGroupUtil.families(columns), true); } - Key continueKey = null; - boolean skipContinueKey = false; - - boolean endOfTabletReached = false; while (iter.hasTop()) { - + if (yield.hasYielded()) { + throw new IOException("Coding error: hasTop returned true but has yielded at " + yield.getPositionAndReset()); + } value = iter.getTopValue(); key = iter.getTopKey(); @@ -744,17 +782,28 @@ public class Tablet implements TabletCommitter { iter.next(); } - if (iter.hasTop() == false) { - endOfTabletReached = true; - } + if (yield.hasYielded()) { + continueKey = new Key(yield.getPositionAndReset()); + skipContinueKey = true; + if (!range.contains(continueKey)) { + throw new IOException("Underlying iterator yielded to a position outside of its range: " + continueKey + " not 
in " + range); + } + if (!results.isEmpty() && continueKey.compareTo(results.get(results.size() - 1).getKey()) <= 0) { + throw new IOException("Underlying iterator yielded to a position that does not follow the last key returned: " + continueKey + " <= " + + results.get(results.size() - 1).getKey()); + } - if (endOfTabletReached) { + log.debug("Scan yield detected at position " + continueKey); + Metrics scanMetrics = getTabletServer().getScanMetrics(); + if (scanMetrics.isEnabled()) + scanMetrics.add(TabletServerScanMetrics.YIELD, 1); + } else if (iter.hasTop() == false) { + // end of tablet has been reached continueKey = null; + if (results.size() == 0) + results = null; } - if (endOfTabletReached && results.size() == 0) - results = null; - return new Batch(skipContinueKey, results, continueKey, resultBytes); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java b/test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java new file mode 100644 index 0000000..4d86dd3 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.test;

import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.YieldingIterator;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test for ACCUMULO-4643: verifies that scans and batch scans still return every key, in order, when the configured {@link YieldingIterator}
 * yields during {@code next} and {@code seek} calls.
 *
 * <p>
 * Each returned value is expected to have the form {@code yieldNexts,yieldSeeks,rebuilds}, as produced by {@link YieldingIterator#getTopValue()}.
 */
public class YieldScannersIT extends AccumuloClusterHarness {
  private static final Logger log = LoggerFactory.getLogger(YieldScannersIT.class);

  /** Row of the first key written; subsequent rows are the following characters. */
  private static final char START_ROW = 'a';

  /** Number of single-character rows written to the test table. */
  private static final int NUM_ROWS = 10;

  @Override
  public int defaultTimeoutSeconds() {
    return 60;
  }

  @Override
  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
    cfg.setNumTservers(1);
  }

  /**
   * Scans the test table with a regular {@link Scanner} and checks the keys and yield counters.
   */
  @Test
  public void testScan() throws Exception {
    final String tableName = getUniqueNames(1)[0];
    final Connector conn = getConnector();
    createAndPopulateTable(conn, tableName);

    log.info("Creating scanner");
    // make a scanner for a table with 10 keys
    final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
    try {
      scanner.addScanIterator(new IteratorSetting(100, YieldingIterator.class));

      log.info("iterating");
      verifyYieldingScan(scanner.iterator());
    } finally {
      // release the scanner even when an assertion fails
      scanner.close();
    }
  }

  /**
   * Scans the test table with a single-threaded {@link BatchScanner} over the full range and checks the keys and yield counters.
   */
  @Test
  public void testBatchScan() throws Exception {
    final String tableName = getUniqueNames(1)[0];
    final Connector conn = getConnector();
    createAndPopulateTable(conn, tableName);

    log.info("Creating batch scanner");
    // make a scanner for a table with 10 keys
    final BatchScanner scanner = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1);
    try {
      scanner.addScanIterator(new IteratorSetting(100, YieldingIterator.class));
      scanner.setRanges(Collections.singleton(new Range()));

      log.info("iterating");
      // NOTE(review): the verification below assumes results arrive in key order; presumably that holds
      // here because there is one query thread and one tablet - confirm
      verifyYieldingScan(scanner.iterator());
    } finally {
      // a batch scanner must always be closed, even when an assertion fails
      scanner.close();
    }
  }

  /**
   * Creates the table and writes {@link #NUM_ROWS} mutations, one per row starting at {@link #START_ROW}, each with an empty column and an empty value.
   *
   * @param conn
   *          connector used to create and write the table
   * @param tableName
   *          name of the table to create
   */
  private static void createAndPopulateTable(Connector conn, String tableName) throws Exception {
    conn.tableOperations().create(tableName);
    final BatchWriter writer = conn.createBatchWriter(tableName, new BatchWriterConfig());
    try {
      for (int i = 0; i < NUM_ROWS; i++) {
        byte[] row = new byte[] {(byte) (START_ROW + i)};
        Mutation m = new Mutation(new Text(row));
        m.put(new Text(), new Text(), new Value());
        writer.addMutation(m);
      }
      writer.flush();
    } finally {
      // close the writer even if a mutation was rejected
      writer.close();
    }
  }

  /**
   * Consumes the scan results and asserts that the rows come back in order and that every value carries the expected yield counters.
   *
   * @param it
   *          iterator over the scan results
   */
  private static void verifyYieldingScan(Iterator<Map.Entry<Key,Value>> it) {
    int keyCount = 0;
    int yieldNextCount = 0;
    int yieldSeekCount = 0;
    while (it.hasNext()) {
      Map.Entry<Key,Value> next = it.next();
      log.info("{}: Got key {} with value {}", keyCount, next.getKey(), next.getValue());

      // verify we got the expected key
      char expected = (char) (START_ROW + keyCount);
      Assert.assertEquals("Unexpected row", Character.toString(expected), next.getKey().getRow().toString());

      // every odd-numbered key is expected to have been reached through one more yield on next and one more yield on seek
      if ((keyCount & 1) != 0) {
        yieldNextCount++;
        yieldSeekCount++;
      }
      String[] value = StringUtils.split(next.getValue().toString(), ',');
      // fail with a readable message instead of an ArrayIndexOutOfBoundsException on a malformed value
      Assert.assertEquals("Unexpected value format: " + next.getValue(), 3, value.length);
      Assert.assertEquals("Unexpected yield next count", Integer.toString(yieldNextCount), value[0]);
      Assert.assertEquals("Unexpected yield seek count", Integer.toString(yieldSeekCount), value[1]);
      Assert.assertEquals("Unexpected rebuild count", Integer.toString(yieldNextCount + yieldSeekCount), value[2]);

      keyCount++;
    }
    Assert.assertEquals("Did not get the expected number of results", NUM_ROWS, keyCount);
  }

}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ab2431c/test/src/main/java/org/apache/accumulo/test/functional/YieldingIterator.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/YieldingIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/YieldingIterator.java new file mode 100644 index 0000000..4ac325d --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/YieldingIterator.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.functional; + +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.base.Optional; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.WrappingIterator; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.core.iterators.YieldCallback; +import org.apache.accumulo.core.iterators.YieldingKeyValueIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This iterator which implements yielding will yield after every other next and every other seek call. + */ +public class YieldingIterator extends WrappingIterator implements YieldingKeyValueIterator<Key,Value> { + private static final Logger log = LoggerFactory.getLogger(YieldingIterator.class); + private static final AtomicInteger yieldNexts = new AtomicInteger(0); + private static final AtomicInteger yieldSeeks = new AtomicInteger(0); + private static final AtomicInteger rebuilds = new AtomicInteger(0); + + private static final AtomicBoolean yieldNextKey = new AtomicBoolean(false); + private static final AtomicBoolean yieldSeekKey = new AtomicBoolean(false); + + private Optional<YieldCallback<Key>> yield = Optional.absent(); + + @Override + public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { + YieldingIterator it = new YieldingIterator(); + it.setSource(getSource().deepCopy(env)); + return it; + } + + @Override + public boolean hasTop() { + return (!(yield.isPresent() && yield.get().hasYielded()) && super.hasTop()); + } + + @Override + public void next() throws 
IOException { + log.info("start YieldingIterator.next: " + getTopValue()); + boolean yielded = false; + + // yield on every other next call. + yieldNextKey.set(!yieldNextKey.get()); + if (yield.isPresent() && yieldNextKey.get()) { + yielded = true; + yieldNexts.incrementAndGet(); + // since we are not actually skipping keys underneath, simply use the key following the top key as the yield key + yield.get().yield(getTopKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME)); + log.info("end YieldingIterator.next: yielded at " + getTopKey()); + } + + // if not yielding, then simply pass on the next call + if (!yielded) { + super.next(); + log.info("end YieldingIterator.next: " + (hasTop() ? getTopKey() + " " + getTopValue() : "no top")); + } + } + + /** + * The top value will encode the current state of the yields, seeks, and rebuilds for use by the YieldScannersIT tests. + * + * @return a top value of the form {yieldNexts},{yieldSeeks},{rebuilds} + */ + @Override + public Value getTopValue() { + String value = Integer.toString(yieldNexts.get()) + ',' + Integer.toString(yieldSeeks.get()) + ',' + Integer.toString(rebuilds.get()); + return new Value(value); + } + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + log.info("start YieldingIterator.seek: " + getTopValue() + " with range " + range); + boolean yielded = false; + + if (!range.isStartKeyInclusive()) { + rebuilds.incrementAndGet(); + + // yield on every other seek call. 
+ yieldSeekKey.set(!yieldSeekKey.get()); + if (yield.isPresent() && yieldSeekKey.get()) { + yielded = true; + yieldSeeks.incrementAndGet(); + // since we are not actually skipping keys underneath, simply use the key following the range start key + yield.get().yield(range.getStartKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME)); + log.info("end YieldingIterator.next: yielded at " + range.getStartKey()); + } + } + + // if not yielding, then simply pass on the call to the source + if (!yielded) { + super.seek(range, columnFamilies, inclusive); + log.info("end YieldingIterator.seek: " + (hasTop() ? getTopKey() + " " + getTopValue() : "no top")); + } + } + + @Override + public void enableYielding(YieldCallback<Key> yield) { + this.yield = Optional.of(yield); + } +}