junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r564872645



##########
File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##########
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *        Revertable       BaseHashTable
+ *              ↑              ↑
+ *           SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *               ↑             ↑
+ *   TimelineHashSet       TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate thte

Review comment:
       Typo: "thte" should be "the".

##########
File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##########
@@ -0,0 +1,410 @@
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *        Revertable       BaseHashTable
+ *              ↑              ↑
+ *           SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *               ↑             ↑
+ *   TimelineHashSet       TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate thte
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support 
iterating
+ * over a snapshot even while changes are being made to the current state.  
See the
+ * Javadoc for the iterator for more information about how this is 
accomplished.
+ *
+ * All of these classes require external synchronization, and don't support 
null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends 
SnapshottableHashTable.ElementWithStartEpoch>
+        extends BaseHashTable<T> implements Revertable {
+    interface ElementWithStartEpoch {
+        void setStartEpoch(long startEpoch);
+        long startEpoch();
+    }
+
+    static class HashTier<T> {
+        private final int size;
+        private BaseHashTable<T> deltaTable;
+
+        HashTier(int size) {
+            this.size = size;
+        }
+    }
+
+    /**
+     * Iterate over the values that currently exist in the hash table.
+     *
+     * You can use this iterator even if you are making changes to the map.
+     * The changes may or may not be visible while you are iterating.
+     */
+    class CurrentIterator implements Iterator<T> {
+        private final Object[] topTier;
+        private final List<T> ready;
+        private int slot;
+        private T lastReturned;
+
+        CurrentIterator(Object[] topTier) {
+            this.topTier = topTier;
+            this.ready = new ArrayList<>();
+            this.slot = 0;
+            this.lastReturned = null;
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (ready.isEmpty()) {
+                if (slot == topTier.length) {
+                    return false;
+                }
+                BaseHashTable.unpackSlot(ready, topTier, slot);
+                slot++;
+            }
+            return true;
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            lastReturned = ready.remove(ready.size() - 1);
+            return lastReturned;
+        }
+
+        @Override
+        public void remove() {
+            if (lastReturned == null) {
+                throw new UnsupportedOperationException("remove");
+            }
+            snapshottableRemove(lastReturned);
+            lastReturned = null;
+        }
+    }
+
+    /**
+     * Iterate over the values that existed in the hash table during a 
specific snapshot.
+     *
+     * You can use this iterator even if you are making changes to the map.
+     * The snapshot is immutable and will always show up the same.
+     */
+    class HistoricalIterator implements Iterator<T> {
+        private final Object[] topTier;
+        private final Snapshot snapshot;
+        private final List<T> temp;
+        private final List<T> ready;
+        private int slot;
+
+        HistoricalIterator(Object[] topTier, Snapshot snapshot) {
+            this.topTier = topTier;
+            this.snapshot = snapshot;
+            this.temp = new ArrayList<>();
+            this.ready = new ArrayList<>();
+            this.slot = 0;
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (ready.isEmpty()) {
+                if (slot == topTier.length) {
+                    return false;
+                }
+                BaseHashTable.unpackSlot(temp, topTier, slot);
+                for (T object : temp) {
+                    if (object.startEpoch() <= snapshot.epoch()) {
+                        ready.add(object);
+                    }
+                }
+                temp.clear();
+
+                /*
+                 * As we iterate over the SnapshottableHashTable, elements may 
move from
+                 * the top tier into the snapshot tiers.  This would happen if 
something
+                 * were deleted in the top tier, for example, but still 
retained in the
+                 * snapshot.
+                 *
+                 * We don't want to return any elements twice, though.  
Therefore, we
+                 * iterate over the top tier and the snapshot tier at the
+                 * same time.  The key to understanding how this works is 
realizing that
+                 * both hash tables use the same hash function, but simply 
choose a
+                 * different number of significant bits based on their size.
+                 * So if the top tier has size 4 and the snapshot tier has 
size 2, we have
+                 * the following mapping:
+                 *
+                 * Elements that would be in slot 0 or 1 in the top tier can 
only be in
+                 * slot 0 in the snapshot tier.
+                 * Elements that would be in slot 2 or 3 in the top tier can 
only be in
+                 * slot 1 in the snapshot tier.
+                 *
+                 * Therefore, we can do something like this:
+                 * 1. check slot 0 in the top tier and slot 0 in the snapshot 
tier.
+                 * 2. check slot 1 in the top tier and slot 0 in the snapshot 
tier.
+                 * 3. check slot 2 in the top tier and slot 1 in the snapshot 
tier.
+                 * 4. check slot 3 in the top tier and slot 1 in the snapshot 
tier.
+                 *
+                 * If elements move from the top tier to the snapshot tier, 
then
+                 * we'll still find them and report them exactly once.
+                 *
+                 * Note that while I used 4 and 2 as example sizes here, the 
same pattern
+                 * holds for different powers of two.  The "snapshot slot" of 
an element
+                 * will be the top few bits of the top tier slot of that 
element.
+                 */
+                HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+                if (tier != null && tier.deltaTable != null) {
+                    BaseHashTable<T> deltaTable = tier.deltaTable;
+                    int shift = 
Integer.numberOfLeadingZeros(deltaTable.baseElements().length) -
+                        Integer.numberOfLeadingZeros(topTier.length);
+                    int tierSlot = slot >>> shift;
+                    BaseHashTable.unpackSlot(temp, deltaTable.baseElements(), 
tierSlot);
+                    for (T object : temp) {
+                        if (BaseHashTable.findSlot(object, topTier.length) == 
slot) {
+                            ready.add(object);
+                        } else {
+                        }
+                    }
+                    temp.clear();
+                }
+                slot++;
+            }
+            return true;
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return ready.remove(ready.size() - 1);
+        }
+    }
+
+    private final SnapshotRegistry snapshotRegistry;
+
+    SnapshottableHashTable(SnapshotRegistry snapshotRegistry, int 
expectedSize) {
+        super(expectedSize);
+        this.snapshotRegistry = snapshotRegistry;
+    }
+
+    int snapshottableSize(long epoch) {
+        if (epoch == Long.MAX_VALUE) {
+            return baseSize();
+        } else {
+            Snapshot snapshot = snapshotRegistry.get(epoch);
+            HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+            if (tier == null) {
+                return baseSize();
+            } else {
+                return tier.size;
+            }
+        }
+    }
+
+    T snapshottableGet(Object key, long epoch) {
+        T result = baseGet(key);
+        if (result != null && result.startEpoch() <= epoch) {
+            return result;
+        }
+        if (epoch == Long.MAX_VALUE) {

Review comment:
       Instead of using Long.MAX_VALUE directly, perhaps we could define a named
constant such as NO_EPOCH to make the intent clear?
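
A minimal sketch of what the suggestion above could look like. The class name `EpochConstants`, the constant name `LATEST_EPOCH`, and the helper `isCurrentState` are hypothetical and not part of the PR; the sketch only assumes, as the quoted code does, that `Long.MAX_VALUE` means "read the current state rather than a snapshot".

```java
public class EpochConstants {
    /**
     * Hypothetical sentinel: an epoch value meaning "the current state",
     * used instead of repeating Long.MAX_VALUE at every call site.
     */
    static final long LATEST_EPOCH = Long.MAX_VALUE;

    /** Returns true if the caller asked for the current state, not a snapshot. */
    static boolean isCurrentState(long epoch) {
        return epoch == LATEST_EPOCH;
    }
}
```

Call sites such as `size()` or `contains(key)` could then pass `LATEST_EPOCH`, so the sentinel is defined and documented in one place.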

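For readers following the HistoricalIterator comment in the hunk above, the slot mapping it describes can be sketched in isolation. The class and method names below are hypothetical; the arithmetic mirrors the `shift` computation in the quoted code and assumes both table lengths are powers of two, with the snapshot tier no larger than the top tier.

```java
public class SlotMappingSketch {
    /**
     * Maps a slot in the top tier to the slot in a smaller snapshot tier that
     * could hold the same elements, by keeping only the top bits of the slot.
     * Assumes both lengths are powers of two and
     * snapshotTierLength <= topTierLength.
     */
    static int snapshotSlot(int topSlot, int topTierLength, int snapshotTierLength) {
        int shift = Integer.numberOfLeadingZeros(snapshotTierLength)
            - Integer.numberOfLeadingZeros(topTierLength);
        return topSlot >>> shift;
    }
}
```

With a top tier of length 4 and a snapshot tier of length 2, top slots 0 and 1 map to snapshot slot 0, and top slots 2 and 3 map to snapshot slot 1, which matches the walk-through in the comment.
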
##########
File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshotRegistry.java
##########
@@ -0,0 +1,153 @@
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+/**
+ * A registry containing snapshots of timeline data structures.
+ * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
+ * Therefore, we use ArrayLists here rather than a data structure with higher 
overhead.
+ */
+public class SnapshotRegistry {
+    private final Logger log;
+
+    /**
+     * The current epoch.  All snapshot epochs are lower than this number.
+     */
+    private long curEpoch;
+
+    /**
+     * An ArrayList of snapshots, kept in sorted order.
+     */
+    private final ArrayList<Snapshot> snapshots;
+
+    public SnapshotRegistry(long startEpoch) {
+        this(new LogContext(), startEpoch);
+    }
+
+    public SnapshotRegistry(LogContext logContext, long startEpoch) {
+        this.log = logContext.logger(SnapshotRegistry.class);
+        this.curEpoch = startEpoch;
+        this.snapshots = new ArrayList<>(5);
+    }
+
+    /**
+     * Returns an iterator that moves through snapshots from the lowest to the 
highest epoch.
+     */
+    public Iterator<Snapshot> snapshots() {
+        return snapshots.iterator();
+    }
+
+    /**
+     * Gets the snapshot for a specific epoch.
+     */
+    public Snapshot get(long epoch) {
+        for (Snapshot snapshot : snapshots) {
+            if (snapshot.epoch() == epoch) {
+                return snapshot;
+            }
+        }
+        throw new RuntimeException("No snapshot for epoch " + epoch);
+    }
+
+    /**
+     * Creates a new snapshot at the given epoch.
+     *
+     * @param epoch             The epoch to create the snapshot at.  The 
current epoch
+     *                          will be advanced to one past this epoch.
+     */
+    public Snapshot createSnapshot(long epoch) {
+        if (epoch < curEpoch) {
+            throw new RuntimeException("Can't create a new snapshot at epoch " 
+ epoch +
+                    " because the current epoch is " + curEpoch);
+        }
+        Snapshot snapshot = new Snapshot(epoch);
+        snapshots.add(snapshot);
+        curEpoch = epoch + 1;
+        log.debug("Creating snapshot {}", epoch);
+        return snapshot;
+    }
+
+    /**
+     * Deletes the snapshot with the given epoch.
+     *
+     * @param epoch             The epoch of the snapshot to delete.
+     */
+    public void deleteSnapshot(long epoch) {
+        Iterator<Snapshot> iter = snapshots.iterator();
+        while (iter.hasNext()) {
+            Snapshot snapshot = iter.next();
+            if (snapshot.epoch() == epoch) {
+                log.debug("Deleting snapshot {}", epoch);
+                iter.remove();
+                return;
+            }
+        }
+        throw new RuntimeException(String.format(
+            "No snapshot at epoch %d found. Snapshot epochs are %s.", epoch,
+                snapshots.stream().map(snapshot -> 
String.valueOf(snapshot.epoch())).
+                    collect(Collectors.joining(", "))));
+    }
+
+    /**
+     * Reverts the state of all data structures to the state at the given 
epoch.
+     *
+     * @param epoch             The epoch of the snapshot to revert to.
+     */
+    public void revertToSnapshot(long epoch) {

Review comment:
       Should this method also remove all snapshots with epoch > the input 
epoch?
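
One possible reading of the question above, sketched with a stand-in class. `RevertSketch` is hypothetical and tracks only snapshot epochs; it is not the PR's `SnapshotRegistry`, and the step that actually reverts the data structures is left out.

```java
import java.util.ArrayList;
import java.util.List;

public class RevertSketch {
    // Snapshot epochs in ascending order (stand-in for the list of Snapshot objects).
    private final List<Long> snapshotEpochs = new ArrayList<>();

    void createSnapshot(long epoch) {
        snapshotEpochs.add(epoch);
    }

    /**
     * Reverts to the given epoch and discards every snapshot newer than it,
     * since those snapshots describe a future that no longer exists.
     */
    void revertToSnapshot(long epoch) {
        if (!snapshotEpochs.contains(epoch)) {
            throw new RuntimeException("No snapshot for epoch " + epoch);
        }
        // A real registry would revert each registered data structure here.
        snapshotEpochs.removeIf(e -> e > epoch);
    }

    List<Long> epochs() {
        return new ArrayList<>(snapshotEpochs);
    }
}
```

After creating snapshots at epochs 1, 2 and 3 and reverting to epoch 2, only the snapshots at epochs 1 and 2 remain in this sketch.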

##########
File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashSet.java
##########
@@ -0,0 +1,260 @@
+
+package org.apache.kafka.timeline;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This is a hash set which can be snapshotted.
+ *
+ * See {@SnapshottableHashTable} for more details about the implementation.
+ *
+ * This class requires external synchronization.  Null values are not 
supported.
+ *
+ * @param <T>   The value type of the set.
+ */
+public class TimelineHashSet<T>
+        extends SnapshottableHashTable<TimelineHashSet.TimelineHashSetEntry<T>>
+        implements Set<T> {
+    static class TimelineHashSetEntry<T>
+            implements SnapshottableHashTable.ElementWithStartEpoch {
+        private final T value;
+        private long startEpoch;
+
+        TimelineHashSetEntry(T value) {
+            this.value = value;
+            this.startEpoch = Long.MAX_VALUE;
+        }
+
+        public T getValue() {
+            return value;
+        }
+
+        @Override
+        public void setStartEpoch(long startEpoch) {
+            this.startEpoch = startEpoch;
+        }
+
+        @Override
+        public long startEpoch() {
+            return startEpoch;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof TimelineHashSetEntry)) return false;
+            TimelineHashSetEntry<T> other = (TimelineHashSetEntry<T>) o;
+            return value.equals(other.value);
+        }
+
+        @Override
+        public int hashCode() {
+            return value.hashCode();
+        }
+    }
+
+    public TimelineHashSet(SnapshotRegistry snapshotRegistry, int 
expectedSize) {
+        super(snapshotRegistry, expectedSize);
+    }
+
+    @Override
+    public int size() {
+        return size(Long.MAX_VALUE);
+    }
+
+    public int size(long epoch) {
+        return snapshottableSize(epoch);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return isEmpty(Long.MAX_VALUE);
+    }
+
+    public boolean isEmpty(long epoch) {
+        return snapshottableSize(epoch) == 0;
+    }
+
+    @Override
+    public boolean contains(Object key) {
+        return contains(key, Long.MAX_VALUE);
+    }
+
+    public boolean contains(Object object, long epoch) {
+        return snapshottableGet(new TimelineHashSetEntry<>(object), epoch) != 
null;
+    }
+
+    final class ValueIterator implements Iterator<T> {
+        private final Iterator<TimelineHashSetEntry<T>> iter;
+
+        ValueIterator(long epoch) {
+            this.iter = snapshottableIterator(epoch);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public T next() {
+            return iter.next().value;
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        return iterator(Long.MAX_VALUE);
+    }
+
+    public Iterator<T> iterator(long epoch) {
+        return new ValueIterator(epoch);
+    }
+
+    @Override
+    public Object[] toArray() {
+        Object[] result = new Object[size()];
+        Iterator<T> iter = iterator();
+        int i = 0;
+        while (iter.hasNext()) {
+            result[i++] = iter.next();
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> R[] toArray(R[] a) {
+        int size = size();
+        if (size <= a.length) {
+            Iterator<T> iter = iterator();
+            int i = 0;
+            while (iter.hasNext()) {
+                a[i++] = (R) iter.next();
+            }
+            while (i < a.length) {
+                a[i++] = null;
+            }
+            return a;
+        } else {
+            return (R[]) toArray();
+        }
+    }
+
+    @Override
+    public boolean add(T newValue) {
+        Objects.requireNonNull(newValue);
+        return snapshottableAddUnlessPresent(new 
TimelineHashSetEntry<>(newValue));
+    }
+
+    @Override
+    public boolean remove(Object value) {
+        return snapshottableRemove(new TimelineHashSetEntry<>(value)) != null;
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> collection) {
+        for (Object value : collection) {
+            if (!contains(value)) return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends T> collection) {
+        boolean modified = false;
+        for (T value : collection) {
+            if (add(value)) {
+                modified = true;
+            }
+        }
+        return modified;
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> collection) {
+        Objects.requireNonNull(collection);
+        boolean modified = false;
+        Iterator<T> it = iterator();
+        while (it.hasNext()) {
+            if (!collection.contains(it.next())) {
+                it.remove();
+                modified = true;
+            }
+        }
+        return modified;
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> collection) {
+        Objects.requireNonNull(collection);
+        boolean modified = false;
+        Iterator<?> it = iterator();
+        while (it.hasNext()) {
+            if (collection.contains(it.next())) {
+                it.remove();
+                modified = true;
+            }
+        }
+        return modified;
+    }
+
+    @Override
+    public void clear() {

Review comment:
       Should we clear all snapshots in this set too?

##########
File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##########
@@ -0,0 +1,410 @@
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *        Revertable       BaseHashTable
+ *              ↑              ↑
+ *           SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *               ↑             ↑
+ *   TimelineHashSet       TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate thte
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support 
iterating
+ * over a snapshot even while changes are being made to the current state.  
See the
+ * Javadoc for the iterator for more information about how this is 
accomplished.
+ *
+ * All of these classes require external synchronization, and don't support 
null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends 
SnapshottableHashTable.ElementWithStartEpoch>
+        extends BaseHashTable<T> implements Revertable {
+    interface ElementWithStartEpoch {
+        void setStartEpoch(long startEpoch);
+        long startEpoch();
+    }
+
+    static class HashTier<T> {
+        private final int size;
+        private BaseHashTable<T> deltaTable;
+
+        HashTier(int size) {
+            this.size = size;
+        }
+    }
+
+    /**
+     * Iterate over the values that currently exist in the hash table.
+     *
+     * You can use this iterator even if you are making changes to the map.
+     * The changes may or may not be visible while you are iterating.
+     */
+    class CurrentIterator implements Iterator<T> {
+        private final Object[] topTier;
+        private final List<T> ready;
+        private int slot;
+        private T lastReturned;
+
+        CurrentIterator(Object[] topTier) {
+            this.topTier = topTier;
+            this.ready = new ArrayList<>();
+            this.slot = 0;
+            this.lastReturned = null;
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (ready.isEmpty()) {
+                if (slot == topTier.length) {
+                    return false;
+                }
+                BaseHashTable.unpackSlot(ready, topTier, slot);
+                slot++;
+            }
+            return true;
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            lastReturned = ready.remove(ready.size() - 1);
+            return lastReturned;
+        }
+
+        @Override
+        public void remove() {
+            if (lastReturned == null) {
+                throw new UnsupportedOperationException("remove");
+            }
+            snapshottableRemove(lastReturned);
+            lastReturned = null;
+        }
+    }
+
+    /**
+     * Iterate over the values that existed in the hash table during a 
specific snapshot.
+     *
+     * You can use this iterator even if you are making changes to the map.
+     * The snapshot is immutable and will always show up the same.
+     */
+    class HistoricalIterator implements Iterator<T> {
+        private final Object[] topTier;
+        private final Snapshot snapshot;
+        private final List<T> temp;
+        private final List<T> ready;
+        private int slot;
+
+        HistoricalIterator(Object[] topTier, Snapshot snapshot) {
+            this.topTier = topTier;
+            this.snapshot = snapshot;
+            this.temp = new ArrayList<>();
+            this.ready = new ArrayList<>();
+            this.slot = 0;
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (ready.isEmpty()) {
+                if (slot == topTier.length) {
+                    return false;
+                }
+                BaseHashTable.unpackSlot(temp, topTier, slot);
+                for (T object : temp) {
+                    if (object.startEpoch() <= snapshot.epoch()) {
+                        ready.add(object);
+                    }
+                }
+                temp.clear();
+
+                /*
+                 * As we iterate over the SnapshottableHashTable, elements may 
move from
+                 * the top tier into the snapshot tiers.  This would happen if 
something
+                 * were deleted in the top tier, for example, but still 
retained in the
+                 * snapshot.
+                 *
+                 * We don't want to return any elements twice, though.  
Therefore, we
+                 * iterate over the top tier and the snapshot tier at the
+                 * same time.  The key to understanding how this works is 
realizing that
+                 * both hash tables use the same hash function, but simply 
choose a
+                 * different number of significant bits based on their size.
+                 * So if the top tier has size 4 and the snapshot tier has 
size 2, we have
+                 * the following mapping:
+                 *
+                 * Elements that would be in slot 0 or 1 in the top tier can 
only be in
+                 * slot 0 in the snapshot tier.
+                 * Elements that would be in slot 2 or 3 in the top tier can 
only be in
+                 * slot 1 in the snapshot tier.
+                 *
+                 * Therefore, we can do something like this:
+                 * 1. check slot 0 in the top tier and slot 0 in the snapshot 
tier.
+                 * 2. check slot 1 in the top tier and slot 0 in the snapshot 
tier.
+                 * 3. check slot 2 in the top tier and slot 1 in the snapshot 
tier.
+                 * 4. check slot 3 in the top tier and slot 1 in the snapshot 
tier.
+                 *
+                 * If elements move from the top tier to the snapshot tier, 
then
+                 * we'll still find them and report them exactly once.
+                 *
+                 * Note that while I used 4 and 2 as example sizes here, the 
same pattern
+                 * holds for different powers of two.  The "snapshot slot" of 
an element
+                 * will be the top few bits of the top tier slot of that 
element.
+                 */
+                HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+                if (tier != null && tier.deltaTable != null) {
+                    BaseHashTable<T> deltaTable = tier.deltaTable;
+                    int shift = 
Integer.numberOfLeadingZeros(deltaTable.baseElements().length) -
+                        Integer.numberOfLeadingZeros(topTier.length);
+                    int tierSlot = slot >>> shift;
+                    BaseHashTable.unpackSlot(temp, deltaTable.baseElements(), 
tierSlot);
+                    for (T object : temp) {
+                        if (BaseHashTable.findSlot(object, topTier.length) == 
slot) {
+                            ready.add(object);
+                        } else {
+                        }
+                    }
+                    temp.clear();
+                }
+                slot++;
+            }
+            return true;
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return ready.remove(ready.size() - 1);
+        }
+    }
+
+    private final SnapshotRegistry snapshotRegistry;
+
+    SnapshottableHashTable(SnapshotRegistry snapshotRegistry, int 
expectedSize) {
+        super(expectedSize);
+        this.snapshotRegistry = snapshotRegistry;
+    }
+
+    int snapshottableSize(long epoch) {
+        if (epoch == Long.MAX_VALUE) {
+            return baseSize();
+        } else {
+            Snapshot snapshot = snapshotRegistry.get(epoch);
+            HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+            if (tier == null) {
+                return baseSize();
+            } else {
+                return tier.size;
+            }
+        }
+    }
+
+    T snapshottableGet(Object key, long epoch) {
+        T result = baseGet(key);
+        if (result != null && result.startEpoch() <= epoch) {
+            return result;
+        }
+        if (epoch == Long.MAX_VALUE) {
+            return null;
+        }
+        Snapshot snapshot = snapshotRegistry.get(epoch);
+        HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+        if (tier == null || tier.deltaTable == null) {
+            return null;
+        }
+        result = tier.deltaTable.baseGet(key);
+        return result;
+    }
+
+    boolean snapshottableAddUnlessPresent(T object) {
+        T prev = baseGet(object);
+        if (prev != null) {
+            return false;
+        }
+        object.setStartEpoch(snapshotRegistry.curEpoch());
+        int prevSize = baseSize();
+        baseAddOrReplace(object);
+        updateTierData(prevSize);
+        return true;
+    }
+
+    T snapshottableAddOrReplace(T object) {
+        object.setStartEpoch(snapshotRegistry.curEpoch());
+        int prevSize = baseSize();
+        T prev = baseAddOrReplace(object);
+        if (prev == null) {
+            updateTierData(prevSize);
+        } else {
+            updateTierData(prev, prevSize);
+        }
+        return prev;
+    }
+
+    Object snapshottableRemove(Object object) {
+        T prev = baseRemove(object);
+        if (prev == null) {
+            return null;
+        } else {
+            updateTierData(prev, baseSize() + 1);
+            return prev;
+        }
+    }
+
+    private void updateTierData(int prevSize) {
+        Iterator<Snapshot> iter = snapshotRegistry.snapshots();
+        while (iter.hasNext()) {
+            Snapshot snapshot = iter.next();
+            HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+            if (tier == null) {
+                tier = new HashTier<>(prevSize);
+                snapshot.setData(SnapshottableHashTable.this, tier);

Review comment:
       Do we need to create a new tier here, or could we delay the allocation 
until it's truly needed? That lazy allocation already exists in 
`updateTierData(T prev, int prevSize)`.
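
To make the question concrete, here is a minimal sketch of the lazy pattern. It is not the PR's code: `Tier`, `retain`, and the `HashMap`-backed delta table are hypothetical stand-ins for `HashTier` and `BaseHashTable`. One thing worth checking, as an assumption on my part: since `snapshottableSize()` falls back to `baseSize()` when the tier is null, the size may still have to be recorded on a pure add, even if the delta table itself can wait.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for HashTier: the size is captured when the tier is
// created, while the delta table is only allocated on first use.
class LazyTierSketch {
    static final class Tier<T> {
        final int size;          // table size at the time of the snapshot
        Map<T, T> deltaTable;    // stays null until an element is retained

        Tier(int size) {
            this.size = size;
        }

        // Called only when an element is overwritten or removed.
        void retain(T prev) {
            if (deltaTable == null) {
                deltaTable = new HashMap<>();
            }
            deltaTable.put(prev, prev);
        }
    }

    public static void main(String[] args) {
        Tier<String> tier = new Tier<>(3);  // pure add: record the size only
        System.out.println(tier.deltaTable == null);
        tier.retain("old-value");           // removal: allocate on demand
        System.out.println(tier.deltaTable.size());
    }
}
```

In this shape the tier object is cheap, and the hash table cost is only paid once something actually diverges from the snapshot.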

##########
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/BaseHashTable.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;

Review comment:
       The file path doesn't match the package name: the path contains 
`timeline/timeline`, but the package is `org.apache.kafka.timeline`. Other 
files have the same issue.

##########
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##########
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *        Revertable       BaseHashTable
+ *              ↑              ↑
+ *           SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *               ↑             ↑
+ *   TimelineHashSet       TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support 
iterating
+ * over a snapshot even while changes are being made to the current state.  
See the
+ * Javadoc for the iterator for more information about how this is 
accomplished.
+ *
+ * All of these classes require external synchronization, and don't support 
null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends 
SnapshottableHashTable.ElementWithStartEpoch>
+        extends BaseHashTable<T> implements Revertable {
+    interface ElementWithStartEpoch {
+        void setStartEpoch(long startEpoch);
+        long startEpoch();
+    }
+
+    static class HashTier<T> {
+        private final int size;
+        private BaseHashTable<T> deltaTable;
+
+        HashTier(int size) {
+            this.size = size;
+        }
+    }
+
+    /**
+     * Iterate over the values that currently exist in the hash table.
+     *
+     * You can use this iterator even if you are making changes to the map.
+     * The changes may or may not be visible while you are iterating.
+     */
+    class CurrentIterator implements Iterator<T> {
+        private final Object[] topTier;
+        private final List<T> ready;
+        private int slot;
+        private T lastReturned;
+
+        CurrentIterator(Object[] topTier) {
+            this.topTier = topTier;
+            this.ready = new ArrayList<>();
+            this.slot = 0;
+            this.lastReturned = null;
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (ready.isEmpty()) {
+                if (slot == topTier.length) {
+                    return false;
+                }
+                BaseHashTable.unpackSlot(ready, topTier, slot);
+                slot++;
+            }
+            return true;
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            lastReturned = ready.remove(ready.size() - 1);
+            return lastReturned;
+        }
+
+        @Override
+        public void remove() {
+            if (lastReturned == null) {
+                throw new UnsupportedOperationException("remove");
+            }
+            snapshottableRemove(lastReturned);
+            lastReturned = null;
+        }
+    }
+
+    /**
+     * Iterate over the values that existed in the hash table during a 
specific snapshot.
+     *
+     * You can use this iterator even if you are making changes to the map.
+     * The snapshot is immutable and will always show up the same.
+     */
+    class HistoricalIterator implements Iterator<T> {
+        private final Object[] topTier;
+        private final Snapshot snapshot;
+        private final List<T> temp;
+        private final List<T> ready;
+        private int slot;
+
+        HistoricalIterator(Object[] topTier, Snapshot snapshot) {
+            this.topTier = topTier;
+            this.snapshot = snapshot;
+            this.temp = new ArrayList<>();
+            this.ready = new ArrayList<>();
+            this.slot = 0;
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (ready.isEmpty()) {
+                if (slot == topTier.length) {
+                    return false;
+                }
+                BaseHashTable.unpackSlot(temp, topTier, slot);
+                for (T object : temp) {
+                    if (object.startEpoch() <= snapshot.epoch()) {
+                        ready.add(object);
+                    }
+                }
+                temp.clear();
+
+                /*
+                 * As we iterate over the SnapshottableHashTable, elements may 
move from
+                 * the top tier into the snapshot tiers.  This would happen if 
something
+                 * were deleted in the top tier, for example, but still 
retained in the
+                 * snapshot.
+                 *
+                 * We don't want to return any elements twice, though.  
Therefore, we
+                 * iterate over the top tier and the snapshot tier at the
+                 * same time.  The key to understanding how this works is 
realizing that
+                 * both hash tables use the same hash function, but simply 
choose a
+                 * different number of significant bits based on their size.
+                 * So if the top tier has size 4 and the snapshot tier has 
size 2, we have
+                 * the following mapping:
+                 *
+                 * Elements that would be in slot 0 or 1 in the top tier can 
only be in
+                 * slot 0 in the snapshot tier.
+                 * Elements that would be in slot 2 or 3 in the top tier can 
only be in
+                 * slot 1 in the snapshot tier.
+                 *
+                 * Therefore, we can do something like this:
+                 * 1. check slot 0 in the top tier and slot 0 in the snapshot 
tier.
+                 * 2. check slot 1 in the top tier and slot 0 in the snapshot 
tier.
+                 * 3. check slot 2 in the top tier and slot 1 in the snapshot 
tier.
+                 * 4. check slot 3 in the top tier and slot 1 in the snapshot 
tier.
+                 *
+                 * If elements move from the top tier to the snapshot tier, 
then
+                 * we'll still find them and report them exactly once.
+                 *
+                 * Note that while I used 4 and 2 as example sizes here, the 
same pattern
+                 * holds for different powers of two.  The "snapshot slot" of 
an element
+                 * will be the top few bits of the top tier slot of that 
element.
+                 */
+                HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+                if (tier != null && tier.deltaTable != null) {
+                    BaseHashTable<T> deltaTable = tier.deltaTable;
+                    int shift = 
Integer.numberOfLeadingZeros(deltaTable.baseElements().length) -
+                        Integer.numberOfLeadingZeros(topTier.length);
+                    int tierSlot = slot >>> shift;
+                    BaseHashTable.unpackSlot(temp, deltaTable.baseElements(), 
tierSlot);
+                    for (T object : temp) {
+                        if (BaseHashTable.findSlot(object, topTier.length) == 
slot) {
+                            ready.add(object);
+                        } else {
+                        }
+                    }
+                    temp.clear();
+                }
+                slot++;
+            }
+            return true;
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return ready.remove(ready.size() - 1);
+        }
+    }
+
+    private final SnapshotRegistry snapshotRegistry;
+
+    SnapshottableHashTable(SnapshotRegistry snapshotRegistry, int 
expectedSize) {
+        super(expectedSize);
+        this.snapshotRegistry = snapshotRegistry;
+    }
+
+    int snapshottableSize(long epoch) {
+        if (epoch == Long.MAX_VALUE) {
+            return baseSize();
+        } else {
+            Snapshot snapshot = snapshotRegistry.get(epoch);
+            HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+            if (tier == null) {
+                return baseSize();
+            } else {
+                return tier.size;
+            }
+        }
+    }
+
+    T snapshottableGet(Object key, long epoch) {
+        T result = baseGet(key);
+        if (result != null && result.startEpoch() <= epoch) {
+            return result;
+        }
+        if (epoch == Long.MAX_VALUE) {
+            return null;
+        }
+        Snapshot snapshot = snapshotRegistry.get(epoch);
+        HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+        if (tier == null || tier.deltaTable == null) {
+            return null;
+        }
+        result = tier.deltaTable.baseGet(key);
+        return result;
+    }
+
+    boolean snapshottableAddUnlessPresent(T object) {
+        T prev = baseGet(object);
+        if (prev != null) {
+            return false;
+        }
+        object.setStartEpoch(snapshotRegistry.curEpoch());
+        int prevSize = baseSize();
+        baseAddOrReplace(object);
+        updateTierData(prevSize);

Review comment:
       `baseAddOrReplace()` already calls `updateTierData()`. Do we need to 
call it again here?

##########
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This is a hash map which can be snapshotted.
+ *
+ * See {@link SnapshottableHashTable} for more details about the implementation.
+ *
+ * This class requires external synchronization.  Null keys and values are not 
supported.
+ *
+ * @param <K>   The key type of the map.
+ * @param <V>   The value type of the map.
+ */
+public class TimelineHashMap<K, V>
+        extends SnapshottableHashTable<TimelineHashMap.TimelineHashMapEntry<K, 
V>>
+        implements Map<K, V> {
+    static class TimelineHashMapEntry<K, V>
+            implements SnapshottableHashTable.ElementWithStartEpoch, 
Map.Entry<K, V> {
+        private final K key;
+        private final V value;
+        private long startEpoch;
+
+        TimelineHashMapEntry(K key, V value) {
+            this.key = key;
+            this.value = value;
+            this.startEpoch = Long.MAX_VALUE;
+        }
+
+        @Override
+        public K getKey() {
+            return key;
+        }
+
+        @Override
+        public V getValue() {
+            return value;
+        }
+
+        @Override
+        public V setValue(V value) {
+            // This would be inefficient to support since we'd need a 
back-reference
+            // to the enclosing map in each Entry object.  There would also be
+            // complications if this entry object was sourced from a 
historical iterator;
+            // we don't support modifying the past.  Since we don't really 
need this API,
+            // let's just not support it.
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void setStartEpoch(long startEpoch) {
+            this.startEpoch = startEpoch;
+        }
+
+        @Override
+        public long startEpoch() {
+            return startEpoch;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof TimelineHashMapEntry)) return false;
+            TimelineHashMapEntry<K, V> other = (TimelineHashMapEntry<K, V>) o;
+            return key.equals(other.key);
+        }
+
+        @Override
+        public int hashCode() {
+            return key.hashCode();
+        }
+    }
+
+    public TimelineHashMap(SnapshotRegistry snapshotRegistry, int 
expectedSize) {
+        super(snapshotRegistry, expectedSize);
+    }
+
+    @Override
+    public int size() {
+        return size(Long.MAX_VALUE);
+    }
+
+    public int size(long epoch) {
+        return snapshottableSize(epoch);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return isEmpty(Long.MAX_VALUE);
+    }
+
+    public boolean isEmpty(long epoch) {
+        return snapshottableSize(epoch) == 0;
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return containsKey(key, Long.MAX_VALUE);
+    }
+
+    public boolean containsKey(Object key, long epoch) {
+        return snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch) 
!= null;
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        Iterator<Entry<K, V>> iter = entrySet().iterator();
+        while (iter.hasNext()) {
+            Entry<K, V> e = iter.next();
+            if (value.equals(e.getValue())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public V get(Object key) {
+        return get(key, Long.MAX_VALUE);
+    }
+
+    public V get(Object key, long epoch) {
+        Entry<K, V> entry =
+            snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch);
+        if (entry == null) {
+            return null;
+        }
+        return entry.getValue();
+    }
+
+    @Override
+    public V put(K key, V value) {
+        Objects.requireNonNull(key);
+        Objects.requireNonNull(value);
+        TimelineHashMapEntry<K, V> entry = new TimelineHashMapEntry<>(key, 
value);
+        TimelineHashMapEntry<K, V> prev = snapshottableAddOrReplace(entry);
+        if (prev == null) {
+            return null;
+        }
+        return prev.getValue();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public V remove(Object key) {
+        return (V) snapshottableRemove(new TimelineHashMapEntry<>(key, null));
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> map) {
+        for (Map.Entry<? extends K, ? extends V> e : map.entrySet()) {
+            put(e.getKey(), e.getValue());
+        }
+    }
+
+    @Override
+    public void clear() {

Review comment:
       Should we clear all snapshots in this map too?

##########
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshotRegistry.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+/**
+ * A registry containing snapshots of timeline data structures.
+ * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
+ * Therefore, we use ArrayLists here rather than a data structure with higher 
overhead.
+ */
+public class SnapshotRegistry {
+    private final Logger log;
+
+    /**
+     * The current epoch.  All snapshot epochs are lower than this number.
+     */
+    private long curEpoch;
+
+    /**
+     * An ArrayList of snapshots, kept in sorted order.
+     */
+    private final ArrayList<Snapshot> snapshots;
+
+    public SnapshotRegistry(long startEpoch) {
+        this(new LogContext(), startEpoch);
+    }
+
+    public SnapshotRegistry(LogContext logContext, long startEpoch) {
+        this.log = logContext.logger(SnapshotRegistry.class);
+        this.curEpoch = startEpoch;
+        this.snapshots = new ArrayList<>(5);
+    }
+
+    /**
+     * Returns an iterator that moves through snapshots from the lowest to the 
highest epoch.
+     */
+    public Iterator<Snapshot> snapshots() {
+        return snapshots.iterator();
+    }
+
+    /**
+     * Gets the snapshot for a specific epoch.
+     */
+    public Snapshot get(long epoch) {
+        for (Snapshot snapshot : snapshots) {
+            if (snapshot.epoch() == epoch) {
+                return snapshot;
+            }
+        }
+        throw new RuntimeException("No snapshot for epoch " + epoch);
+    }
+
+    /**
+     * Creates a new snapshot at the given epoch.
+     *
+     * @param epoch             The epoch to create the snapshot at.  The 
current epoch
+     *                          will be advanced to one past this epoch.
+     */
+    public Snapshot createSnapshot(long epoch) {

Review comment:
       It seems that we want the caller to provide a monotonically increasing 
epoch. An alternative model is for createSnapshot() to allocate the next epoch 
itself and return it, so that the caller never supplies one. Would that model 
work for our use case, and would it be simpler?
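
       A minimal sketch of a registry-allocated epoch, to make the alternative 
concrete. `AutoEpochRegistry` and its members are hypothetical names, not part 
of this PR, and snapshots are reduced to bare epoch numbers for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not part of the PR: the registry owns the epoch counter.
class AutoEpochRegistry {
    private long curEpoch;
    private final List<Long> snapshotEpochs = new ArrayList<>();

    AutoEpochRegistry(long startEpoch) {
        this.curEpoch = startEpoch;
    }

    // Records a snapshot at the current epoch, advances the counter by one,
    // and returns the epoch of the snapshot that was just taken.
    long createSnapshot() {
        long epoch = curEpoch;
        snapshotEpochs.add(epoch);
        curEpoch = epoch + 1;
        return epoch;
    }

    long curEpoch() {
        return curEpoch;
    }

    // Epochs are appended in increasing order, so the list stays sorted.
    List<Long> snapshotEpochs() {
        return snapshotEpochs;
    }
}
```

With this shape the "epoch < curEpoch" check is no longer needed, since 
callers cannot pass a stale epoch. It would not fit if epochs have to match 
externally assigned values, which I have not verified.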

##########
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This is a hash map which can be snapshotted.
+ *
+ * See {@link SnapshottableHashTable} for more details about the implementation.
+ *
+ * This class requires external synchronization.  Null keys and values are not 
supported.
+ *
+ * @param <K>   The key type of the map.
+ * @param <V>   The value type of the map.
+ */
+public class TimelineHashMap<K, V>
+        extends SnapshottableHashTable<TimelineHashMap.TimelineHashMapEntry<K, 
V>>
+        implements Map<K, V> {
+    static class TimelineHashMapEntry<K, V>
+            implements SnapshottableHashTable.ElementWithStartEpoch, 
Map.Entry<K, V> {
+        private final K key;
+        private final V value;
+        private long startEpoch;
+
+        TimelineHashMapEntry(K key, V value) {
+            this.key = key;
+            this.value = value;
+            this.startEpoch = Long.MAX_VALUE;
+        }
+
+        @Override
+        public K getKey() {
+            return key;
+        }
+
+        @Override
+        public V getValue() {
+            return value;
+        }
+
+        @Override
+        public V setValue(V value) {
+            // This would be inefficient to support since we'd need a 
back-reference
+            // to the enclosing map in each Entry object.  There would also be
+            // complications if this entry object was sourced from a 
historical iterator;
+            // we don't support modifying the past.  Since we don't really 
need this API,
+            // let's just not support it.
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void setStartEpoch(long startEpoch) {
+            this.startEpoch = startEpoch;
+        }
+
+        @Override
+        public long startEpoch() {
+            return startEpoch;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof TimelineHashMapEntry)) return false;
+            TimelineHashMapEntry<K, V> other = (TimelineHashMapEntry<K, V>) o;
+            return key.equals(other.key);
+        }
+
+        @Override
+        public int hashCode() {
+            return key.hashCode();
+        }
+    }
+
+    public TimelineHashMap(SnapshotRegistry snapshotRegistry, int 
expectedSize) {
+        super(snapshotRegistry, expectedSize);
+    }
+
+    @Override
+    public int size() {
+        return size(Long.MAX_VALUE);
+    }
+
+    public int size(long epoch) {
+        return snapshottableSize(epoch);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return isEmpty(Long.MAX_VALUE);
+    }
+
+    public boolean isEmpty(long epoch) {
+        return snapshottableSize(epoch) == 0;
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return containsKey(key, Long.MAX_VALUE);
+    }
+
+    public boolean containsKey(Object key, long epoch) {
+        return snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch) 
!= null;
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        Iterator<Entry<K, V>> iter = entrySet().iterator();
+        while (iter.hasNext()) {
+            Entry<K, V> e = iter.next();
+            if (value.equals(e.getValue())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public V get(Object key) {
+        return get(key, Long.MAX_VALUE);
+    }
+
+    public V get(Object key, long epoch) {
+        Entry<K, V> entry =
+            snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch);
+        if (entry == null) {
+            return null;
+        }
+        return entry.getValue();
+    }
+
+    @Override
+    public V put(K key, V value) {
+        Objects.requireNonNull(key);
+        Objects.requireNonNull(value);
+        TimelineHashMapEntry<K, V> entry = new TimelineHashMapEntry<>(key, 
value);
+        TimelineHashMapEntry<K, V> prev = snapshottableAddOrReplace(entry);
+        if (prev == null) {
+            return null;
+        }
+        return prev.getValue();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public V remove(Object key) {
+        return (V) snapshottableRemove(new TimelineHashMapEntry<>(key, null));
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> map) {
+        for (Map.Entry<? extends K, ? extends V> e : map.entrySet()) {
+            put(e.getKey(), e.getValue());
+        }
+    }
+
+    @Override
+    public void clear() {
+        Iterator<TimelineHashMapEntry<K, V>> iter = 
snapshottableIterator(Long.MAX_VALUE);
+        while (iter.hasNext()) {
+            iter.next();
+            iter.remove();
+        }
+    }
+
+    final class KeySet extends AbstractSet<K> {
+        private final long epoch;
+
+        KeySet(long epoch) {
+            this.epoch = epoch;
+        }
+
+        public final int size() {
+            return TimelineHashMap.this.size(epoch);
+        }
+
+        public final void clear() {
+            if (epoch != Long.MAX_VALUE) {
+                throw new RuntimeException("can't modify snapshot");
+            }
+            TimelineHashMap.this.clear();
+        }
+
+        public final Iterator<K> iterator() {
+            return new KeyIterator(epoch);
+        }
+
+        public final boolean contains(Object o) {
+            return TimelineHashMap.this.containsKey(o, epoch);
+        }
+
+        public final boolean remove(Object o) {
+            if (epoch != Long.MAX_VALUE) {
+                throw new RuntimeException("can't modify snapshot");
+            }
+            return TimelineHashMap.this.remove(o) != null;
+        }
+    }
+
+    final class KeyIterator implements Iterator<K> {
+        private final Iterator<TimelineHashMapEntry<K, V>> iter;
+
+        KeyIterator(long epoch) {
+            this.iter = snapshottableIterator(epoch);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public K next() {
+            TimelineHashMapEntry<K, V> next = iter.next();
+            return next.getKey();
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+    }
+
+    @Override
+    public Set<K> keySet() {
+        return keySet(Long.MAX_VALUE);
+    }
+
+    public Set<K> keySet(long epoch) {
+        return new KeySet(epoch);
+    }
+
+    final class Values extends AbstractCollection<V> {
+        private final long epoch;
+
+        Values(long epoch) {
+            this.epoch = epoch;
+        }
+
+        public final int size() {
+            return TimelineHashMap.this.size(epoch);
+        }
+
+        public final void clear() {
+            if (epoch != Long.MAX_VALUE) {
+                throw new RuntimeException("can't modify snapshot");
+            }
+            TimelineHashMap.this.clear();
+        }
+
+        public final Iterator<V> iterator() {
+            return new ValueIterator(epoch);
+        }
+
+        public final boolean contains(Object o) {
+            return TimelineHashMap.this.containsKey(o, epoch);
+        }
+    }
+
+    final class ValueIterator implements Iterator<V> {
+        private final Iterator<TimelineHashMapEntry<K, V>> iter;
+
+        ValueIterator(long epoch) {
+            this.iter = snapshottableIterator(epoch);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public V next() {
+            TimelineHashMapEntry<K, V> next = iter.next();
+            return next.getValue();
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+    }
+
+    @Override
+    public Collection<V> values() {
+        return values(Long.MAX_VALUE);
+    }
+
+    public Collection<V> values(long epoch) {
+        return new Values(epoch);
+    }
+
+    final class EntrySet extends AbstractSet<Map.Entry<K, V>> {
+        private final long epoch;
+
+        EntrySet(long epoch) {
+            this.epoch = epoch;
+        }
+
+        public final int size() {
+            return TimelineHashMap.this.size(epoch);
+        }
+
+        public final void clear() {
+            if (epoch != Long.MAX_VALUE) {
+                throw new RuntimeException("can't modify snapshot");
+            }
+            TimelineHashMap.this.clear();
+        }
+
+        public final Iterator<Map.Entry<K, V>> iterator() {
+            return new EntryIterator(epoch);
+        }
+
+        public final boolean contains(Object o) {
+            return snapshottableGet(o, epoch) != null;
+        }
+
+        public final boolean remove(Object o) {
+            if (epoch != Long.MAX_VALUE) {
+                throw new RuntimeException("can't modify snapshot");
+            }
+            return snapshottableRemove(o) != null;
+        }
+    }
+
+    final class EntryIterator implements Iterator<Map.Entry<K, V>> {
+        private final Iterator<TimelineHashMapEntry<K, V>> iter;
+
+        EntryIterator(long epoch) {
+            this.iter = snapshottableIterator(epoch);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public Map.Entry<K, V> next() {
+            return iter.next();
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+    }
+
+    @Override
+    public Set<Entry<K, V>> entrySet() {
+        return entrySet(Long.MAX_VALUE);
+    }
+
+    public Set<Entry<K, V>> entrySet(long epoch) {
+        return new EntrySet(epoch);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 0;
+        Iterator<Entry<K, V>> iter = entrySet().iterator();
+        while (iter.hasNext()) {
+            hash += iter.next().hashCode();
+        }
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this)
+            return true;
+        if (!(o instanceof Map))
+            return false;
+        Map<?, ?> m = (Map<?, ?>) o;
+        if (m.size() != size())
+            return false;
+        try {
+            Iterator<Entry<K, V>> iter = entrySet().iterator();
+            while (iter.hasNext()) {
+                Entry<K, V> entry = iter.next();
+                if (!get(entry.getKey()).equals(entry.getValue())) {

Review comment:
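       Should we use `m.get` here? As written, `get(entry.getKey())` looks the 
key up in this map, the same map the entry was just read from, so the 
comparison is effectively always true.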
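
       A minimal sketch of the comparison against the other map. 
`MapEqualsSketch.mapsEqual` is a hypothetical helper, not code from this PR. 
It assumes non-null values, which matches the class Javadoc, and it omits the 
exception handling that the surrounding `try` block presumably provides.

```java
import java.util.Map;

// Hypothetical helper illustrating the suggested fix; not code from the PR.
class MapEqualsSketch {
    // Returns true if both maps hold the same key-value pairs.
    // Values in 'self' are assumed to be non-null.
    static <K, V> boolean mapsEqual(Map<K, V> self, Map<?, ?> m) {
        if (m.size() != self.size()) {
            return false;
        }
        for (Map.Entry<K, V> entry : self.entrySet()) {
            // Look the key up in the *other* map. A lookup in 'self' would
            // compare each value with itself and could never detect a mismatch.
            if (!entry.getValue().equals(m.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }
}
```

If a key is missing from `m`, `m.get` returns null and `equals(null)` is 
false, so the size check plus this loop covers both differing values and 
differing key sets.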

##########
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##########
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *        Revertable       BaseHashTable
+ *              ↑              ↑
+ *           SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *               ↑             ↑
+ *   TimelineHashSet       TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support 
iterating
+ * over a snapshot even while changes are being made to the current state.  
See the
+ * Javadoc for the iterator for more information about how this is 
accomplished.
+ *
+ * All of these classes require external synchronization, and don't support 
null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends 
SnapshottableHashTable.ElementWithStartEpoch>
+        extends BaseHashTable<T> implements Revertable {
+    interface ElementWithStartEpoch {
+        void setStartEpoch(long startEpoch);
+        long startEpoch();
+    }
+
+    static class HashTier<T> {
+        private final int size;
+        private BaseHashTable<T> deltaTable;
+
+        HashTier(int size) {
+            this.size = size;
+        }
+    }
+
+    /**
+     * Iterate over the values that currently exist in the hash table.
+     *
+     * You can use this iterator even if you are making changes to the map.
+     * The changes may or may not be visible while you are iterating.
+     */
+    class CurrentIterator implements Iterator<T> {
+        private final Object[] topTier;
+        private final List<T> ready;
+        private int slot;
+        private T lastReturned;
+
+        CurrentIterator(Object[] topTier) {
+            this.topTier = topTier;
+            this.ready = new ArrayList<>();
+            this.slot = 0;
+            this.lastReturned = null;
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (ready.isEmpty()) {
+                if (slot == topTier.length) {
+                    return false;
+                }
+                BaseHashTable.unpackSlot(ready, topTier, slot);
+                slot++;
+            }
+            return true;
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            lastReturned = ready.remove(ready.size() - 1);
+            return lastReturned;
+        }
+
+        @Override
+        public void remove() {
+            if (lastReturned == null) {
+                throw new UnsupportedOperationException("remove");
+            }
+            snapshottableRemove(lastReturned);
+            lastReturned = null;
+        }
+    }
+
+    /**
+     * Iterate over the values that existed in the hash table during a 
specific snapshot.
+     *
+     * You can use this iterator even if you are making changes to the map.
+     * The snapshot is immutable and will always show up the same.
+     */
+    class HistoricalIterator implements Iterator<T> {
+        private final Object[] topTier;
+        private final Snapshot snapshot;
+        private final List<T> temp;
+        private final List<T> ready;
+        private int slot;
+
+        HistoricalIterator(Object[] topTier, Snapshot snapshot) {
+            this.topTier = topTier;
+            this.snapshot = snapshot;
+            this.temp = new ArrayList<>();
+            this.ready = new ArrayList<>();
+            this.slot = 0;
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (ready.isEmpty()) {
+                if (slot == topTier.length) {
+                    return false;
+                }
+                BaseHashTable.unpackSlot(temp, topTier, slot);
+                for (T object : temp) {
+                    if (object.startEpoch() <= snapshot.epoch()) {
+                        ready.add(object);
+                    }
+                }
+                temp.clear();
+
+                /*
+                 * As we iterate over the SnapshottableHashTable, elements may 
move from
+                 * the top tier into the snapshot tiers.  This would happen if 
something
+                 * were deleted in the top tier, for example, but still 
retained in the
+                 * snapshot.
+                 *
+                 * We don't want to return any elements twice, though.  
Therefore, we
+                 * iterate over the top tier and the snapshot tier at the
+                 * same time.  The key to understanding how this works is 
realizing that
+                 * both hash tables use the same hash function, but simply 
choose a
+                 * different number of significant bits based on their size.
+                 * So if the top tier has size 4 and the snapshot tier has 
size 2, we have
+                 * the following mapping:
+                 *
+                 * Elements that would be in slot 0 or 1 in the top tier can 
only be in
+                 * slot 0 in the snapshot tier.
+                 * Elements that would be in slot 2 or 3 in the top tier can 
only be in
+                 * slot 1 in the snapshot tier.
+                 *
+                 * Therefore, we can do something like this:
+                 * 1. check slot 0 in the top tier and slot 0 in the snapshot 
tier.
+                 * 2. check slot 1 in the top tier and slot 0 in the snapshot 
tier.
+                 * 3. check slot 2 in the top tier and slot 1 in the snapshot 
tier.
+                 * 4. check slot 3 in the top tier and slot 1 in the snapshot 
tier.
+                 *
+                 * If elements move from the top tier to the snapshot tier, 
then
+                 * we'll still find them and report them exactly once.
+                 *
+                 * Note that while I used 4 and 2 as example sizes here, the 
same pattern
+                 * holds for different powers of two.  The "snapshot slot" of 
an element
+                 * will be the top few bits of the top tier slot of that 
element.
+                 */
+                HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+                if (tier != null && tier.deltaTable != null) {
+                    BaseHashTable<T> deltaTable = tier.deltaTable;
+                    int shift = Integer.numberOfLeadingZeros(deltaTable.baseElements().length) -
+                        Integer.numberOfLeadingZeros(topTier.length);

Review comment:
       Do we require the elements array of the top tier to always be at least 
as long as that of the snapshot tier? If so, how do we enforce that?
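
       To make the concern concrete, here is a minimal sketch of the slot 
arithmetic. It assumes the snapshot slot is obtained by right-shifting the 
top-tier slot by `shift`; the line that actually uses `shift` is not visible 
in this hunk. `SlotShiftSketch` and its methods are hypothetical names, and 
both lengths are assumed to be powers of two.

```java
// Hypothetical sketch of the slot mapping described in the comment above.
class SlotShiftSketch {
    // Difference in significant bits between the two table lengths.
    // Positive when the top tier is longer than the delta table.
    static int shift(int deltaLength, int topLength) {
        return Integer.numberOfLeadingZeros(deltaLength)
            - Integer.numberOfLeadingZeros(topLength);
    }

    // Maps a top-tier slot to the snapshot-tier slot that can hold the same
    // elements, keeping only the top few bits of the top-tier slot.
    static int snapshotSlot(int topSlot, int deltaLength, int topLength) {
        int shift = shift(deltaLength, topLength);
        if (shift < 0) {
            // Guard added for illustration: the mapping only makes sense
            // when the top tier is at least as long as the delta table.
            throw new IllegalStateException(
                "delta table (" + deltaLength + ") is longer than top tier ("
                    + topLength + ")");
        }
        return topSlot >>> shift;
    }
}
```

With lengths 2 and 4 the shift is 1, which reproduces the 0,0,1,1 mapping 
from the code comment. If the delta table were ever longer, `shift` would be 
negative, and Java masks an int shift distance to its low five bits, so `>>>` 
would silently yield a wrong slot instead of failing.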

##########
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshotRegistry.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+/**
+ * A registry containing snapshots of timeline data structures.
+ * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
+ * Therefore, we use ArrayLists here rather than a data structure with higher 
overhead.
+ */
+public class SnapshotRegistry {
+    private final Logger log;
+
+    /**
+     * The current epoch.  All snapshot epochs are lower than this number.
+     */
+    private long curEpoch;
+
+    /**
+     * An ArrayList of snapshots, kept in sorted order.
+     */
+    private final ArrayList<Snapshot> snapshots;
+
+    public SnapshotRegistry(long startEpoch) {
+        this(new LogContext(), startEpoch);
+    }
+
+    public SnapshotRegistry(LogContext logContext, long startEpoch) {
+        this.log = logContext.logger(SnapshotRegistry.class);
+        this.curEpoch = startEpoch;
+        this.snapshots = new ArrayList<>(5);
+    }
+
+    /**
+     * Returns an iterator that moves through snapshots from the lowest to the 
highest epoch.
+     */
+    public Iterator<Snapshot> snapshots() {
+        return snapshots.iterator();
+    }
+
+    /**
+     * Gets the snapshot for a specific epoch.
+     */
+    public Snapshot get(long epoch) {
+        for (Snapshot snapshot : snapshots) {
+            if (snapshot.epoch() == epoch) {
+                return snapshot;
+            }
+        }
+        throw new RuntimeException("No snapshot for epoch " + epoch);
+    }
+
+    /**
+     * Creates a new snapshot at the given epoch.
+     *
+     * @param epoch             The epoch to create the snapshot at.  The 
current epoch
+     *                          will be advanced to one past this epoch.
+     */
+    public Snapshot createSnapshot(long epoch) {
+        if (epoch < curEpoch) {
+            throw new RuntimeException("Can't create a new snapshot at epoch " + epoch +
+                    " because the current epoch is " + curEpoch);
+        }
+        Snapshot snapshot = new Snapshot(epoch);
+        snapshots.add(snapshot);
+        curEpoch = epoch + 1;
+        log.debug("Creating snapshot {}", epoch);
+        return snapshot;
+    }
+
+    /**
+     * Deletes the snapshot with the given epoch.
+     *
+     * @param epoch             The epoch of the snapshot to delete.
+     */
+    public void deleteSnapshot(long epoch) {
+        Iterator<Snapshot> iter = snapshots.iterator();
+        while (iter.hasNext()) {
+            Snapshot snapshot = iter.next();
+            if (snapshot.epoch() == epoch) {
+                log.debug("Deleting snapshot {}", epoch);
+                iter.remove();
+                return;
+            }
+        }
+        throw new RuntimeException(String.format(
+            "No snapshot at epoch %d found. Snapshot epochs are %s.", epoch,
+                snapshots.stream().map(snapshot -> 
String.valueOf(snapshot.epoch())).
+                    collect(Collectors.joining(", "))));
+    }
+
+    /**
+     * Reverts the state of all data structures to the state at the given 
epoch.
+     *
+     * @param epoch             The epoch of the snapshot to revert to.
+     */
+    public void revertToSnapshot(long epoch) {
+        Snapshot target = null;
+        for (Iterator<Snapshot> iter = snapshots.iterator(); iter.hasNext(); ) {
+            Snapshot snapshot = iter.next();
+            if (target == null) {
+                if (snapshot.epoch() == epoch) {
+                    target = snapshot;
+                }
+            } else {
+                iter.remove();
+            }
+        }
+        log.info("Reverting to snapshot {}", epoch);
+        target.handleRevert();
+        curEpoch = epoch;
+    }
+
+    /**
+     * Returns the current epoch.
+     */
+    public long curEpoch() {
+        return curEpoch;
+    }
+
+    public void deleteSnapshotsUpTo(long offset) {

Review comment:
       Should `offset` be named something like `targetEpoch`?
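
       A sketch of how the method might read with the renamed parameter. The 
method body is not visible in this hunk, so everything here is an assumption: 
snapshots are reduced to a sorted list of epochs, and "up to" is taken to mean 
strictly below `targetEpoch`. `DeleteUpToSketch` is a hypothetical name.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch; the real method body is not shown in the diff.
class DeleteUpToSketch {
    // Removes every epoch strictly below targetEpoch. The exclusive bound is
    // an assumption. The list must be sorted in ascending order, which lets
    // the loop stop at the first epoch that is kept.
    static void deleteSnapshotsUpTo(List<Long> sortedEpochs, long targetEpoch) {
        for (Iterator<Long> iter = sortedEpochs.iterator(); iter.hasNext(); ) {
            if (iter.next() >= targetEpoch) {
                return;
            }
            iter.remove();
        }
    }
}
```

Naming the parameter after the unit it is compared against keeps the 
signature consistent with `deleteSnapshot(long epoch)` and 
`revertToSnapshot(long epoch)`.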

##########
File path: 
metadata/src/test/java/org/apache/kafka/timeline/timeline/SnapshottableHashTableTest.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+@Timeout(value = 40)
+public class SnapshottableHashTableTest {
+
+    /**
+     * The class of test elements.
+     *
+     * This class is intended to help test how the table handles distinct 
objects which
+     * are equal to each other.  Therefore, for the purpose of hashing and 
equality, we
+     * only check i here, and ignore j.
+     */
+    static class TestElement implements 
SnapshottableHashTable.ElementWithStartEpoch {
+        private final int i;
+        private final char j;
+        private long startEpoch = Long.MAX_VALUE;
+
+        TestElement(int i, char j) {
+            this.i = i;
+            this.j = j;
+        }
+
+        @Override
+        public void setStartEpoch(long startEpoch) {
+            this.startEpoch = startEpoch;
+        }
+
+        @Override
+        public long startEpoch() {
+            return startEpoch;
+        }
+
+        @Override
+        public int hashCode() {
+            return i;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof TestElement)) {
+                return false;
+            }
+            TestElement other = (TestElement) o;
+            return other.i == i;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("E_%d%c(%s)", i, j, 
System.identityHashCode(this));
+        }
+    }
+
+    private static final TestElement E_1A = new TestElement(1, 'A');
+    private static final TestElement E_1B = new TestElement(1, 'B');
+    private static final TestElement E_2A = new TestElement(2, 'A');
+    private static final TestElement E_3A = new TestElement(3, 'A');
+    private static final TestElement E_3B = new TestElement(3, 'B');
+
+    @Test
+    public void testEmptyTable() {
+        SnapshotRegistry registry = new SnapshotRegistry(0);
+        SnapshottableHashTable<TestElement> table =
+            new SnapshottableHashTable<>(registry, 1);
+        assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
+    }
+
+    @Test
+    public void testAddAndRemove() {
+        SnapshotRegistry registry = new SnapshotRegistry(0);
+        SnapshottableHashTable<TestElement> table =
+            new SnapshottableHashTable<>(registry, 1);
+        assertTrue(null == table.snapshottableAddOrReplace(E_1B));
+        assertEquals(1, table.snapshottableSize(Long.MAX_VALUE));
+        registry.createSnapshot(0);
+        assertTrue(E_1B == table.snapshottableAddOrReplace(E_1A));
+        assertTrue(E_1B == table.snapshottableGet(E_1A, 0));
+        assertTrue(E_1A == table.snapshottableGet(E_1A, Long.MAX_VALUE));
+        assertEquals(null, table.snapshottableAddOrReplace(E_2A));
+        assertEquals(null, table.snapshottableAddOrReplace(E_3A));
+        assertEquals(3, table.snapshottableSize(Long.MAX_VALUE));
+        assertEquals(1, table.snapshottableSize(0));
+        registry.createSnapshot(1);
+        assertEquals(E_1A, table.snapshottableRemove(E_1B));
+        assertEquals(E_2A, table.snapshottableRemove(E_2A));
+        assertEquals(E_3A, table.snapshottableRemove(E_3A));
+        assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
+        assertEquals(1, table.snapshottableSize(0));
+        assertEquals(3, table.snapshottableSize(1));
+        registry.deleteSnapshot(0);
+        assertEquals(assertThrows(RuntimeException.class, () ->
+                table.snapshottableSize(0)).getMessage(), "No snapshot for 
epoch 0");
+        registry.deleteSnapshot(1);
+        assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
+    }
+
+    @Test
+    public void testIterateOverSnapshot() {
+        SnapshotRegistry registry = new SnapshotRegistry(0);
+        SnapshottableHashTable<TestElement> table =
+            new SnapshottableHashTable<>(registry, 1);
+        assertTrue(table.snapshottableAddUnlessPresent(E_1B));
+        assertFalse(table.snapshottableAddUnlessPresent(E_1A));
+        assertTrue(table.snapshottableAddUnlessPresent(E_2A));
+        assertTrue(table.snapshottableAddUnlessPresent(E_3A));
+        registry.createSnapshot(0);
+        assertIteratorContains(table.snapshottableIterator(0), E_1B, E_2A, 
E_3A);
+        table.snapshottableRemove(E_1B);
+        assertIteratorContains(table.snapshottableIterator(0), E_1B, E_2A, 
E_3A);
+        table.snapshottableRemove(E_1A);
+        assertIteratorContains(table.snapshottableIterator(Long.MAX_VALUE), 
E_2A, E_3A);
+        table.snapshottableRemove(E_2A);
+        table.snapshottableRemove(E_3A);
+        assertIteratorContains(table.snapshottableIterator(0), E_1B, E_2A, 
E_3A);
+    }
+
+    @Test
+    public void testIterateOverSnapshotWhileExpandingTable() {
+        SnapshotRegistry registry = new SnapshotRegistry(0);
+        SnapshottableHashTable<TestElement> table =
+            new SnapshottableHashTable<>(registry, 1);
+        assertEquals(null, table.snapshottableAddOrReplace(E_1A));
+        registry.createSnapshot(0);
+        Iterator<TestElement> iter = table.snapshottableIterator(0);
+        assertTrue(table.snapshottableAddUnlessPresent(E_2A));
+        assertTrue(table.snapshottableAddUnlessPresent(E_3A));
+        assertIteratorContains(iter, E_1A);
+    }
+
+    @Test
+    public void testIterateOverSnapshotWhileDeletingAndReplacing() {
+        SnapshotRegistry registry = new SnapshotRegistry(0);
+        SnapshottableHashTable<TestElement> table =
+            new SnapshottableHashTable<>(registry, 1);
+        assertEquals(null, table.snapshottableAddOrReplace(E_1A));
+        assertEquals(null, table.snapshottableAddOrReplace(E_2A));
+        assertEquals(null, table.snapshottableAddOrReplace(E_3A));
+        assertEquals(E_1A, table.snapshottableRemove(E_1A));
+        assertEquals(null, table.snapshottableAddOrReplace(E_1B));
+        registry.createSnapshot(0);
+        Iterator<TestElement> iter = table.snapshottableIterator(0);
+        List<TestElement> iterElements = new ArrayList<>();
+        iterElements.add(iter.next());
+        assertEquals(E_2A, table.snapshottableRemove(E_2A));
+        assertEquals(E_3A, table.snapshottableAddOrReplace(E_3B));
+        iterElements.add(iter.next());
+        assertEquals(E_1B, table.snapshottableRemove(E_1B));
+        iterElements.add(iter.next());
+        assertFalse(iter.hasNext());
+        assertIteratorContains(iterElements.iterator(), E_1B, E_2A, E_3A);
+    }
+
+    @Test
+    public void testRevert() {
+        SnapshotRegistry registry = new SnapshotRegistry(0);
+        SnapshottableHashTable<TestElement> table =
+            new SnapshottableHashTable<>(registry, 1);
+        assertEquals(null, table.snapshottableAddOrReplace(E_1A));
+        assertEquals(null, table.snapshottableAddOrReplace(E_2A));
+        assertEquals(null, table.snapshottableAddOrReplace(E_3A));
+        registry.createSnapshot(0);
+        assertEquals(E_1A, table.snapshottableAddOrReplace(E_1B));
+        assertEquals(E_3A, table.snapshottableAddOrReplace(E_3B));
+        registry.createSnapshot(1);
+        assertEquals(3, table.snapshottableSize(Long.MAX_VALUE));
+        assertIteratorContains(table.snapshottableIterator(Long.MAX_VALUE), 
E_1B, E_2A, E_3B);
+        table.snapshottableRemove(E_1B);
+        table.snapshottableRemove(E_2A);
+        table.snapshottableRemove(E_3B);
+        assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
+        assertEquals(3, table.snapshottableSize(0));
+        assertEquals(3, table.snapshottableSize(1));
+        registry.revertToSnapshot(0);
+        assertIteratorContains(table.snapshottableIterator(Long.MAX_VALUE), 
E_1A, E_2A, E_3A);
+    }
+
+    /**
+     * Assert that the given iterator contains the given elements, in any 
order.
+     * We compare using reference equality here, rather than object equality.
+     */
+    private static void assertIteratorContains(Iterator<? extends Object> iter,

Review comment:
       This method tests for exact set equality, not just containment, so perhaps the 
method could be named more accurately.
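
       For illustration, a helper along these lines would make the exact-match 
contract explicit in its name. This is only a sketch: `IteratorAssertions` and 
`assertIteratorYieldsExactly` are hypothetical names, not taken from the PR, and the 
sketch assumes the expected elements are distinct references, compared by identity as 
the Javadoc on the existing helper describes.

```java
import java.util.IdentityHashMap;
import java.util.Iterator;

final class IteratorAssertions {
    /**
     * Assert that the iterator yields exactly the expected elements, in any order.
     * Elements are matched by reference (identity), not by equals().
     * Assumption: the expected elements are distinct references.
     */
    static void assertIteratorYieldsExactly(Iterator<?> iter, Object... expected) {
        IdentityHashMap<Object, Boolean> remaining = new IdentityHashMap<>();
        for (Object o : expected) {
            remaining.put(o, Boolean.TRUE);
        }
        while (iter.hasNext()) {
            Object o = iter.next();
            // remove() returns null if the element was never expected
            // or has already been seen once.
            if (remaining.remove(o) == null) {
                throw new AssertionError("Unexpected or repeated element: " + o);
            }
        }
        if (!remaining.isEmpty()) {
            throw new AssertionError("Missing elements: " + remaining.keySet());
        }
    }
}
```

       With this shape, an iterator that returns an extra element, a repeated element, 
or too few elements fails the assertion, which is what "exact" in the name promises.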

##########
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##########
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *        Revertable       BaseHashTable
+ *              ↑              ↑
+ *           SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *               ↑             ↑
+ *   TimelineHashSet       TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support 
iterating
+ * over a snapshot even while changes are being made to the current state.  
See the
+ * Javadoc for the iterator for more information about how this is 
accomplished.
+ *
+ * All of these classes require external synchronization, and don't support 
null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends 
SnapshottableHashTable.ElementWithStartEpoch>
+        extends BaseHashTable<T> implements Revertable {
+    interface ElementWithStartEpoch {
+        void setStartEpoch(long startEpoch);
+        long startEpoch();
+    }
+
+    static class HashTier<T> {
+        private final int size;
+        private BaseHashTable<T> deltaTable;
+
+        HashTier(int size) {
+            this.size = size;
+        }
+    }
+
+    /**
+     * Iterate over the values that currently exist in the hash table.
+     *
+     * You can use this iterator even if you are making changes to the map.
+     * The changes may or may not be visible while you are iterating.
+     */
+    class CurrentIterator implements Iterator<T> {
+        private final Object[] topTier;
+        private final List<T> ready;
+        private int slot;
+        private T lastReturned;
+
+        CurrentIterator(Object[] topTier) {
+            this.topTier = topTier;
+            this.ready = new ArrayList<>();
+            this.slot = 0;
+            this.lastReturned = null;
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (ready.isEmpty()) {
+                if (slot == topTier.length) {
+                    return false;
+                }
+                BaseHashTable.unpackSlot(ready, topTier, slot);
+                slot++;
+            }
+            return true;
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            lastReturned = ready.remove(ready.size() - 1);
+            return lastReturned;
+        }
+
+        @Override
+        public void remove() {
+            if (lastReturned == null) {
+                throw new UnsupportedOperationException("remove");
+            }
+            snapshottableRemove(lastReturned);
+            lastReturned = null;
+        }
+    }
+
+    /**
+     * Iterate over the values that existed in the hash table during a 
specific snapshot.
+     *
+     * You can use this iterator even if you are making changes to the map.
+     * The snapshot is immutable and will always show up the same.
+     */
+    class HistoricalIterator implements Iterator<T> {
+        private final Object[] topTier;
+        private final Snapshot snapshot;
+        private final List<T> temp;
+        private final List<T> ready;
+        private int slot;
+
+        HistoricalIterator(Object[] topTier, Snapshot snapshot) {
+            this.topTier = topTier;
+            this.snapshot = snapshot;
+            this.temp = new ArrayList<>();
+            this.ready = new ArrayList<>();
+            this.slot = 0;
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (ready.isEmpty()) {
+                if (slot == topTier.length) {
+                    return false;
+                }
+                BaseHashTable.unpackSlot(temp, topTier, slot);
+                for (T object : temp) {
+                    if (object.startEpoch() <= snapshot.epoch()) {
+                        ready.add(object);
+                    }
+                }
+                temp.clear();
+
+                /*
+                 * As we iterate over the SnapshottableHashTable, elements may 
move from
+                 * the top tier into the snapshot tiers.  This would happen if 
something
+                 * were deleted in the top tier, for example, but still 
retained in the
+                 * snapshot.
+                 *
+                 * We don't want to return any elements twice, though.  
Therefore, we
+                 * iterate over the top tier and the snapshot tier at the
+                 * same time.  The key to understanding how this works is 
realizing that
+                 * both hash tables use the same hash function, but simply 
choose a
+                 * different number of significant bits based on their size.
+                 * So if the top tier has size 4 and the snapshot tier has 
size 2, we have
+                 * the following mapping:
+                 *
+                 * Elements that would be in slot 0 or 1 in the top tier can 
only be in
+                 * slot 0 in the snapshot tier.
+                 * Elements that would be in slot 2 or 3 in the top tier can 
only be in
+                 * slot 1 in the snapshot tier.
+                 *
+                 * Therefore, we can do something like this:
+                 * 1. check slot 0 in the top tier and slot 0 in the snapshot 
tier.
+                 * 2. check slot 1 in the top tier and slot 0 in the snapshot 
tier.
+                 * 3. check slot 2 in the top tier and slot 1 in the snapshot 
tier.
+                 * 4. check slot 3 in the top tier and slot 1 in the snapshot 
tier.
+                 *
+                 * If elements move from the top tier to the snapshot tier, 
then
+                 * we'll still find them and report them exactly once.
+                 *
+                 * Note that while I used 4 and 2 as example sizes here, the 
same pattern
+                 * holds for different powers of two.  The "snapshot slot" of 
an element
+                 * will be the top few bits of the top tier slot of that 
element.
+                 */
+                HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+                if (tier != null && tier.deltaTable != null) {
+                    BaseHashTable<T> deltaTable = tier.deltaTable;
+                    int shift = 
Integer.numberOfLeadingZeros(deltaTable.baseElements().length) -
+                        Integer.numberOfLeadingZeros(topTier.length);
+                    int tierSlot = slot >>> shift;
+                    BaseHashTable.unpackSlot(temp, deltaTable.baseElements(), 
tierSlot);
+                    for (T object : temp) {
+                        if (BaseHashTable.findSlot(object, topTier.length) == 
slot) {
+                            ready.add(object);
+                        } else {
+                        }
+                    }
+                    temp.clear();
+                }
+                slot++;
+            }
+            return true;
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return ready.remove(ready.size() - 1);
+        }
+    }
+
+    private final SnapshotRegistry snapshotRegistry;
+
+    SnapshottableHashTable(SnapshotRegistry snapshotRegistry, int 
expectedSize) {
+        super(expectedSize);
+        this.snapshotRegistry = snapshotRegistry;
+    }
+
+    int snapshottableSize(long epoch) {
+        if (epoch == Long.MAX_VALUE) {
+            return baseSize();
+        } else {
+            Snapshot snapshot = snapshotRegistry.get(epoch);
+            HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+            if (tier == null) {
+                return baseSize();
+            } else {
+                return tier.size;
+            }
+        }
+    }
+
+    T snapshottableGet(Object key, long epoch) {
+        T result = baseGet(key);
+        if (result != null && result.startEpoch() <= epoch) {
+            return result;
+        }
+        if (epoch == Long.MAX_VALUE) {
+            return null;
+        }
+        Snapshot snapshot = snapshotRegistry.get(epoch);
+        HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+        if (tier == null || tier.deltaTable == null) {
+            return null;
+        }
+        result = tier.deltaTable.baseGet(key);
+        return result;
+    }
+
+    boolean snapshottableAddUnlessPresent(T object) {
+        T prev = baseGet(object);
+        if (prev != null) {
+            return false;
+        }
+        object.setStartEpoch(snapshotRegistry.curEpoch());
+        int prevSize = baseSize();
+        baseAddOrReplace(object);
+        updateTierData(prevSize);
+        return true;
+    }
+
+    T snapshottableAddOrReplace(T object) {
+        object.setStartEpoch(snapshotRegistry.curEpoch());
+        int prevSize = baseSize();
+        T prev = baseAddOrReplace(object);
+        if (prev == null) {
+            updateTierData(prevSize);
+        } else {
+            updateTierData(prev, prevSize);
+        }
+        return prev;
+    }
+
+    Object snapshottableRemove(Object object) {
+        T prev = baseRemove(object);
+        if (prev == null) {
+            return null;
+        } else {
+            updateTierData(prev, baseSize() + 1);
+            return prev;
+        }
+    }
+
+    private void updateTierData(int prevSize) {
+        Iterator<Snapshot> iter = snapshotRegistry.snapshots();
+        while (iter.hasNext()) {
+            Snapshot snapshot = iter.next();
+            HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+            if (tier == null) {
+                tier = new HashTier<>(prevSize);
+                snapshot.setData(SnapshottableHashTable.this, tier);
+            }
+        }
+    }
+
+    private void updateTierData(T prev, int prevSize) {
+        Iterator<Snapshot> iter = snapshotRegistry.snapshots();
+        while (iter.hasNext()) {
+            Snapshot snapshot = iter.next();
+            HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+            if (tier == null) {
+                tier = new HashTier<>(prevSize);
+                snapshot.setData(SnapshottableHashTable.this, tier);
+            }
+            if (prev.startEpoch() <= snapshot.epoch()) {
+                if (tier.deltaTable == null) {
+                    tier.deltaTable = new BaseHashTable<>(1);
+                }
+                tier.deltaTable.baseAddOrReplace(prev);
+            }
+        }
+    }
+
+    Iterator<T> snapshottableIterator(long epoch) {
+        if (epoch == Long.MAX_VALUE) {
+            return new CurrentIterator(baseElements());
+        } else {
+            return new HistoricalIterator(baseElements(), 
snapshotRegistry.get(epoch));
+        }
+    }
+
+    String snapshottableToDebugString() {
+        StringBuilder bld = new StringBuilder();
+        bld.append(String.format("SnapshottableHashTable{%n"));
+        bld.append("top tier: ");
+        bld.append(baseToDebugString());
+        bld.append(String.format(",%nsnapshot tiers: [%n"));
+        String prefix = "";
+        for (Iterator<Snapshot> iter = snapshotRegistry.snapshots(); 
iter.hasNext(); ) {
+            Snapshot snapshot = iter.next();
+            bld.append(prefix);
+            bld.append("epoch ").append(snapshot.epoch()).append(": ");
+            HashTier<T> tier = snapshot.data(this);
+            if (tier == null) {
+                bld.append("null");
+            } else {
+                bld.append("HashTier{");
+                bld.append("size=").append(tier.size);
+                bld.append(", deltaTable=");
+                if (tier.deltaTable == null) {
+                    bld.append("null");
+                } else {
+                    bld.append(tier.deltaTable.baseToDebugString());
+                }
+                bld.append("}");
+            }
+            bld.append(String.format("%n"));
+        }
+        bld.append(String.format("]}%n"));
+        return bld.toString();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void executeRevert(long targetEpoch, Object data) {
+        HashTier<T> tier = (HashTier<T>) data;
+        Iterator<T> iter = snapshottableIterator(Long.MAX_VALUE);
+        while (iter.hasNext()) {
+            T element = iter.next();
+            if (element.startEpoch() > targetEpoch) {
+                iter.remove();
+            }
+        }
+        BaseHashTable<T> deltaTable = tier.deltaTable;
+        if (deltaTable != null) {
+            List<T> out = new ArrayList<>();
+            for (int i = 0; i < deltaTable.baseElements().length; i++) {
+                BaseHashTable.unpackSlot(out, deltaTable.baseElements(), i);
+                for (T value : out) {
+                    baseAddOrReplace(value);

Review comment:
       Hmm, why do we need to add the snapshot data to the top tier? It seems 
that it's ok to just leave it in the snapshot.
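
       As a side note on the HistoricalIterator comment quoted in the hunk above, the 
mapping from a top-tier slot to a snapshot-tier slot can be checked in isolation. 
This is a minimal sketch that mirrors the shift computation in the quoted code; 
`SlotMappingSketch` and `tierSlot` are illustrative names, and it assumes both array 
lengths are powers of two with the delta table no larger than the top tier.

```java
final class SlotMappingSketch {
    /**
     * Map a slot index in the top tier to the slot index in the delta table
     * that can hold the same elements.
     * Assumption: topLength and deltaLength are powers of two and
     * deltaLength <= topLength, so the shift is non-negative.
     */
    static int tierSlot(int slot, int topLength, int deltaLength) {
        int shift = Integer.numberOfLeadingZeros(deltaLength)
            - Integer.numberOfLeadingZeros(topLength);
        // Keep only the top bits of the top-tier slot index.
        return slot >>> shift;
    }

    public static void main(String[] args) {
        // Top tier of length 4, snapshot tier of length 2, as in the comment.
        for (int slot = 0; slot < 4; slot++) {
            System.out.println("top slot " + slot
                + " -> snapshot slot " + tierSlot(slot, 4, 2));
        }
    }
}
```

       For lengths 4 and 2 this maps top-tier slots 0 and 1 to snapshot slot 0, and 
slots 2 and 3 to snapshot slot 1, matching the four steps listed in the comment.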

##########
File path: 
metadata/src/test/java/org/apache/kafka/timeline/timeline/SnapshottableHashTableTest.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+@Timeout(value = 40)
+public class SnapshottableHashTableTest {
+
+    /**
+     * The class of test elements.
+     *
+     * This class is intended to help test how the table handles distinct 
objects which
+     * are equal to each other.  Therefore, for the purpose of hashing and 
equality, we
+     * only check i here, and ignore j.
+     */
+    static class TestElement implements 
SnapshottableHashTable.ElementWithStartEpoch {
+        private final int i;
+        private final char j;
+        private long startEpoch = Long.MAX_VALUE;
+
+        TestElement(int i, char j) {
+            this.i = i;
+            this.j = j;
+        }
+
+        @Override
+        public void setStartEpoch(long startEpoch) {
+            this.startEpoch = startEpoch;
+        }
+
+        @Override
+        public long startEpoch() {
+            return startEpoch;
+        }
+
+        @Override
+        public int hashCode() {
+            return i;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof TestElement)) {
+                return false;
+            }
+            TestElement other = (TestElement) o;
+            return other.i == i;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("E_%d%c(%s)", i, j, 
System.identityHashCode(this));
+        }
+    }
+
+    private static final TestElement E_1A = new TestElement(1, 'A');
+    private static final TestElement E_1B = new TestElement(1, 'B');
+    private static final TestElement E_2A = new TestElement(2, 'A');
+    private static final TestElement E_3A = new TestElement(3, 'A');
+    private static final TestElement E_3B = new TestElement(3, 'B');
+
+    @Test
+    public void testEmptyTable() {
+        SnapshotRegistry registry = new SnapshotRegistry(0);
+        SnapshottableHashTable<TestElement> table =
+            new SnapshottableHashTable<>(registry, 1);
+        assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
+    }
+
+    @Test
+    public void testAddAndRemove() {
+        SnapshotRegistry registry = new SnapshotRegistry(0);
+        SnapshottableHashTable<TestElement> table =
+            new SnapshottableHashTable<>(registry, 1);
+        assertTrue(null == table.snapshottableAddOrReplace(E_1B));
+        assertEquals(1, table.snapshottableSize(Long.MAX_VALUE));
+        registry.createSnapshot(0);
+        assertTrue(E_1B == table.snapshottableAddOrReplace(E_1A));
+        assertTrue(E_1B == table.snapshottableGet(E_1A, 0));
+        assertTrue(E_1A == table.snapshottableGet(E_1A, Long.MAX_VALUE));
+        assertEquals(null, table.snapshottableAddOrReplace(E_2A));
+        assertEquals(null, table.snapshottableAddOrReplace(E_3A));
+        assertEquals(3, table.snapshottableSize(Long.MAX_VALUE));
+        assertEquals(1, table.snapshottableSize(0));
+        registry.createSnapshot(1);
+        assertEquals(E_1A, table.snapshottableRemove(E_1B));
+        assertEquals(E_2A, table.snapshottableRemove(E_2A));
+        assertEquals(E_3A, table.snapshottableRemove(E_3A));
+        assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
+        assertEquals(1, table.snapshottableSize(0));
+        assertEquals(3, table.snapshottableSize(1));
+        registry.deleteSnapshot(0);
+        assertEquals(assertThrows(RuntimeException.class, () ->
+                table.snapshottableSize(0)).getMessage(), "No snapshot for 
epoch 0");
+        registry.deleteSnapshot(1);
+        assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
+    }
+
+    @Test
+    public void testIterateOverSnapshot() {
+        SnapshotRegistry registry = new SnapshotRegistry(0);
+        SnapshottableHashTable<TestElement> table =
+            new SnapshottableHashTable<>(registry, 1);
+        assertTrue(table.snapshottableAddUnlessPresent(E_1B));
+        assertFalse(table.snapshottableAddUnlessPresent(E_1A));
+        assertTrue(table.snapshottableAddUnlessPresent(E_2A));
+        assertTrue(table.snapshottableAddUnlessPresent(E_3A));
+        registry.createSnapshot(0);
+        assertIteratorContains(table.snapshottableIterator(0), E_1B, E_2A, 
E_3A);
+        table.snapshottableRemove(E_1B);

Review comment:
       Should we assert the return value is not null?

##########
File path: 
metadata/src/test/java/org/apache/kafka/timeline/timeline/SnapshottableHashTableTest.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+@Timeout(value = 40)
+public class SnapshottableHashTableTest {
+
+    /**
+     * The class of test elements.
+     *
+     * This class is intended to help test how the table handles distinct 
objects which
+     * are equal to each other.  Therefore, for the purpose of hashing and 
equality, we
+     * only check i here, and ignore j.
+     */
+    static class TestElement implements 
SnapshottableHashTable.ElementWithStartEpoch {
+        private final int i;
+        private final char j;
+        private long startEpoch = Long.MAX_VALUE;
+
+        TestElement(int i, char j) {
+            this.i = i;
+            this.j = j;
+        }
+
+        @Override
+        public void setStartEpoch(long startEpoch) {
+            this.startEpoch = startEpoch;
+        }
+
+        @Override
+        public long startEpoch() {
+            return startEpoch;
+        }
+
+        @Override
+        public int hashCode() {
+            return i;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof TestElement)) {
+                return false;
+            }
+            TestElement other = (TestElement) o;
+            return other.i == i;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("E_%d%c(%s)", i, j, 
System.identityHashCode(this));
+        }
+    }
+
+    private static final TestElement E_1A = new TestElement(1, 'A');
+    private static final TestElement E_1B = new TestElement(1, 'B');
+    private static final TestElement E_2A = new TestElement(2, 'A');
+    private static final TestElement E_3A = new TestElement(3, 'A');
+    private static final TestElement E_3B = new TestElement(3, 'B');
+
+    @Test
+    public void testEmptyTable() {
+        SnapshotRegistry registry = new SnapshotRegistry(0);
+        SnapshottableHashTable<TestElement> table =
+            new SnapshottableHashTable<>(registry, 1);
+        assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
+    }
+
+    @Test
+    public void testAddAndRemove() {
+        SnapshotRegistry registry = new SnapshotRegistry(0);
+        SnapshottableHashTable<TestElement> table =
+            new SnapshottableHashTable<>(registry, 1);
+        assertTrue(null == table.snapshottableAddOrReplace(E_1B));
+        assertEquals(1, table.snapshottableSize(Long.MAX_VALUE));
+        registry.createSnapshot(0);
+        assertTrue(E_1B == table.snapshottableAddOrReplace(E_1A));
+        assertTrue(E_1B == table.snapshottableGet(E_1A, 0));
+        assertTrue(E_1A == table.snapshottableGet(E_1A, Long.MAX_VALUE));
+        assertEquals(null, table.snapshottableAddOrReplace(E_2A));
+        assertEquals(null, table.snapshottableAddOrReplace(E_3A));
+        assertEquals(3, table.snapshottableSize(Long.MAX_VALUE));
+        assertEquals(1, table.snapshottableSize(0));
+        registry.createSnapshot(1);
+        assertEquals(E_1A, table.snapshottableRemove(E_1B));
+        assertEquals(E_2A, table.snapshottableRemove(E_2A));
+        assertEquals(E_3A, table.snapshottableRemove(E_3A));
+        assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
+        assertEquals(1, table.snapshottableSize(0));
+        assertEquals(3, table.snapshottableSize(1));
+        registry.deleteSnapshot(0);
+        assertEquals(assertThrows(RuntimeException.class, () ->
+                table.snapshottableSize(0)).getMessage(), "No snapshot for epoch 0");
+        registry.deleteSnapshot(1);
+        assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
+    }
+
+    @Test
+    public void testIterateOverSnapshot() {
+        SnapshotRegistry registry = new SnapshotRegistry(0);
+        SnapshottableHashTable<TestElement> table =
+            new SnapshottableHashTable<>(registry, 1);
+        assertTrue(table.snapshottableAddUnlessPresent(E_1B));
+        assertFalse(table.snapshottableAddUnlessPresent(E_1A));
+        assertTrue(table.snapshottableAddUnlessPresent(E_2A));
+        assertTrue(table.snapshottableAddUnlessPresent(E_3A));
+        registry.createSnapshot(0);
+        assertIteratorContains(table.snapshottableIterator(0), E_1B, E_2A, E_3A);
+        table.snapshottableRemove(E_1B);
+        assertIteratorContains(table.snapshottableIterator(0), E_1B, E_2A, E_3A);
+        table.snapshottableRemove(E_1A);

Review comment:
       Should we assert the return value is null?
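
The second removal targets E_1A after the equal element E_1B (same `i`) has already been removed, so it should find nothing. The sketch below illustrates that expectation with stand-in classes; `StandInElement` and `StandInTable` are hypothetical names backed by a plain `HashMap`, not the PR's `TestElement` or `SnapshottableHashTable`.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the test's TestElement: equality and hashCode use only i.
final class StandInElement {
    final int i;
    final char j;

    StandInElement(int i, char j) {
        this.i = i;
        this.j = j;
    }

    @Override
    public int hashCode() {
        return i;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof StandInElement && ((StandInElement) o).i == i;
    }
}

// Hypothetical stand-in for the table under test; not the PR's class.
final class StandInTable {
    private final Map<StandInElement, StandInElement> elements = new HashMap<>();

    // Returns the element that was replaced, or null if none was present.
    StandInElement addOrReplace(StandInElement e) {
        return elements.put(e, e);
    }

    // Returns the removed element, or null if no equal element was present.
    StandInElement remove(StandInElement e) {
        return elements.remove(e);
    }
}
```

In the PR's test, the corresponding change would presumably be to wrap the call, along the lines of `assertNull(table.snapshottableRemove(E_1A));`, assuming `assertNull` is imported there.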

##########
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##########
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy
+ * references into multiple snapshot tiers sometimes when altering the current state.
+ * In general, we don't expect there to be many snapshots at any given point in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but since Java
+ * doesn't have value types, subclassing is the only way to avoid another pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object itself.
+ * We access it by looking up our object reference in the Snapshot's IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the element was
+ * overwritten or removed from a later tier.  If there are no changes between then and
+ * now, there is no data at all stored for the tier.  We don't even store a hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *        Revertable       BaseHashTable
+ *              ↑              ↑
+ *           SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *               ↑             ↑
+ *   TimelineHashSet       TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The interface is
+ * pretty bare-bones since this class is not intended to be used directly by end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support iterating
+ * over a snapshot even while changes are being made to the current state.  See the
+ * Javadoc for the iterator for more information about how this is accomplished.
+ *
+ * All of these classes require external synchronization, and don't support null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch>
+        extends BaseHashTable<T> implements Revertable {
+    interface ElementWithStartEpoch {
+        void setStartEpoch(long startEpoch);
+        long startEpoch();
+    }
+
+    static class HashTier<T> {
+        private final int size;
+        private BaseHashTable<T> deltaTable;
+
+        HashTier(int size) {
+            this.size = size;
+        }
+    }
+
+    /**
+     * Iterate over the values that currently exist in the hash table.
+     *
+     * You can use this iterator even if you are making changes to the map.
+     * The changes may or may not be visible while you are iterating.
+     */
+    class CurrentIterator implements Iterator<T> {
+        private final Object[] topTier;
+        private final List<T> ready;
+        private int slot;
+        private T lastReturned;
+
+        CurrentIterator(Object[] topTier) {
+            this.topTier = topTier;
+            this.ready = new ArrayList<>();
+            this.slot = 0;
+            this.lastReturned = null;
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (ready.isEmpty()) {
+                if (slot == topTier.length) {
+                    return false;
+                }
+                BaseHashTable.unpackSlot(ready, topTier, slot);
+                slot++;
+            }
+            return true;
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            lastReturned = ready.remove(ready.size() - 1);
+            return lastReturned;
+        }
+
+        @Override
+        public void remove() {
+            if (lastReturned == null) {
+                throw new UnsupportedOperationException("remove");
+            }
+            snapshottableRemove(lastReturned);
+            lastReturned = null;
+        }
+    }
+
+    /**
+     * Iterate over the values that existed in the hash table during a specific snapshot.
+     *
+     * You can use this iterator even if you are making changes to the map.
+     * The snapshot is immutable and will always show up the same.
+     */
+    class HistoricalIterator implements Iterator<T> {
+        private final Object[] topTier;
+        private final Snapshot snapshot;
+        private final List<T> temp;
+        private final List<T> ready;
+        private int slot;
+
+        HistoricalIterator(Object[] topTier, Snapshot snapshot) {
+            this.topTier = topTier;
+            this.snapshot = snapshot;
+            this.temp = new ArrayList<>();
+            this.ready = new ArrayList<>();
+            this.slot = 0;
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (ready.isEmpty()) {
+                if (slot == topTier.length) {
+                    return false;
+                }
+                BaseHashTable.unpackSlot(temp, topTier, slot);
+                for (T object : temp) {
+                    if (object.startEpoch() <= snapshot.epoch()) {
+                        ready.add(object);
+                    }
+                }
+                temp.clear();
+
+                /*
+                 * As we iterate over the SnapshottableHashTable, elements may move from
+                 * the top tier into the snapshot tiers.  This would happen if something
+                 * were deleted in the top tier, for example, but still retained in the
+                 * snapshot.
+                 *
+                 * We don't want to return any elements twice, though.  Therefore, we
+                 * iterate over the top tier and the snapshot tier at the
+                 * same time.  The key to understanding how this works is realizing that
+                 * both hash tables use the same hash function, but simply choose a
+                 * different number of significant bits based on their size.
+                 * So if the top tier has size 4 and the snapshot tier has size 2, we have
+                 * the following mapping:
+                 *
+                 * Elements that would be in slot 0 or 1 in the top tier can only be in
+                 * slot 0 in the snapshot tier.
+                 * Elements that would be in slot 2 or 3 in the top tier can only be in
+                 * slot 1 in the snapshot tier.
+                 *
+                 * Therefore, we can do something like this:
+                 * 1. check slot 0 in the top tier and slot 0 in the snapshot tier.
+                 * 2. check slot 1 in the top tier and slot 0 in the snapshot tier.
+                 * 3. check slot 2 in the top tier and slot 1 in the snapshot tier.
+                 * 4. check slot 3 in the top tier and slot 1 in the snapshot tier.
+                 *
+                 * If elements move from the top tier to the snapshot tier, then
+                 * we'll still find them and report them exactly once.
+                 *
+                 * Note that while I used 4 and 2 as example sizes here, the same pattern
+                 * holds for different powers of two.  The "snapshot slot" of an element
+                 * will be the top few bits of the top tier slot of that element.
+                 */
+                HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+                if (tier != null && tier.deltaTable != null) {
+                    BaseHashTable<T> deltaTable = tier.deltaTable;
+                    int shift = Integer.numberOfLeadingZeros(deltaTable.baseElements().length) -
+                        Integer.numberOfLeadingZeros(topTier.length);
+                    int tierSlot = slot >>> shift;
+                    BaseHashTable.unpackSlot(temp, deltaTable.baseElements(), tierSlot);
+                    for (T object : temp) {
+                        if (BaseHashTable.findSlot(object, topTier.length) == slot) {
+                            ready.add(object);
+                        } else {
+                        }
+                    }
+                    temp.clear();
+                }
+                slot++;
+            }
+            return true;
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return ready.remove(ready.size() - 1);
+        }
+    }
+
+    private final SnapshotRegistry snapshotRegistry;
+
+    SnapshottableHashTable(SnapshotRegistry snapshotRegistry, int expectedSize) {
+        super(expectedSize);
+        this.snapshotRegistry = snapshotRegistry;
+    }
+
+    int snapshottableSize(long epoch) {
+        if (epoch == Long.MAX_VALUE) {
+            return baseSize();
+        } else {
+            Snapshot snapshot = snapshotRegistry.get(epoch);
+            HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+            if (tier == null) {
+                return baseSize();
+            } else {
+                return tier.size;
+            }
+        }
+    }
+
+    T snapshottableGet(Object key, long epoch) {
+        T result = baseGet(key);
+        if (result != null && result.startEpoch() <= epoch) {
+            return result;
+        }
+        if (epoch == Long.MAX_VALUE) {
+            return null;
+        }
+        Snapshot snapshot = snapshotRegistry.get(epoch);
+        HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+        if (tier == null || tier.deltaTable == null) {
+            return null;
+        }
+        result = tier.deltaTable.baseGet(key);
+        return result;
+    }
+
+    boolean snapshottableAddUnlessPresent(T object) {
+        T prev = baseGet(object);
+        if (prev != null) {
+            return false;
+        }
+        object.setStartEpoch(snapshotRegistry.curEpoch());
+        int prevSize = baseSize();
+        baseAddOrReplace(object);
+        updateTierData(prevSize);
+        return true;
+    }
+
+    T snapshottableAddOrReplace(T object) {
+        object.setStartEpoch(snapshotRegistry.curEpoch());
+        int prevSize = baseSize();
+        T prev = baseAddOrReplace(object);
+        if (prev == null) {
+            updateTierData(prevSize);
+        } else {
+            updateTierData(prev, prevSize);
+        }
+        return prev;
+    }
+
+    Object snapshottableRemove(Object object) {
+        T prev = baseRemove(object);
+        if (prev == null) {
+            return null;
+        } else {
+            updateTierData(prev, baseSize() + 1);
+            return prev;
+        }
+    }
+
+    private void updateTierData(int prevSize) {
+        Iterator<Snapshot> iter = snapshotRegistry.snapshots();
+        while (iter.hasNext()) {
+            Snapshot snapshot = iter.next();
+            HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+            if (tier == null) {
+                tier = new HashTier<>(prevSize);
+                snapshot.setData(SnapshottableHashTable.this, tier);
+            }
+        }
+    }
+
+    private void updateTierData(T prev, int prevSize) {
+        Iterator<Snapshot> iter = snapshotRegistry.snapshots();
+        while (iter.hasNext()) {
+            Snapshot snapshot = iter.next();
+            HashTier<T> tier = snapshot.data(SnapshottableHashTable.this);
+            if (tier == null) {
+                tier = new HashTier<>(prevSize);
+                snapshot.setData(SnapshottableHashTable.this, tier);
+            }
+            if (prev.startEpoch() <= snapshot.epoch()) {
+                if (tier.deltaTable == null) {
+                    tier.deltaTable = new BaseHashTable<>(1);
+                }
+                tier.deltaTable.baseAddOrReplace(prev);

Review comment:
       Exposing deltaTable seems to leak the encapsulation of HashTier a bit. 
Could we add a method like baseAddOrReplace() in HashTier which internally 
triggers the allocation of the BaseHashTable?
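
One possible shape for that suggestion is sketched below. It is only an illustration: `SimpleTable` is a hypothetical `HashMap`-backed stand-in for the PR's `BaseHashTable`, and `HashTierSketch`, `size()` and `hasDeltaTable()` are assumed names that do not appear in the PR. The point is that the tier allocates its delta table on the first write, so callers never touch the field directly.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for BaseHashTable, reduced to the two calls used here.
class SimpleTable<T> {
    private final Map<T, T> elements = new HashMap<>();

    // Returns the element that was replaced, or null if none was present.
    T baseAddOrReplace(T object) {
        return elements.put(object, object);
    }

    // Returns the stored element equal to key, or null if there is none.
    T baseGet(Object key) {
        return elements.get(key);
    }
}

// Sketch of a tier that keeps its delta table private.
class HashTierSketch<T> {
    private final int size;
    private SimpleTable<T> deltaTable; // stays null until the first write

    HashTierSketch(int size) {
        this.size = size;
    }

    int size() {
        return size;
    }

    // Allocates the delta table on first use, then delegates to it.
    T baseAddOrReplace(T object) {
        if (deltaTable == null) {
            deltaTable = new SimpleTable<>();
        }
        return deltaTable.baseAddOrReplace(object);
    }

    // Reads never allocate; an untouched tier simply has no entries.
    T baseGet(Object key) {
        return deltaTable == null ? null : deltaTable.baseGet(key);
    }

    // Lets callers such as an iterator check whether any delta exists.
    boolean hasDeltaTable() {
        return deltaTable != null;
    }
}
```

With something like this, the null check and allocation in updateTierData could collapse into a single `tier.baseAddOrReplace(prev)` call. The iterator would still need some way to reach the delta table's slots, which this sketch does not cover.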




