I would suggest that your approach is flawed from the start. Consider the following case:

You read through the first half of a tablet and have collected a set of 1000 IDs which you have seen. When you try to read the second half of the tablet, the TabletServer dies from an OOME. The tablet is reassigned to a different TabletServer, which starts reading the second half of the tablet but has no way of knowing any of the 1000 IDs you had collected in memory on the first TabletServer.
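The same loss of state happens on every ordinary teardown, not only on a crash: a fresh iterator instance is created and seeked from just after the last returned key, so anything held in its fields starts over empty. The following is a minimal, self-contained model of that, assuming a "tablet" that is just a sorted List<String> of "key:ID" entries; DedupIterator and the scan methods are hypothetical and are not Accumulo's SortedKeyValueIterator API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model (not Accumulo's API) of iterator teardown and re-seek. */
public class TeardownDemo {

    /** A stateful "seen-ID" filter, like the one proposed in this thread. */
    static class DedupIterator {
        private final List<String> data;                   // sorted "key:ID" entries
        private final Set<String> seen = new HashSet<>();  // lives only in this instance
        private int pos;

        DedupIterator(List<String> data) { this.data = data; }

        /** Position at the first entry strictly after lastKey (null means the start). */
        void seek(String lastKey) {
            pos = 0;
            if (lastKey != null) {
                while (pos < data.size() && data.get(pos).compareTo(lastKey) <= 0) {
                    pos++;
                }
            }
            advanceToUnseen();
        }

        boolean hasTop() { return pos < data.size(); }

        String getTopKey() { return data.get(pos); }

        void next() {
            pos++;
            advanceToUnseen();
        }

        /** Skip entries whose ID was already seen, then record the new top's ID. */
        private void advanceToUnseen() {
            while (pos < data.size() && seen.contains(id(data.get(pos)))) {
                pos++;
            }
            if (pos < data.size()) {
                seen.add(id(data.get(pos)));
            }
        }

        private static String id(String key) {
            return key.substring(key.indexOf(':') + 1);
        }
    }

    /** One long-lived iterator: the seen set survives, so repeats are dropped. */
    static List<String> scanWithoutTeardown(List<String> data) {
        List<String> out = new ArrayList<>();
        DedupIterator it = new DedupIterator(data);
        for (it.seek(null); it.hasTop(); it.next()) {
            out.add(it.getTopKey());
        }
        return out;
    }

    /** Tear down after every entry, roughly what a tiny scan memory limit forces. */
    static List<String> scanWithTeardown(List<String> data) {
        List<String> out = new ArrayList<>();
        String last = null;
        while (true) {
            DedupIterator it = new DedupIterator(data); // new instance: empty seen set
            it.seek(last);                              // resume after the last key
            if (!it.hasTop()) {
                break;
            }
            last = it.getTopKey();
            out.add(last);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> data = List.of("k1:idA", "k2:idB", "k3:idA", "k4:idC", "k5:idB");
        System.out.println(scanWithoutTeardown(data)); // [k1:idA, k2:idB, k4:idC]
        System.out.println(scanWithTeardown(data));    // all five entries, repeats included
    }
}
```

Nothing in the rebuilt iterator can tell that idA and idB were already returned, which is exactly the problem with the 1000 collected IDs.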

Iterators are *not* designed to be stateful. Almost any attempt to force them to be stateful will have some inherent flaw. If you need to maintain state, you have two options:

1. Do it outside of Accumulo: in the client or in some other execution framework (e.g. YARN, Spark, Fluo). There are many options; which one you should use likely depends on your application.
2. Create a table schema in which all of the elements you need to read/act on exist in one row. A row is the finest level of atomicity that Accumulo provides. Whether this works depends a bit on what the actual problem is.
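For option 1, the "have I seen this ID?" check can simply move into the client loop that consumes the scan. The sketch below is a hedged illustration that avoids Accumulo classes so it runs standalone: each entry is an assumed (row, qualifier) pair of Strings standing in for a Scanner's Map.Entry<Key,Value>, where the ID would instead come from the Key's column qualifier. ClientSideDedup and firstOccurrences are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical client-side de-duplication of IDs, modeled without Accumulo classes. */
public class ClientSideDedup {

    /**
     * Returns "row -> id" for the first occurrence of each ID, in scan order.
     * Each entry is an assumed (row, qualifier) pair standing in for a scan result.
     */
    static List<String> firstOccurrences(Iterable<Map.Entry<String, String>> entries) {
        Set<String> seen = new HashSet<>();      // held in the client JVM, so it does not
        List<String> result = new ArrayList<>(); // depend on any one tablet server surviving
        for (Map.Entry<String, String> e : entries) {
            if (seen.add(e.getValue())) {        // add() returns false for a repeated ID
                result.add(e.getKey() + " -> " + e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> scan = List.of(
            Map.entry("r1", "idA"), Map.entry("r2", "idB"),
            Map.entry("r3", "idA"), Map.entry("r4", "idC"));
        System.out.println(firstOccurrences(scan)); // [r1 -> idA, r2 -> idB, r4 -> idC]
    }
}
```

The trade-off is that every entry crosses the network before being filtered; whether that is acceptable depends on how often IDs repeat.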

Roshan Punnoose wrote:
I have a tablet with an unsorted list of IDs in the column qualifier, and
these IDs can repeat sporadically. So I was hoping to keep a set of
these IDs around in memory to check whether I have seen an ID or not. There
is some other logic to ensure that the set does not grow unbounded; I'm
just trying to figure out if I can keep this ID set around. After a
teardown, even though the new seek Range tells me which Key was returned
last, I don't know whether I have already seen the upcoming IDs. Not sure
if that makes sense...
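The bounding logic mentioned here is not shown in the thread. One common way to cap a seen-set is least-recently-used eviction; the sketch below assumes that policy and uses only the JDK (LinkedHashMap in access order plus removeEldestEntry). BoundedSeenSet and firstSighting are hypothetical names. Note that an evicted ID will be reported as new again, and that, per the reply at the top, such a set held inside an iterator is still lost on teardown, so it is better suited to client-side code.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical bounded "seen" set with least-recently-used eviction. */
public class BoundedSeenSet {
    private final Set<String> seen;

    BoundedSeenSet(int maxSize) {
        // accessOrder=true orders entries from least to most recently accessed,
        // so removeEldestEntry evicts the ID that was touched longest ago.
        Map<String, Boolean> lru = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxSize;
            }
        };
        this.seen = Collections.newSetFromMap(lru);
    }

    /** True the first time an ID is offered, or again after it has been evicted. */
    boolean firstSighting(String id) {
        return seen.add(id); // re-adding an existing ID also refreshes its recency
    }

    public static void main(String[] args) {
        BoundedSeenSet s = new BoundedSeenSet(2);
        System.out.println(s.firstSighting("idA")); // true
        System.out.println(s.firstSighting("idB")); // true
        System.out.println(s.firstSighting("idA")); // false, idA becomes most recent
        System.out.println(s.firstSighting("idC")); // true, evicts idB
        System.out.println(s.firstSighting("idB")); // true again: idB was forgotten
    }
}
```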

I was thinking that on teardown, we could use either the deepCopy or init
method to roll over state from the torn-down iterator to the new iterator.

On Wed, Jan 4, 2017 at 11:14 AM Keith Turner wrote:

    On Wed, Jan 4, 2017 at 10:44 AM, Roshan Punnoose wrote:
     > Keith,
     >
     > If an iterator has state that it is maintaining, what is the best way to
     > transfer that state to the new iterator after a tear down? For example,
     > MyIterator might have a Boolean flag of some sort. After tear down, is there
     > a way to copy that state to the new iterator before it starts seeking again?

    There is nothing currently built in to help with this.

    What are you trying to accomplish?  Are you interested in maintaining
    this state for a scan or batch scan?


     >
     > Roshan
     >
     > On Wed, Jan 4, 2017 at 10:33 AM Keith Turner wrote:
     >>
     >> Josh,
     >>
     >> deepCopy is not called when an iterator is torn down. It has an
     >> entirely different use: deepCopy allows cloning of an iterator during
     >> init(). The clones allow you to have multiple pointers into a tablet's
     >> data, which allows things like server-side joins.
     >>
     >> Keith
     >>
     >> On Wed, Dec 28, 2016 at 12:50 PM, Josh Clum wrote:
     >> > Hi,
     >> >
     >> > I have a question about iterator teardown. It seems from
     >> > https://github.com/apache/accumulo/blob/master/docs/src/main/asciidoc/chapters/iterator_design.txt#L383-L390
     >> > that deepCopy should be called when an iterator is torn down. I'm not
     >> > seeing that behavior. Below is a test that sets table.scan.max.memory
     >> > to 1, which should force a tear down for each kv returned. I should see
     >> > deepCopy being called 3 times, but when I tail the tserver logs I'm not
     >> > seeing it being called. Below are the test and the tserver output.
     >> >
     >> > What am I missing here?
     >> >
     >> > Josh
     >> >
     >> > ➜  tail -f -n200 ...../accumulo/logs/TabletServer_*.out | grep MyIterator
     >> > MyIterator: init
     >> > MyIterator: seek
     >> > MyIterator: hasTop
     >> > MyIterator: getTopKey
     >> > MyIterator: getTopValue
     >> > MyIterator: init
     >> > MyIterator: seek
     >> > MyIterator: hasTop
     >> > MyIterator: getTopKey
     >> > MyIterator: getTopValue
     >> > MyIterator: init
     >> > MyIterator: seek
     >> > MyIterator: hasTop
     >> > MyIterator: getTopKey
     >> > MyIterator: getTopValue
     >> > MyIterator: init
     >> > MyIterator: seek
     >> > MyIterator: hasTop
     >> >
     >> > public static class MyIterator implements SortedKeyValueIterator<Key, Value> {
     >> >
     >> >     private SortedKeyValueIterator<Key, Value> source;
     >> >
     >> >     public MyIterator() { }
     >> >
     >> >     @Override
     >> >     public void init(SortedKeyValueIterator<Key, Value> source,
     >> >                      Map<String, String> options,
     >> >                      IteratorEnvironment env) throws IOException {
     >> >         System.out.println("MyIterator: init");
     >> >         this.source = source;
     >> >     }
     >> >
     >> >     @Override
     >> >     public boolean hasTop() {
     >> >         System.out.println("MyIterator: hasTop");
     >> >         return source.hasTop();
     >> >     }
     >> >
     >> >     @Override
     >> >     public void next() throws IOException {
     >> >         System.out.println("MyIterator: next");
     >> >         source.next();
     >> >     }
     >> >
     >> >     @Override
     >> >     public void seek(Range range, Collection<ByteSequence> columnFamilies,
     >> >                      boolean inclusive) throws IOException {
     >> >         System.out.println("MyIterator: seek");
     >> >         source.seek(range, columnFamilies, inclusive);
     >> >     }
     >> >
     >> >     @Override
     >> >     public Key getTopKey() {
     >> >         System.out.println("MyIterator: getTopKey");
     >> >         return source.getTopKey();
     >> >     }
     >> >
     >> >     @Override
     >> >     public Value getTopValue() {
     >> >         System.out.println("MyIterator: getTopValue");
     >> >         return source.getTopValue();
     >> >     }
     >> >
     >> >     @Override
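     >> >     public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
     >> >         System.out.println("MyIterator: deepCopy");
     >> >         // Return a copy of this iterator wrapping a copy of its source;
     >> >         // returning source.deepCopy(env) alone would drop MyIterator.
     >> >         MyIterator copy = new MyIterator();
     >> >         copy.source = source.deepCopy(env);
     >> >         return copy;
     >> >     }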
     >> > }
     >> >
     >> > @Test
     >> > public void testTearDown() throws Exception {
     >> >     String table = "test";
     >> >     Connector conn = cluster.getConnector("root", "secret");
     >> >     conn.tableOperations().create(table);
     >> >     conn.tableOperations().attachIterator(table, new IteratorSetting(25, MyIterator.class));
     >> >     conn.tableOperations().setProperty(table, "table.scan.max.memory", "1");
     >> >
     >> >     BatchWriter writer = conn.createBatchWriter(table, new BatchWriterConfig());
     >> >
     >> >     Mutation m1 = new Mutation("row");
     >> >     m1.put("f1", "q1", 1, "val1");
     >> >     writer.addMutation(m1);
     >> >
     >> >     Mutation m2 = new Mutation("row");
     >> >     m2.put("f2", "q2", 1, "val2");
     >> >     writer.addMutation(m2);
     >> >
     >> >     Mutation m3 = new Mutation("row");
     >> >     m3.put("f3", "q3", 1, "val3");
     >> >     writer.addMutation(m3);
     >> >
     >> >     writer.flush();
     >> >     writer.close();
     >> >
     >> >     BatchScanner scanner = conn.createBatchScanner(table, new Authorizations(), 3);
     >> >     scanner.setRanges(Collections.singletonList(new Range()));
     >> >     for(Map.Entry<Key, Value> entry : scanner) {
     >> >         System.out.println(entry.getKey() + " : " + entry.getValue());
     >> >     }
     >> >     scanner.close(); // release the BatchScanner's query threads
     >> >     System.out.println("Results complete!");
     >> > }
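Keith's explanation in the quoted thread, that deepCopy is used during init() to get several independent pointers into a tablet's data for things like server-side joins, can be illustrated with a small standalone model. The Cursor class below is a hypothetical simplification, not Accumulo's SortedKeyValueIterator, and the "term:docId" key layout is an assumed example in the spirit of Accumulo's IntersectingIterator. The point it shows: a deep copy shares the underlying data but keeps its own position, so one iterator can walk two parts of the tablet at once.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of why deepCopy exists: two cursors over one sorted dataset. */
public class DeepCopyJoinDemo {

    /** Simplified cursor over shared, sorted "term:docId" keys. */
    static class Cursor {
        private final List<String> data; // shared between copies, never duplicated
        private int pos;
        private String prefix;

        Cursor(List<String> data) { this.data = data; }

        /** Like deepCopy: same underlying data, independent position. */
        Cursor deepCopy() { return new Cursor(data); }

        /** Position at the first key for the given term (call before hasTop). */
        void seek(String term) {
            prefix = term + ":";
            pos = 0;
            while (pos < data.size() && data.get(pos).compareTo(prefix) < 0) {
                pos++;
            }
        }

        boolean hasTop() { return pos < data.size() && data.get(pos).startsWith(prefix); }

        String doc() { return data.get(pos).substring(prefix.length()); }

        void next() { pos++; }
    }

    /** Merge-join: doc IDs that appear under both terms, in sorted order. */
    static List<String> intersect(List<String> tablet, String t1, String t2) {
        Cursor a = new Cursor(tablet);
        Cursor b = a.deepCopy(); // second pointer into the same data
        a.seek(t1);
        b.seek(t2);
        List<String> out = new ArrayList<>();
        while (a.hasTop() && b.hasTop()) {
            int cmp = a.doc().compareTo(b.doc());
            if (cmp == 0) {
                out.add(a.doc());
                a.next();
                b.next();
            } else if (cmp < 0) {
                a.next(); // advance whichever cursor is behind
            } else {
                b.next();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> tablet = List.of(
            "apple:doc1", "apple:doc3", "apple:doc4",
            "pear:doc1", "pear:doc2", "pear:doc4");
        System.out.println(intersect(tablet, "apple", "pear")); // [doc1, doc4]
    }
}
```

Nothing in this model carries state across a teardown; the copies exist only within one init/seek lifetime, which matches Keith's point that deepCopy is not a teardown hook.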
