Hello,
I'm trying to write an iterator that returns the top N sorted entries for a
given range over sharded data. I created a custom iterator extending
SkippingIterator and made it return the first N entries for each tablet.
After N entries, I have the source iterator seek to the end key of the
current range, since it shouldn't return any other entries for that tablet.
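To make the intended behavior concrete, here is a standalone plain-Java sketch of what I want the scan to produce. The TopNSketch class, the topNPerPartition helper, and partitioning by the key's first character are all just illustrative stand-ins for tablets, not my actual iterator code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TopNSketch {
  // Return at most n entries per "partition", where a partition here is
  // keyed by the first character of the key (a stand-in for a tablet).
  static List<String> topNPerPartition(TreeMap<String, String> data, int n) {
    List<String> out = new ArrayList<>();
    char currentPartition = 0;
    int taken = 0;
    for (Map.Entry<String, String> e : data.entrySet()) {
      char p = e.getKey().charAt(0);
      if (p != currentPartition) { // entered a new partition, reset the count
        currentPartition = p;
        taken = 0;
      }
      if (taken < n) {
        out.add(e.getKey());
        taken++;
      }
    }
    return out;
  }

  public static void main(String[] args) {
    TreeMap<String, String> data = new TreeMap<>();
    for (String k : new String[] {"a1", "a2", "a3", "b1", "b2", "c1"}) {
      data.put(k, "v");
    }
    // Two entries per partition: a3 is skipped, c1 survives (only one in 'c').
    System.out.println(topNPerPartition(data, 2)); // [a1, a2, b1, b2, c1]
  }
}
```

So for N = 2 over the six keys above, I expect five results back, not all six.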
@Override
public void init(SortedKeyValueIterator<Key,Value> source,
    Map<String,String> options, IteratorEnvironment env) throws IOException {
  super.init(source, options, env);
  String o = options.get(NUM_SCANS_STRING_NAME);
  numScans = o == null ? 10 : Integer.parseInt(o);
  String n = options.get(NUM_ENTRIES_STRING_NAME);
  numEntriesPerRange = n == null ? Integer.MAX_VALUE : Integer.parseInt(n);
  numEntries = 0;
}
// this is only ever called immediately after getting the "next" entry
@Override
protected void consume() throws IOException {
  if (numEntries < numEntriesPerRange) {
    ++numEntries;
    return;
  }
  int count = 0;
  while (getSource().hasTop()) {
    if (count < numScans) {
      ++count;
      getSource().next(); // scan
    } else {
      // too many scans, just seek to end of range
      Key lastKey = latestRange.getEndKey() == null
          ? new Key(new Text(String.valueOf(Character.MAX_VALUE)))
          : latestRange.getEndKey().followingKey(PartialKey.ROW);
      getSource().seek(new Range(lastKey, true, lastKey, false),
          latestColumnFamilies, latestInclusive);
    }
  }
}
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies,
    boolean inclusive) throws IOException {
  // save parameters for future internal seeks
  latestRange = range;
  latestColumnFamilies = columnFamilies;
  latestInclusive = inclusive;
  super.seek(range, columnFamilies, inclusive);
  if (getSource().hasTop() && range.beforeStartKey(getSource().getTopKey())) {
    consume();
  }
}
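For reference, this is the mental model I have of the skip: once a tablet's quota is hit, position the source at or past the range's end key so it has nothing left to return within the range. A plain-Java analogue using a NavigableMap (SkipSketch and firstKeyAtOrAfter are illustrative names of mine, not Accumulo API):

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class SkipSketch {
  // Analogue of seeking the source past the current range's end key:
  // tailMap(endKey, true) positions "iteration" at or after endKey, so any
  // remaining keys inside the old range are never visited again.
  static String firstKeyAtOrAfter(NavigableMap<String, String> data, String endKey) {
    NavigableMap<String, String> tail = data.tailMap(endKey, true);
    return tail.isEmpty() ? null : tail.firstKey();
  }

  public static void main(String[] args) {
    TreeMap<String, String> data = new TreeMap<>();
    for (String k : new String[] {"a1", "a2", "b1"}) {
      data.put(k, "v");
    }
    // "Seek" past the end of partition 'a' (the end key "b" is illustrative):
    // a1 and a2 are skipped, and the next visible key is b1.
    System.out.println(firstKeyAtOrAfter(data, "b")); // b1
  }
}
```

Whether my Range construction in consume() actually achieves this positioning is exactly what I'm unsure about.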
I did some initial testing and it seems to work as expected, bringing back
N * (number of tablets) results. However, when I increase the limit past a
certain point, something goes wrong and I get all entries back instead of
the limited count. I also sometimes see the error below, but from searching
online I'm not sure whether it's related:
16/08/12 20:54:22 WARN transport.TIOStreamTransport: Error closing output stream.
java.io.IOException: The stream is closed
        at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:118)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
        at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
        at org.apache.thrift.transport.TIOStreamTransport.close(TIOStreamTransport.java:110)
        at org.apache.thrift.transport.TFramedTransport.close(TFramedTransport.java:89)
        at org.apache.accumulo.core.client.impl.ThriftTransportPool$CachedTTransport.close(ThriftTransportPool.java:312)
        at org.apache.accumulo.core.client.impl.ThriftTransportPool.returnTransport(ThriftTransportPool.java:584)
        at org.apache.accumulo.core.util.ThriftUtil.returnClient(ThriftUtil.java:134)
        at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator.doLookup(TabletServerBatchReaderIterator.java:714)
        at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator$QueryTask.run(TabletServerBatchReaderIterator.java:376)
        at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
        at org.apache.accumulo.core.util.LoggingRunnable.run(LoggingRunnable.java:34)
        at java.lang.Thread.run(Thread.java:745)
Does anyone have any idea why the iterator works for lower values of N but
not higher ones? Also, I don't have much experience with iterators and am
not confident that the seek in consume() is correct. What is the best way
to skip the rest of a range in an iterator? Or is this not feasible?
Any help would be greatly appreciated!
Thanks,
Ryan