junrao commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r564872645
########## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ########## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * SnapshottableHashTable implements a hash table that supports creating point-in-time + * snapshots. Each snapshot is immutable once it is created; the past cannot be changed. + * We handle divergences between the current state and historical state by copying a + * reference to elements that have been deleted or overwritten into the snapshot tiers + * in which they still exist. Each tier has its own hash table. + * + * In order to retrieve an object from epoch E, we only have to check two tiers: the + * current tier, and the tier associated with the snapshot from epoch E. This design + * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy + * references into multiple snapshot tiers sometimes when altering the current state. + * In general, we don't expect there to be many snapshots at any given point in time, + * though. 
We expect to use about 2 snapshots at most. + * + * The current tier's data is stored in the fields inherited from BaseHashTable. It + * would be conceptually simpler to have a separate BaseHashTable object, but since Java + * doesn't have value types, subclassing is the only way to avoid another pointer + * indirection and the associated extra memory cost. + * + * In contrast, the data for snapshot tiers is stored in the Snapshot object itself. + * We access it by looking up our object reference in the Snapshot's IdentityHashMap. + * This design ensures that we can remove snapshots in O(1) time, simply by deleting the + * Snapshot object from the SnapshotRegistry. + * + * As mentioned before, an element only exists in a snapshot tier if the element was + * overwritten or removed from a later tier. If there are no changes between then and + * now, there is no data at all stored for the tier. We don't even store a hash table + * object for a tier unless there is at least one change between then and now. + * + * The class hierarchy looks like this: + * + * Revertable BaseHashTable + * ↑ ↑ + * SnapshottableHashTable → SnapshotRegistry → Snapshot + * ↑ ↑ + * TimelineHashSet TimelineHashMap + * + * BaseHashTable is a simple hash table that uses separate chaining. The interface is + * pretty bare-bones since this class is not intended to be used directly by end-users. + * + * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over + * snapshots. This is the core of the snapshotted hash table code and handles the + * tiering. + * + * TimelineHashSet and TimelineHashMap are mostly wrappers around this + * SnapshottableHashTable class. They implement standard Java APIs for Set and Map, + * respectively. There's a fair amount of boilerplate for this, but it's necessary so + * that timeline data structures can be used while writing idiomatic Java code. 
+ * The accessor APIs have two versions -- one that looks at the current state, and one + * that looks at a historical snapshotted state. Mutation APIs only ever mutate thte Review comment: typo thte ########## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ########## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * SnapshottableHashTable implements a hash table that supports creating point-in-time + * snapshots. Each snapshot is immutable once it is created; the past cannot be changed. + * We handle divergences between the current state and historical state by copying a + * reference to elements that have been deleted or overwritten into the snapshot tiers + * in which they still exist. Each tier has its own hash table. + * + * In order to retrieve an object from epoch E, we only have to check two tiers: the + * current tier, and the tier associated with the snapshot from epoch E. 
This design + * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy + * references into multiple snapshot tiers sometimes when altering the current state. + * In general, we don't expect there to be many snapshots at any given point in time, + * though. We expect to use about 2 snapshots at most. + * + * The current tier's data is stored in the fields inherited from BaseHashTable. It + * would be conceptually simpler to have a separate BaseHashTable object, but since Java + * doesn't have value types, subclassing is the only way to avoid another pointer + * indirection and the associated extra memory cost. + * + * In contrast, the data for snapshot tiers is stored in the Snapshot object itself. + * We access it by looking up our object reference in the Snapshot's IdentityHashMap. + * This design ensures that we can remove snapshots in O(1) time, simply by deleting the + * Snapshot object from the SnapshotRegistry. + * + * As mentioned before, an element only exists in a snapshot tier if the element was + * overwritten or removed from a later tier. If there are no changes between then and + * now, there is no data at all stored for the tier. We don't even store a hash table + * object for a tier unless there is at least one change between then and now. + * + * The class hierarchy looks like this: + * + * Revertable BaseHashTable + * ↑ ↑ + * SnapshottableHashTable → SnapshotRegistry → Snapshot + * ↑ ↑ + * TimelineHashSet TimelineHashMap + * + * BaseHashTable is a simple hash table that uses separate chaining. The interface is + * pretty bare-bones since this class is not intended to be used directly by end-users. + * + * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over + * snapshots. This is the core of the snapshotted hash table code and handles the + * tiering. + * + * TimelineHashSet and TimelineHashMap are mostly wrappers around this + * SnapshottableHashTable class. 
They implement standard Java APIs for Set and Map, + * respectively. There's a fair amount of boilerplate for this, but it's necessary so + * that timeline data structures can be used while writing idiomatic Java code. + * The accessor APIs have two versions -- one that looks at the current state, and one + * that looks at a historical snapshotted state. Mutation APIs only ever mutate thte + * current state. + * + * One very important feature of SnapshottableHashTable is that we support iterating + * over a snapshot even while changes are being made to the current state. See the + * Javadoc for the iterator for more information about how this is accomplished. + * + * All of these classes require external synchronization, and don't support null keys or + * values. + */ +class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch> + extends BaseHashTable<T> implements Revertable { + interface ElementWithStartEpoch { + void setStartEpoch(long startEpoch); + long startEpoch(); + } + + static class HashTier<T> { + private final int size; + private BaseHashTable<T> deltaTable; + + HashTier(int size) { + this.size = size; + } + } + + /** + * Iterate over the values that currently exist in the hash table. + * + * You can use this iterator even if you are making changes to the map. + * The changes may or may not be visible while you are iterating. 
+ */ + class CurrentIterator implements Iterator<T> { + private final Object[] topTier; + private final List<T> ready; + private int slot; + private T lastReturned; + + CurrentIterator(Object[] topTier) { + this.topTier = topTier; + this.ready = new ArrayList<>(); + this.slot = 0; + this.lastReturned = null; + } + + @Override + public boolean hasNext() { + while (ready.isEmpty()) { + if (slot == topTier.length) { + return false; + } + BaseHashTable.unpackSlot(ready, topTier, slot); + slot++; + } + return true; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + lastReturned = ready.remove(ready.size() - 1); + return lastReturned; + } + + @Override + public void remove() { + if (lastReturned == null) { + throw new UnsupportedOperationException("remove"); + } + snapshottableRemove(lastReturned); + lastReturned = null; + } + } + + /** + * Iterate over the values that existed in the hash table during a specific snapshot. + * + * You can use this iterator even if you are making changes to the map. + * The snapshot is immutable and will always show up the same. + */ + class HistoricalIterator implements Iterator<T> { + private final Object[] topTier; + private final Snapshot snapshot; + private final List<T> temp; + private final List<T> ready; + private int slot; + + HistoricalIterator(Object[] topTier, Snapshot snapshot) { + this.topTier = topTier; + this.snapshot = snapshot; + this.temp = new ArrayList<>(); + this.ready = new ArrayList<>(); + this.slot = 0; + } + + @Override + public boolean hasNext() { + while (ready.isEmpty()) { + if (slot == topTier.length) { + return false; + } + BaseHashTable.unpackSlot(temp, topTier, slot); + for (T object : temp) { + if (object.startEpoch() <= snapshot.epoch()) { + ready.add(object); + } + } + temp.clear(); + + /* + * As we iterate over the SnapshottableHashTable, elements may move from + * the top tier into the snapshot tiers. 
This would happen if something + * were deleted in the top tier, for example, but still retained in the + * snapshot. + * + * We don't want to return any elements twice, though. Therefore, we + * iterate over the top tier and the snapshot tier at the + * same time. The key to understanding how this works is realizing that + * both hash tables use the same hash function, but simply choose a + * different number of significant bits based on their size. + * So if the top tier has size 4 and the snapshot tier has size 2, we have + * the following mapping: + * + * Elements that would be in slot 0 or 1 in the top tier can only be in + * slot 0 in the snapshot tier. + * Elements that would be in slot 2 or 3 in the top tier can only be in + * slot 1 in the snapshot tier. + * + * Therefore, we can do something like this: + * 1. check slot 0 in the top tier and slot 0 in the snapshot tier. + * 2. check slot 1 in the top tier and slot 0 in the snapshot tier. + * 3. check slot 2 in the top tier and slot 1 in the snapshot tier. + * 4. check slot 3 in the top tier and slot 1 in the snapshot tier. + * + * If elements move from the top tier to the snapshot tier, then + * we'll still find them and report them exactly once. + * + * Note that while I used 4 and 2 as example sizes here, the same pattern + * holds for different powers of two. The "snapshot slot" of an element + * will be the top few bits of the top tier slot of that element. 
+ */ + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier != null && tier.deltaTable != null) { + BaseHashTable<T> deltaTable = tier.deltaTable; + int shift = Integer.numberOfLeadingZeros(deltaTable.baseElements().length) - + Integer.numberOfLeadingZeros(topTier.length); + int tierSlot = slot >>> shift; + BaseHashTable.unpackSlot(temp, deltaTable.baseElements(), tierSlot); + for (T object : temp) { + if (BaseHashTable.findSlot(object, topTier.length) == slot) { + ready.add(object); + } else { + } + } + temp.clear(); + } + slot++; + } + return true; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return ready.remove(ready.size() - 1); + } + } + + private final SnapshotRegistry snapshotRegistry; + + SnapshottableHashTable(SnapshotRegistry snapshotRegistry, int expectedSize) { + super(expectedSize); + this.snapshotRegistry = snapshotRegistry; + } + + int snapshottableSize(long epoch) { + if (epoch == Long.MAX_VALUE) { + return baseSize(); + } else { + Snapshot snapshot = snapshotRegistry.get(epoch); + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier == null) { + return baseSize(); + } else { + return tier.size; + } + } + } + + T snapshottableGet(Object key, long epoch) { + T result = baseGet(key); + if (result != null && result.startEpoch() <= epoch) { + return result; + } + if (epoch == Long.MAX_VALUE) { Review comment: Instead of using Long.MAX_VALUE directly, perhaps we could use a special constant such as NO_EPOCH, or something like that, to make the intent clear? ########## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshotRegistry.java ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.stream.Collectors; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +/** + * A registry containing snapshots of timeline data structures. + * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time. + * Therefore, we use ArrayLists here rather than a data structure with higher overhead. + */ +public class SnapshotRegistry { + private final Logger log; + + /** + * The current epoch. All snapshot epochs are lower than this number. + */ + private long curEpoch; + + /** + * An ArrayList of snapshots, kept in sorted order. + */ + private final ArrayList<Snapshot> snapshots; + + public SnapshotRegistry(long startEpoch) { + this(new LogContext(), startEpoch); + } + + public SnapshotRegistry(LogContext logContext, long startEpoch) { + this.log = logContext.logger(SnapshotRegistry.class); + this.curEpoch = startEpoch; + this.snapshots = new ArrayList<>(5); + } + + /** + * Returns an iterator that moves through snapshots from the lowest to the highest epoch. + */ + public Iterator<Snapshot> snapshots() { + return snapshots.iterator(); + } + + /** + * Gets the snapshot for a specific epoch. 
+ */ + public Snapshot get(long epoch) { + for (Snapshot snapshot : snapshots) { + if (snapshot.epoch() == epoch) { + return snapshot; + } + } + throw new RuntimeException("No snapshot for epoch " + epoch); + } + + /** + * Creates a new snapshot at the given epoch. + * + * @param epoch The epoch to create the snapshot at. The current epoch + * will be advanced to one past this epoch. + */ + public Snapshot createSnapshot(long epoch) { + if (epoch < curEpoch) { + throw new RuntimeException("Can't create a new snapshot at epoch " + epoch + + " because the current epoch is " + curEpoch); + } + Snapshot snapshot = new Snapshot(epoch); + snapshots.add(snapshot); + curEpoch = epoch + 1; + log.debug("Creating snapshot {}", epoch); + return snapshot; + } + + /** + * Deletes the snapshot with the given epoch. + * + * @param epoch The epoch of the snapshot to delete. + */ + public void deleteSnapshot(long epoch) { + Iterator<Snapshot> iter = snapshots.iterator(); + while (iter.hasNext()) { + Snapshot snapshot = iter.next(); + if (snapshot.epoch() == epoch) { + log.debug("Deleting snapshot {}", epoch); + iter.remove(); + return; + } + } + throw new RuntimeException(String.format( + "No snapshot at epoch %d found. Snapshot epochs are %s.", epoch, + snapshots.stream().map(snapshot -> String.valueOf(snapshot.epoch())). + collect(Collectors.joining(", ")))); + } + + /** + * Reverts the state of all data structures to the state at the given epoch. + * + * @param epoch The epoch of the snapshot to revert to. + */ + public void revertToSnapshot(long epoch) { Review comment: Should this method also remove all snapshots with epoch > the input epoch? ########## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashSet.java ########## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; + +/** + * This is a hash set which can be snapshotted. + * + * See {@SnapshottableHashTable} for more details about the implementation. + * + * This class requires external synchronization. Null values are not supported. + * + * @param <T> The value type of the set. 
+ */ +public class TimelineHashSet<T> + extends SnapshottableHashTable<TimelineHashSet.TimelineHashSetEntry<T>> + implements Set<T> { + static class TimelineHashSetEntry<T> + implements SnapshottableHashTable.ElementWithStartEpoch { + private final T value; + private long startEpoch; + + TimelineHashSetEntry(T value) { + this.value = value; + this.startEpoch = Long.MAX_VALUE; + } + + public T getValue() { + return value; + } + + @Override + public void setStartEpoch(long startEpoch) { + this.startEpoch = startEpoch; + } + + @Override + public long startEpoch() { + return startEpoch; + } + + @SuppressWarnings("unchecked") + @Override + public boolean equals(Object o) { + if (!(o instanceof TimelineHashSetEntry)) return false; + TimelineHashSetEntry<T> other = (TimelineHashSetEntry<T>) o; + return value.equals(other.value); + } + + @Override + public int hashCode() { + return value.hashCode(); + } + } + + public TimelineHashSet(SnapshotRegistry snapshotRegistry, int expectedSize) { + super(snapshotRegistry, expectedSize); + } + + @Override + public int size() { + return size(Long.MAX_VALUE); + } + + public int size(long epoch) { + return snapshottableSize(epoch); + } + + @Override + public boolean isEmpty() { + return isEmpty(Long.MAX_VALUE); + } + + public boolean isEmpty(long epoch) { + return snapshottableSize(epoch) == 0; + } + + @Override + public boolean contains(Object key) { + return contains(key, Long.MAX_VALUE); + } + + public boolean contains(Object object, long epoch) { + return snapshottableGet(new TimelineHashSetEntry<>(object), epoch) != null; + } + + final class ValueIterator implements Iterator<T> { + private final Iterator<TimelineHashSetEntry<T>> iter; + + ValueIterator(long epoch) { + this.iter = snapshottableIterator(epoch); + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public T next() { + return iter.next().value; + } + + @Override + public void remove() { + iter.remove(); + } + } + + @Override + 
public Iterator<T> iterator() { + return iterator(Long.MAX_VALUE); + } + + public Iterator<T> iterator(long epoch) { + return new ValueIterator(epoch); + } + + @Override + public Object[] toArray() { + Object[] result = new Object[size()]; + Iterator<T> iter = iterator(); + int i = 0; + while (iter.hasNext()) { + result[i++] = iter.next(); + } + return result; + } + + @SuppressWarnings("unchecked") + @Override + public <R> R[] toArray(R[] a) { + int size = size(); + if (size <= a.length) { + Iterator<T> iter = iterator(); + int i = 0; + while (iter.hasNext()) { + a[i++] = (R) iter.next(); + } + while (i < a.length) { + a[i++] = null; + } + return a; + } else { + return (R[]) toArray(); + } + } + + @Override + public boolean add(T newValue) { + Objects.requireNonNull(newValue); + return snapshottableAddUnlessPresent(new TimelineHashSetEntry<>(newValue)); + } + + @Override + public boolean remove(Object value) { + return snapshottableRemove(new TimelineHashSetEntry<>(value)) != null; + } + + @Override + public boolean containsAll(Collection<?> collection) { + for (Object value : collection) { + if (!contains(value)) return false; + } + return true; + } + + @Override + public boolean addAll(Collection<? 
extends T> collection) { + boolean modified = false; + for (T value : collection) { + if (add(value)) { + modified = true; + } + } + return modified; + } + + @Override + public boolean retainAll(Collection<?> collection) { + Objects.requireNonNull(collection); + boolean modified = false; + Iterator<T> it = iterator(); + while (it.hasNext()) { + if (!collection.contains(it.next())) { + it.remove(); + modified = true; + } + } + return modified; + } + + @Override + public boolean removeAll(Collection<?> collection) { + Objects.requireNonNull(collection); + boolean modified = false; + Iterator<?> it = iterator(); + while (it.hasNext()) { + if (collection.contains(it.next())) { + it.remove(); + modified = true; + } + } + return modified; + } + + @Override + public void clear() { Review comment: Should we clear all snapshots in this set too? ########## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ########## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * SnapshottableHashTable implements a hash table that supports creating point-in-time + * snapshots. Each snapshot is immutable once it is created; the past cannot be changed. + * We handle divergences between the current state and historical state by copying a + * reference to elements that have been deleted or overwritten into the snapshot tiers + * in which they still exist. Each tier has its own hash table. + * + * In order to retrieve an object from epoch E, we only have to check two tiers: the + * current tier, and the tier associated with the snapshot from epoch E. This design + * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy + * references into multiple snapshot tiers sometimes when altering the current state. + * In general, we don't expect there to be many snapshots at any given point in time, + * though. We expect to use about 2 snapshots at most. + * + * The current tier's data is stored in the fields inherited from BaseHashTable. It + * would be conceptually simpler to have a separate BaseHashTable object, but since Java + * doesn't have value types, subclassing is the only way to avoid another pointer + * indirection and the associated extra memory cost. + * + * In contrast, the data for snapshot tiers is stored in the Snapshot object itself. + * We access it by looking up our object reference in the Snapshot's IdentityHashMap. + * This design ensures that we can remove snapshots in O(1) time, simply by deleting the + * Snapshot object from the SnapshotRegistry. + * + * As mentioned before, an element only exists in a snapshot tier if the element was + * overwritten or removed from a later tier. If there are no changes between then and + * now, there is no data at all stored for the tier. 
We don't even store a hash table + * object for a tier unless there is at least one change between then and now. + * + * The class hierarchy looks like this: + * + * Revertable BaseHashTable + * ↑ ↑ + * SnapshottableHashTable → SnapshotRegistry → Snapshot + * ↑ ↑ + * TimelineHashSet TimelineHashMap + * + * BaseHashTable is a simple hash table that uses separate chaining. The interface is + * pretty bare-bones since this class is not intended to be used directly by end-users. + * + * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over + * snapshots. This is the core of the snapshotted hash table code and handles the + * tiering. + * + * TimelineHashSet and TimelineHashMap are mostly wrappers around this + * SnapshottableHashTable class. They implement standard Java APIs for Set and Map, + * respectively. There's a fair amount of boilerplate for this, but it's necessary so + * that timeline data structures can be used while writing idiomatic Java code. + * The accessor APIs have two versions -- one that looks at the current state, and one + * that looks at a historical snapshotted state. Mutation APIs only ever mutate thte + * current state. + * + * One very important feature of SnapshottableHashTable is that we support iterating + * over a snapshot even while changes are being made to the current state. See the + * Javadoc for the iterator for more information about how this is accomplished. + * + * All of these classes require external synchronization, and don't support null keys or + * values. 
+ */ +class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch> + extends BaseHashTable<T> implements Revertable { + interface ElementWithStartEpoch { + void setStartEpoch(long startEpoch); + long startEpoch(); + } + + static class HashTier<T> { + private final int size; + private BaseHashTable<T> deltaTable; + + HashTier(int size) { + this.size = size; + } + } + + /** + * Iterate over the values that currently exist in the hash table. + * + * You can use this iterator even if you are making changes to the map. + * The changes may or may not be visible while you are iterating. + */ + class CurrentIterator implements Iterator<T> { + private final Object[] topTier; + private final List<T> ready; + private int slot; + private T lastReturned; + + CurrentIterator(Object[] topTier) { + this.topTier = topTier; + this.ready = new ArrayList<>(); + this.slot = 0; + this.lastReturned = null; + } + + @Override + public boolean hasNext() { + while (ready.isEmpty()) { + if (slot == topTier.length) { + return false; + } + BaseHashTable.unpackSlot(ready, topTier, slot); + slot++; + } + return true; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + lastReturned = ready.remove(ready.size() - 1); + return lastReturned; + } + + @Override + public void remove() { + if (lastReturned == null) { + throw new UnsupportedOperationException("remove"); + } + snapshottableRemove(lastReturned); + lastReturned = null; + } + } + + /** + * Iterate over the values that existed in the hash table during a specific snapshot. + * + * You can use this iterator even if you are making changes to the map. + * The snapshot is immutable and will always show up the same. 
+ */ + class HistoricalIterator implements Iterator<T> { + private final Object[] topTier; + private final Snapshot snapshot; + private final List<T> temp; + private final List<T> ready; + private int slot; + + HistoricalIterator(Object[] topTier, Snapshot snapshot) { + this.topTier = topTier; + this.snapshot = snapshot; + this.temp = new ArrayList<>(); + this.ready = new ArrayList<>(); + this.slot = 0; + } + + @Override + public boolean hasNext() { + while (ready.isEmpty()) { + if (slot == topTier.length) { + return false; + } + BaseHashTable.unpackSlot(temp, topTier, slot); + for (T object : temp) { + if (object.startEpoch() <= snapshot.epoch()) { + ready.add(object); + } + } + temp.clear(); + + /* + * As we iterate over the SnapshottableHashTable, elements may move from + * the top tier into the snapshot tiers. This would happen if something + * were deleted in the top tier, for example, but still retained in the + * snapshot. + * + * We don't want to return any elements twice, though. Therefore, we + * iterate over the top tier and the snapshot tier at the + * same time. The key to understanding how this works is realizing that + * both hash tables use the same hash function, but simply choose a + * different number of significant bits based on their size. + * So if the top tier has size 4 and the snapshot tier has size 2, we have + * the following mapping: + * + * Elements that would be in slot 0 or 1 in the top tier can only be in + * slot 0 in the snapshot tier. + * Elements that would be in slot 2 or 3 in the top tier can only be in + * slot 1 in the snapshot tier. + * + * Therefore, we can do something like this: + * 1. check slot 0 in the top tier and slot 0 in the snapshot tier. + * 2. check slot 1 in the top tier and slot 0 in the snapshot tier. + * 3. check slot 2 in the top tier and slot 1 in the snapshot tier. + * 4. check slot 3 in the top tier and slot 1 in the snapshot tier. 
+ * + * If elements move from the top tier to the snapshot tier, then + * we'll still find them and report them exactly once. + * + * Note that while I used 4 and 2 as example sizes here, the same pattern + * holds for different powers of two. The "snapshot slot" of an element + * will be the top few bits of the top tier slot of that element. + */ + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier != null && tier.deltaTable != null) { + BaseHashTable<T> deltaTable = tier.deltaTable; + int shift = Integer.numberOfLeadingZeros(deltaTable.baseElements().length) - + Integer.numberOfLeadingZeros(topTier.length); + int tierSlot = slot >>> shift; + BaseHashTable.unpackSlot(temp, deltaTable.baseElements(), tierSlot); + for (T object : temp) { + if (BaseHashTable.findSlot(object, topTier.length) == slot) { + ready.add(object); + } else { + } + } + temp.clear(); + } + slot++; + } + return true; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return ready.remove(ready.size() - 1); + } + } + + private final SnapshotRegistry snapshotRegistry; + + SnapshottableHashTable(SnapshotRegistry snapshotRegistry, int expectedSize) { + super(expectedSize); + this.snapshotRegistry = snapshotRegistry; + } + + int snapshottableSize(long epoch) { + if (epoch == Long.MAX_VALUE) { + return baseSize(); + } else { + Snapshot snapshot = snapshotRegistry.get(epoch); + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier == null) { + return baseSize(); + } else { + return tier.size; + } + } + } + + T snapshottableGet(Object key, long epoch) { + T result = baseGet(key); + if (result != null && result.startEpoch() <= epoch) { + return result; + } + if (epoch == Long.MAX_VALUE) { + return null; + } + Snapshot snapshot = snapshotRegistry.get(epoch); + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier == null || tier.deltaTable == null) { + return null; + } + result = 
tier.deltaTable.baseGet(key); + return result; + } + + boolean snapshottableAddUnlessPresent(T object) { + T prev = baseGet(object); + if (prev != null) { + return false; + } + object.setStartEpoch(snapshotRegistry.curEpoch()); + int prevSize = baseSize(); + baseAddOrReplace(object); + updateTierData(prevSize); + return true; + } + + T snapshottableAddOrReplace(T object) { + object.setStartEpoch(snapshotRegistry.curEpoch()); + int prevSize = baseSize(); + T prev = baseAddOrReplace(object); + if (prev == null) { + updateTierData(prevSize); + } else { + updateTierData(prev, prevSize); + } + return prev; + } + + Object snapshottableRemove(Object object) { + T prev = baseRemove(object); + if (prev == null) { + return null; + } else { + updateTierData(prev, baseSize() + 1); + return prev; + } + } + + private void updateTierData(int prevSize) { + Iterator<Snapshot> iter = snapshotRegistry.snapshots(); + while (iter.hasNext()) { + Snapshot snapshot = iter.next(); + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier == null) { + tier = new HashTier<>(prevSize); + snapshot.setData(SnapshottableHashTable.this, tier); Review comment: Do we need to create a new tier here or could we delay the allocation until it's truly needed (which already exists in `updateTierData(T prev, int prevSize)`)? ########## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/BaseHashTable.java ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; Review comment: The file path doesn't quite match the package name since it has 2 timeline in it. Other files have the same issue. ########## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ########## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * SnapshottableHashTable implements a hash table that supports creating point-in-time + * snapshots. Each snapshot is immutable once it is created; the past cannot be changed. 
+ * We handle divergences between the current state and historical state by copying a + * reference to elements that have been deleted or overwritten into the snapshot tiers + * in which they still exist. Each tier has its own hash table. + * + * In order to retrieve an object from epoch E, we only have to check two tiers: the + * current tier, and the tier associated with the snapshot from epoch E. This design + * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy + * references into multiple snapshot tiers sometimes when altering the current state. + * In general, we don't expect there to be many snapshots at any given point in time, + * though. We expect to use about 2 snapshots at most. + * + * The current tier's data is stored in the fields inherited from BaseHashTable. It + * would be conceptually simpler to have a separate BaseHashTable object, but since Java + * doesn't have value types, subclassing is the only way to avoid another pointer + * indirection and the associated extra memory cost. + * + * In contrast, the data for snapshot tiers is stored in the Snapshot object itself. + * We access it by looking up our object reference in the Snapshot's IdentityHashMap. + * This design ensures that we can remove snapshots in O(1) time, simply by deleting the + * Snapshot object from the SnapshotRegistry. + * + * As mentioned before, an element only exists in a snapshot tier if the element was + * overwritten or removed from a later tier. If there are no changes between then and + * now, there is no data at all stored for the tier. We don't even store a hash table + * object for a tier unless there is at least one change between then and now. + * + * The class hierarchy looks like this: + * + * Revertable BaseHashTable + * ↑ ↑ + * SnapshottableHashTable → SnapshotRegistry → Snapshot + * ↑ ↑ + * TimelineHashSet TimelineHashMap + * + * BaseHashTable is a simple hash table that uses separate chaining. 
The interface is + * pretty bare-bones since this class is not intended to be used directly by end-users. + * + * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over + * snapshots. This is the core of the snapshotted hash table code and handles the + * tiering. + * + * TimelineHashSet and TimelineHashMap are mostly wrappers around this + * SnapshottableHashTable class. They implement standard Java APIs for Set and Map, + * respectively. There's a fair amount of boilerplate for this, but it's necessary so + * that timeline data structures can be used while writing idiomatic Java code. + * The accessor APIs have two versions -- one that looks at the current state, and one + * that looks at a historical snapshotted state. Mutation APIs only ever mutate thte + * current state. + * + * One very important feature of SnapshottableHashTable is that we support iterating + * over a snapshot even while changes are being made to the current state. See the + * Javadoc for the iterator for more information about how this is accomplished. + * + * All of these classes require external synchronization, and don't support null keys or + * values. + */ +class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch> + extends BaseHashTable<T> implements Revertable { + interface ElementWithStartEpoch { + void setStartEpoch(long startEpoch); + long startEpoch(); + } + + static class HashTier<T> { + private final int size; + private BaseHashTable<T> deltaTable; + + HashTier(int size) { + this.size = size; + } + } + + /** + * Iterate over the values that currently exist in the hash table. + * + * You can use this iterator even if you are making changes to the map. + * The changes may or may not be visible while you are iterating. 
+ */ + class CurrentIterator implements Iterator<T> { + private final Object[] topTier; + private final List<T> ready; + private int slot; + private T lastReturned; + + CurrentIterator(Object[] topTier) { + this.topTier = topTier; + this.ready = new ArrayList<>(); + this.slot = 0; + this.lastReturned = null; + } + + @Override + public boolean hasNext() { + while (ready.isEmpty()) { + if (slot == topTier.length) { + return false; + } + BaseHashTable.unpackSlot(ready, topTier, slot); + slot++; + } + return true; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + lastReturned = ready.remove(ready.size() - 1); + return lastReturned; + } + + @Override + public void remove() { + if (lastReturned == null) { + throw new UnsupportedOperationException("remove"); + } + snapshottableRemove(lastReturned); + lastReturned = null; + } + } + + /** + * Iterate over the values that existed in the hash table during a specific snapshot. + * + * You can use this iterator even if you are making changes to the map. + * The snapshot is immutable and will always show up the same. + */ + class HistoricalIterator implements Iterator<T> { + private final Object[] topTier; + private final Snapshot snapshot; + private final List<T> temp; + private final List<T> ready; + private int slot; + + HistoricalIterator(Object[] topTier, Snapshot snapshot) { + this.topTier = topTier; + this.snapshot = snapshot; + this.temp = new ArrayList<>(); + this.ready = new ArrayList<>(); + this.slot = 0; + } + + @Override + public boolean hasNext() { + while (ready.isEmpty()) { + if (slot == topTier.length) { + return false; + } + BaseHashTable.unpackSlot(temp, topTier, slot); + for (T object : temp) { + if (object.startEpoch() <= snapshot.epoch()) { + ready.add(object); + } + } + temp.clear(); + + /* + * As we iterate over the SnapshottableHashTable, elements may move from + * the top tier into the snapshot tiers. 
This would happen if something + * were deleted in the top tier, for example, but still retained in the + * snapshot. + * + * We don't want to return any elements twice, though. Therefore, we + * iterate over the top tier and the snapshot tier at the + * same time. The key to understanding how this works is realizing that + * both hash tables use the same hash function, but simply choose a + * different number of significant bits based on their size. + * So if the top tier has size 4 and the snapshot tier has size 2, we have + * the following mapping: + * + * Elements that would be in slot 0 or 1 in the top tier can only be in + * slot 0 in the snapshot tier. + * Elements that would be in slot 2 or 3 in the top tier can only be in + * slot 1 in the snapshot tier. + * + * Therefore, we can do something like this: + * 1. check slot 0 in the top tier and slot 0 in the snapshot tier. + * 2. check slot 1 in the top tier and slot 0 in the snapshot tier. + * 3. check slot 2 in the top tier and slot 1 in the snapshot tier. + * 4. check slot 3 in the top tier and slot 1 in the snapshot tier. + * + * If elements move from the top tier to the snapshot tier, then + * we'll still find them and report them exactly once. + * + * Note that while I used 4 and 2 as example sizes here, the same pattern + * holds for different powers of two. The "snapshot slot" of an element + * will be the top few bits of the top tier slot of that element. 
+ */ + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier != null && tier.deltaTable != null) { + BaseHashTable<T> deltaTable = tier.deltaTable; + int shift = Integer.numberOfLeadingZeros(deltaTable.baseElements().length) - + Integer.numberOfLeadingZeros(topTier.length); + int tierSlot = slot >>> shift; + BaseHashTable.unpackSlot(temp, deltaTable.baseElements(), tierSlot); + for (T object : temp) { + if (BaseHashTable.findSlot(object, topTier.length) == slot) { + ready.add(object); + } else { + } + } + temp.clear(); + } + slot++; + } + return true; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return ready.remove(ready.size() - 1); + } + } + + private final SnapshotRegistry snapshotRegistry; + + SnapshottableHashTable(SnapshotRegistry snapshotRegistry, int expectedSize) { + super(expectedSize); + this.snapshotRegistry = snapshotRegistry; + } + + int snapshottableSize(long epoch) { + if (epoch == Long.MAX_VALUE) { + return baseSize(); + } else { + Snapshot snapshot = snapshotRegistry.get(epoch); + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier == null) { + return baseSize(); + } else { + return tier.size; + } + } + } + + T snapshottableGet(Object key, long epoch) { + T result = baseGet(key); + if (result != null && result.startEpoch() <= epoch) { + return result; + } + if (epoch == Long.MAX_VALUE) { + return null; + } + Snapshot snapshot = snapshotRegistry.get(epoch); + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier == null || tier.deltaTable == null) { + return null; + } + result = tier.deltaTable.baseGet(key); + return result; + } + + boolean snapshottableAddUnlessPresent(T object) { + T prev = baseGet(object); + if (prev != null) { + return false; + } + object.setStartEpoch(snapshotRegistry.curEpoch()); + int prevSize = baseSize(); + baseAddOrReplace(object); + updateTierData(prevSize); Review comment: `baseAddOrReplace()` already 
calls `updateTierData()`. Do we need to call it again here? ########## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java ########## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.AbstractCollection; +import java.util.AbstractSet; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * This is a hash map which can be snapshotted. + * + * See {@SnapshottableHashTable} for more details about the implementation. + * + * This class requires external synchronization. Null keys and values are not supported. + * + * @param <K> The key type of the set. + * @param <V> The value type of the set. 
+ */ +public class TimelineHashMap<K, V> + extends SnapshottableHashTable<TimelineHashMap.TimelineHashMapEntry<K, V>> + implements Map<K, V> { + static class TimelineHashMapEntry<K, V> + implements SnapshottableHashTable.ElementWithStartEpoch, Map.Entry<K, V> { + private final K key; + private final V value; + private long startEpoch; + + TimelineHashMapEntry(K key, V value) { + this.key = key; + this.value = value; + this.startEpoch = Long.MAX_VALUE; + } + + @Override + public K getKey() { + return key; + } + + @Override + public V getValue() { + return value; + } + + @Override + public V setValue(V value) { + // This would be inefficient to support since we'd need a back-reference + // to the enclosing map in each Entry object. There would also be + // complications if this entry object was sourced from a historical iterator; + // we don't support modifying the past. Since we don't really need this API, + // let's just not support it. + throw new UnsupportedOperationException(); + } + + @Override + public void setStartEpoch(long startEpoch) { + this.startEpoch = startEpoch; + } + + @Override + public long startEpoch() { + return startEpoch; + } + + @SuppressWarnings("unchecked") + @Override + public boolean equals(Object o) { + if (!(o instanceof TimelineHashMapEntry)) return false; + TimelineHashMapEntry<K, V> other = (TimelineHashMapEntry<K, V>) o; + return key.equals(other.key); + } + + @Override + public int hashCode() { + return key.hashCode(); + } + } + + public TimelineHashMap(SnapshotRegistry snapshotRegistry, int expectedSize) { + super(snapshotRegistry, expectedSize); + } + + @Override + public int size() { + return size(Long.MAX_VALUE); + } + + public int size(long epoch) { + return snapshottableSize(epoch); + } + + @Override + public boolean isEmpty() { + return isEmpty(Long.MAX_VALUE); + } + + public boolean isEmpty(long epoch) { + return snapshottableSize(epoch) == 0; + } + + @Override + public boolean containsKey(Object key) { + return 
containsKey(key, Long.MAX_VALUE); + } + + public boolean containsKey(Object key, long epoch) { + return snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch) != null; + } + + @Override + public boolean containsValue(Object value) { + Iterator<Entry<K, V>> iter = entrySet().iterator(); + while (iter.hasNext()) { + Entry<K, V> e = iter.next(); + if (value.equals(e.getValue())) { + return true; + } + } + return false; + } + + @Override + public V get(Object key) { + return get(key, Long.MAX_VALUE); + } + + public V get(Object key, long epoch) { + Entry<K, V> entry = + snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch); + if (entry == null) { + return null; + } + return entry.getValue(); + } + + @Override + public V put(K key, V value) { + Objects.requireNonNull(key); + Objects.requireNonNull(value); + TimelineHashMapEntry<K, V> entry = new TimelineHashMapEntry<>(key, value); + TimelineHashMapEntry<K, V> prev = snapshottableAddOrReplace(entry); + if (prev == null) { + return null; + } + return prev.getValue(); + } + + @Override + @SuppressWarnings("unchecked") + public V remove(Object key) { + return (V) snapshottableRemove(new TimelineHashMapEntry<>(key, null)); + } + + @Override + public void putAll(Map<? extends K, ? extends V> map) { + for (Map.Entry<? extends K, ? extends V> e : map.entrySet()) { + put(e.getKey(), e.getValue()); + } + } + + @Override + public void clear() { Review comment: Should we clear all snapshots in this map too? ########## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshotRegistry.java ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.stream.Collectors; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +/** + * A registry containing snapshots of timeline data structures. + * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time. + * Therefore, we use ArrayLists here rather than a data structure with higher overhead. + */ +public class SnapshotRegistry { + private final Logger log; + + /** + * The current epoch. All snapshot epochs are lower than this number. + */ + private long curEpoch; + + /** + * An ArrayList of snapshots, kept in sorted order. + */ + private final ArrayList<Snapshot> snapshots; + + public SnapshotRegistry(long startEpoch) { + this(new LogContext(), startEpoch); + } + + public SnapshotRegistry(LogContext logContext, long startEpoch) { + this.log = logContext.logger(SnapshotRegistry.class); + this.curEpoch = startEpoch; + this.snapshots = new ArrayList<>(5); + } + + /** + * Returns an iterator that moves through snapshots from the lowest to the highest epoch. + */ + public Iterator<Snapshot> snapshots() { + return snapshots.iterator(); + } + + /** + * Gets the snapshot for a specific epoch. + */ + public Snapshot get(long epoch) { + for (Snapshot snapshot : snapshots) { + if (snapshot.epoch() == epoch) { + return snapshot; + } + } + throw new RuntimeException("No snapshot for epoch " + epoch); + } + + /** + * Creates a new snapshot at the given epoch. 
+ * + * @param epoch The epoch to create the snapshot at. The current epoch + * will be advanced to one past this epoch. + */ + public Snapshot createSnapshot(long epoch) { Review comment: It seems that we want the caller to provide a monotonically increasing epoch. Another model is not to have the caller provide an epoch and have each createSnapshot() create a new epoch and return it to the caller. Will that model work for our use case and be simpler? ########## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java ########## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.AbstractCollection; +import java.util.AbstractSet; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * This is a hash map which can be snapshotted. + * + * See {@SnapshottableHashTable} for more details about the implementation. + * + * This class requires external synchronization. Null keys and values are not supported. + * + * @param <K> The key type of the set. + * @param <V> The value type of the set. 
+ */ +public class TimelineHashMap<K, V> + extends SnapshottableHashTable<TimelineHashMap.TimelineHashMapEntry<K, V>> + implements Map<K, V> { + static class TimelineHashMapEntry<K, V> + implements SnapshottableHashTable.ElementWithStartEpoch, Map.Entry<K, V> { + private final K key; + private final V value; + private long startEpoch; + + TimelineHashMapEntry(K key, V value) { + this.key = key; + this.value = value; + this.startEpoch = Long.MAX_VALUE; + } + + @Override + public K getKey() { + return key; + } + + @Override + public V getValue() { + return value; + } + + @Override + public V setValue(V value) { + // This would be inefficient to support since we'd need a back-reference + // to the enclosing map in each Entry object. There would also be + // complications if this entry object was sourced from a historical iterator; + // we don't support modifying the past. Since we don't really need this API, + // let's just not support it. + throw new UnsupportedOperationException(); + } + + @Override + public void setStartEpoch(long startEpoch) { + this.startEpoch = startEpoch; + } + + @Override + public long startEpoch() { + return startEpoch; + } + + @SuppressWarnings("unchecked") + @Override + public boolean equals(Object o) { + if (!(o instanceof TimelineHashMapEntry)) return false; + TimelineHashMapEntry<K, V> other = (TimelineHashMapEntry<K, V>) o; + return key.equals(other.key); + } + + @Override + public int hashCode() { + return key.hashCode(); + } + } + + public TimelineHashMap(SnapshotRegistry snapshotRegistry, int expectedSize) { + super(snapshotRegistry, expectedSize); + } + + @Override + public int size() { + return size(Long.MAX_VALUE); + } + + public int size(long epoch) { + return snapshottableSize(epoch); + } + + @Override + public boolean isEmpty() { + return isEmpty(Long.MAX_VALUE); + } + + public boolean isEmpty(long epoch) { + return snapshottableSize(epoch) == 0; + } + + @Override + public boolean containsKey(Object key) { + return 
containsKey(key, Long.MAX_VALUE); + } + + public boolean containsKey(Object key, long epoch) { + return snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch) != null; + } + + @Override + public boolean containsValue(Object value) { + Iterator<Entry<K, V>> iter = entrySet().iterator(); + while (iter.hasNext()) { + Entry<K, V> e = iter.next(); + if (value.equals(e.getValue())) { + return true; + } + } + return false; + } + + @Override + public V get(Object key) { + return get(key, Long.MAX_VALUE); + } + + public V get(Object key, long epoch) { + Entry<K, V> entry = + snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch); + if (entry == null) { + return null; + } + return entry.getValue(); + } + + @Override + public V put(K key, V value) { + Objects.requireNonNull(key); + Objects.requireNonNull(value); + TimelineHashMapEntry<K, V> entry = new TimelineHashMapEntry<>(key, value); + TimelineHashMapEntry<K, V> prev = snapshottableAddOrReplace(entry); + if (prev == null) { + return null; + } + return prev.getValue(); + } + + @Override + @SuppressWarnings("unchecked") + public V remove(Object key) { + return (V) snapshottableRemove(new TimelineHashMapEntry<>(key, null)); + } + + @Override + public void putAll(Map<? extends K, ? extends V> map) { + for (Map.Entry<? extends K, ? 
extends V> e : map.entrySet()) { + put(e.getKey(), e.getValue()); + } + } + + @Override + public void clear() { + Iterator<TimelineHashMapEntry<K, V>> iter = snapshottableIterator(Long.MAX_VALUE); + while (iter.hasNext()) { + iter.next(); + iter.remove(); + } + } + + final class KeySet extends AbstractSet<K> { + private final long epoch; + + KeySet(long epoch) { + this.epoch = epoch; + } + + public final int size() { + return TimelineHashMap.this.size(epoch); + } + + public final void clear() { + if (epoch != Long.MAX_VALUE) { + throw new RuntimeException("can't modify snapshot"); + } + TimelineHashMap.this.clear(); + } + + public final Iterator<K> iterator() { + return new KeyIterator(epoch); + } + + public final boolean contains(Object o) { + return TimelineHashMap.this.containsKey(o, epoch); + } + + public final boolean remove(Object o) { + if (epoch != Long.MAX_VALUE) { + throw new RuntimeException("can't modify snapshot"); + } + return TimelineHashMap.this.remove(o) != null; + } + } + + final class KeyIterator implements Iterator<K> { + private final Iterator<TimelineHashMapEntry<K, V>> iter; + + KeyIterator(long epoch) { + this.iter = snapshottableIterator(epoch); + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public K next() { + TimelineHashMapEntry<K, V> next = iter.next(); + return next.getKey(); + } + + @Override + public void remove() { + iter.remove(); + } + } + + @Override + public Set<K> keySet() { + return keySet(Long.MAX_VALUE); + } + + public Set<K> keySet(long epoch) { + return new KeySet(epoch); + } + + final class Values extends AbstractCollection<V> { + private final long epoch; + + Values(long epoch) { + this.epoch = epoch; + } + + public final int size() { + return TimelineHashMap.this.size(epoch); + } + + public final void clear() { + if (epoch != Long.MAX_VALUE) { + throw new RuntimeException("can't modify snapshot"); + } + TimelineHashMap.this.clear(); + } + + public final Iterator<V> iterator() 
{ + return new ValueIterator(epoch); + } + + public final boolean contains(Object o) { + return TimelineHashMap.this.containsKey(o, epoch); + } + } + + final class ValueIterator implements Iterator<V> { + private final Iterator<TimelineHashMapEntry<K, V>> iter; + + ValueIterator(long epoch) { + this.iter = snapshottableIterator(epoch); + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public V next() { + TimelineHashMapEntry<K, V> next = iter.next(); + return next.getValue(); + } + + @Override + public void remove() { + iter.remove(); + } + } + + @Override + public Collection<V> values() { + return values(Long.MAX_VALUE); + } + + public Collection<V> values(long epoch) { + return new Values(epoch); + } + + final class EntrySet extends AbstractSet<Map.Entry<K, V>> { + private final long epoch; + + EntrySet(long epoch) { + this.epoch = epoch; + } + + public final int size() { + return TimelineHashMap.this.size(epoch); + } + + public final void clear() { + if (epoch != Long.MAX_VALUE) { + throw new RuntimeException("can't modify snapshot"); + } + TimelineHashMap.this.clear(); + } + + public final Iterator<Map.Entry<K, V>> iterator() { + return new EntryIterator(epoch); + } + + public final boolean contains(Object o) { + return snapshottableGet(o, epoch) != null; + } + + public final boolean remove(Object o) { + if (epoch != Long.MAX_VALUE) { + throw new RuntimeException("can't modify snapshot"); + } + return snapshottableRemove(o) != null; + } + } + + final class EntryIterator implements Iterator<Map.Entry<K, V>> { + private final Iterator<TimelineHashMapEntry<K, V>> iter; + + EntryIterator(long epoch) { + this.iter = snapshottableIterator(epoch); + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public Map.Entry<K, V> next() { + return iter.next(); + } + + @Override + public void remove() { + iter.remove(); + } + } + + @Override + public Set<Entry<K, V>> entrySet() { + return 
entrySet(Long.MAX_VALUE); + } + + public Set<Entry<K, V>> entrySet(long epoch) { + return new EntrySet(epoch); + } + + @Override + public int hashCode() { + int hash = 0; + Iterator<Entry<K, V>> iter = entrySet().iterator(); + while (iter.hasNext()) { + hash += iter.next().hashCode(); + } + return hash; + } + + @Override + public boolean equals(Object o) { + if (o == this) + return true; + if (!(o instanceof Map)) + return false; + Map<?, ?> m = (Map<?, ?>) o; + if (m.size() != size()) + return false; + try { + Iterator<Entry<K, V>> iter = entrySet().iterator(); + while (iter.hasNext()) { + Entry<K, V> entry = iter.next(); + if (!get(entry.getKey()).equals(entry.getValue())) { Review comment: Should we use `m.get` here? ########## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ########## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * SnapshottableHashTable implements a hash table that supports creating point-in-time + * snapshots. 
Each snapshot is immutable once it is created; the past cannot be changed. + * We handle divergences between the current state and historical state by copying a + * reference to elements that have been deleted or overwritten into the snapshot tiers + * in which they still exist. Each tier has its own hash table. + * + * In order to retrieve an object from epoch E, we only have to check two tiers: the + * current tier, and the tier associated with the snapshot from epoch E. This design + * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy + * references into multiple snapshot tiers sometimes when altering the current state. + * In general, we don't expect there to be many snapshots at any given point in time, + * though. We expect to use about 2 snapshots at most. + * + * The current tier's data is stored in the fields inherited from BaseHashTable. It + * would be conceptually simpler to have a separate BaseHashTable object, but since Java + * doesn't have value types, subclassing is the only way to avoid another pointer + * indirection and the associated extra memory cost. + * + * In contrast, the data for snapshot tiers is stored in the Snapshot object itself. + * We access it by looking up our object reference in the Snapshot's IdentityHashMap. + * This design ensures that we can remove snapshots in O(1) time, simply by deleting the + * Snapshot object from the SnapshotRegistry. + * + * As mentioned before, an element only exists in a snapshot tier if the element was + * overwritten or removed from a later tier. If there are no changes between then and + * now, there is no data at all stored for the tier. We don't even store a hash table + * object for a tier unless there is at least one change between then and now. 
+ * + * The class hierarchy looks like this: + * + * Revertable BaseHashTable + * ↑ ↑ + * SnapshottableHashTable → SnapshotRegistry → Snapshot + * ↑ ↑ + * TimelineHashSet TimelineHashMap + * + * BaseHashTable is a simple hash table that uses separate chaining. The interface is + * pretty bare-bones since this class is not intended to be used directly by end-users. + * + * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over + * snapshots. This is the core of the snapshotted hash table code and handles the + * tiering. + * + * TimelineHashSet and TimelineHashMap are mostly wrappers around this + * SnapshottableHashTable class. They implement standard Java APIs for Set and Map, + * respectively. There's a fair amount of boilerplate for this, but it's necessary so + * that timeline data structures can be used while writing idiomatic Java code. + * The accessor APIs have two versions -- one that looks at the current state, and one + * that looks at a historical snapshotted state. Mutation APIs only ever mutate the + * current state. + * + * One very important feature of SnapshottableHashTable is that we support iterating + * over a snapshot even while changes are being made to the current state. See the + * Javadoc for the iterator for more information about how this is accomplished. + * + * All of these classes require external synchronization, and don't support null keys or + * values. + */ +class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch> + extends BaseHashTable<T> implements Revertable { + interface ElementWithStartEpoch { + void setStartEpoch(long startEpoch); + long startEpoch(); + } + + static class HashTier<T> { + private final int size; + private BaseHashTable<T> deltaTable; + + HashTier(int size) { + this.size = size; + } + } + + /** + * Iterate over the values that currently exist in the hash table. + * + * You can use this iterator even if you are making changes to the map. 
+ * The changes may or may not be visible while you are iterating. + */ + class CurrentIterator implements Iterator<T> { + private final Object[] topTier; + private final List<T> ready; + private int slot; + private T lastReturned; + + CurrentIterator(Object[] topTier) { + this.topTier = topTier; + this.ready = new ArrayList<>(); + this.slot = 0; + this.lastReturned = null; + } + + @Override + public boolean hasNext() { + while (ready.isEmpty()) { + if (slot == topTier.length) { + return false; + } + BaseHashTable.unpackSlot(ready, topTier, slot); + slot++; + } + return true; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + lastReturned = ready.remove(ready.size() - 1); + return lastReturned; + } + + @Override + public void remove() { + if (lastReturned == null) { + throw new UnsupportedOperationException("remove"); + } + snapshottableRemove(lastReturned); + lastReturned = null; + } + } + + /** + * Iterate over the values that existed in the hash table during a specific snapshot. + * + * You can use this iterator even if you are making changes to the map. + * The snapshot is immutable and will always show up the same. 
+ */ + class HistoricalIterator implements Iterator<T> { + private final Object[] topTier; + private final Snapshot snapshot; + private final List<T> temp; + private final List<T> ready; + private int slot; + + HistoricalIterator(Object[] topTier, Snapshot snapshot) { + this.topTier = topTier; + this.snapshot = snapshot; + this.temp = new ArrayList<>(); + this.ready = new ArrayList<>(); + this.slot = 0; + } + + @Override + public boolean hasNext() { + while (ready.isEmpty()) { + if (slot == topTier.length) { + return false; + } + BaseHashTable.unpackSlot(temp, topTier, slot); + for (T object : temp) { + if (object.startEpoch() <= snapshot.epoch()) { + ready.add(object); + } + } + temp.clear(); + + /* + * As we iterate over the SnapshottableHashTable, elements may move from + * the top tier into the snapshot tiers. This would happen if something + * were deleted in the top tier, for example, but still retained in the + * snapshot. + * + * We don't want to return any elements twice, though. Therefore, we + * iterate over the top tier and the snapshot tier at the + * same time. The key to understanding how this works is realizing that + * both hash tables use the same hash function, but simply choose a + * different number of significant bits based on their size. + * So if the top tier has size 4 and the snapshot tier has size 2, we have + * the following mapping: + * + * Elements that would be in slot 0 or 1 in the top tier can only be in + * slot 0 in the snapshot tier. + * Elements that would be in slot 2 or 3 in the top tier can only be in + * slot 1 in the snapshot tier. + * + * Therefore, we can do something like this: + * 1. check slot 0 in the top tier and slot 0 in the snapshot tier. + * 2. check slot 1 in the top tier and slot 0 in the snapshot tier. + * 3. check slot 2 in the top tier and slot 1 in the snapshot tier. + * 4. check slot 3 in the top tier and slot 1 in the snapshot tier. 
+ * + * If elements move from the top tier to the snapshot tier, then + * we'll still find them and report them exactly once. + * + * Note that while I used 4 and 2 as example sizes here, the same pattern + * holds for different powers of two. The "snapshot slot" of an element + * will be the top few bits of the top tier slot of that element. + */ + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier != null && tier.deltaTable != null) { + BaseHashTable<T> deltaTable = tier.deltaTable; + int shift = Integer.numberOfLeadingZeros(deltaTable.baseElements().length) - + Integer.numberOfLeadingZeros(topTier.length); Review comment: Hmm, do we require the elements length in the top tier to be always >= that of the snapshot? If so, how do we enforce that? ########## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshotRegistry.java ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.stream.Collectors; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +/** + * A registry containing snapshots of timeline data structures. + * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time. + * Therefore, we use ArrayLists here rather than a data structure with higher overhead. + */ +public class SnapshotRegistry { + private final Logger log; + + /** + * The current epoch. All snapshot epochs are lower than this number. + */ + private long curEpoch; + + /** + * An ArrayList of snapshots, kept in sorted order. + */ + private final ArrayList<Snapshot> snapshots; + + public SnapshotRegistry(long startEpoch) { + this(new LogContext(), startEpoch); + } + + public SnapshotRegistry(LogContext logContext, long startEpoch) { + this.log = logContext.logger(SnapshotRegistry.class); + this.curEpoch = startEpoch; + this.snapshots = new ArrayList<>(5); + } + + /** + * Returns an iterator that moves through snapshots from the lowest to the highest epoch. + */ + public Iterator<Snapshot> snapshots() { + return snapshots.iterator(); + } + + /** + * Gets the snapshot for a specific epoch. + */ + public Snapshot get(long epoch) { + for (Snapshot snapshot : snapshots) { + if (snapshot.epoch() == epoch) { + return snapshot; + } + } + throw new RuntimeException("No snapshot for epoch " + epoch); + } + + /** + * Creates a new snapshot at the given epoch. + * + * @param epoch The epoch to create the snapshot at. The current epoch + * will be advanced to one past this epoch. 
+ */ + public Snapshot createSnapshot(long epoch) { + if (epoch < curEpoch) { + throw new RuntimeException("Can't create a new snapshot at epoch " + epoch + + " because the current epoch is " + curEpoch); + } + Snapshot snapshot = new Snapshot(epoch); + snapshots.add(snapshot); + curEpoch = epoch + 1; + log.debug("Creating snapshot {}", epoch); + return snapshot; + } + + /** + * Deletes the snapshot with the given epoch. + * + * @param epoch The epoch of the snapshot to delete. + */ + public void deleteSnapshot(long epoch) { + Iterator<Snapshot> iter = snapshots.iterator(); + while (iter.hasNext()) { + Snapshot snapshot = iter.next(); + if (snapshot.epoch() == epoch) { + log.debug("Deleting snapshot {}", epoch); + iter.remove(); + return; + } + } + throw new RuntimeException(String.format( + "No snapshot at epoch %d found. Snapshot epochs are %s.", epoch, + snapshots.stream().map(snapshot -> String.valueOf(snapshot.epoch())). + collect(Collectors.joining(", ")))); + } + + /** + * Reverts the state of all data structures to the state at the given epoch. + * + * @param epoch The epoch of the snapshot to revert to. + */ + public void revertToSnapshot(long epoch) { + Snapshot target = null; + for (Iterator<Snapshot> iter = snapshots.iterator(); iter.hasNext(); ) { + Snapshot snapshot = iter.next(); + if (target == null) { + if (snapshot.epoch() == epoch) { + target = snapshot; + } + } else { + iter.remove(); + } + } + log.info("Reverting to snapshot {}", epoch); + target.handleRevert(); + curEpoch = epoch; + } + + /** + * Returns the current epoch. + */ + public long curEpoch() { + return curEpoch; + } + + public void deleteSnapshotsUpTo(long offset) { Review comment: Should `offset` be named something like `targetEpoch`? ########## File path: metadata/src/test/java/org/apache/kafka/timeline/timeline/SnapshottableHashTableTest.java ########## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; + +@Timeout(value = 40) +public class SnapshottableHashTableTest { + + /** + * The class of test elements. + * + * This class is intended to help test how the table handles distinct objects which + * are equal to each other. Therefore, for the purpose of hashing and equality, we + * only check i here, and ignore j. 
+ */ + static class TestElement implements SnapshottableHashTable.ElementWithStartEpoch { + private final int i; + private final char j; + private long startEpoch = Long.MAX_VALUE; + + TestElement(int i, char j) { + this.i = i; + this.j = j; + } + + @Override + public void setStartEpoch(long startEpoch) { + this.startEpoch = startEpoch; + } + + @Override + public long startEpoch() { + return startEpoch; + } + + @Override + public int hashCode() { + return i; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof TestElement)) { + return false; + } + TestElement other = (TestElement) o; + return other.i == i; + } + + @Override + public String toString() { + return String.format("E_%d%c(%s)", i, j, System.identityHashCode(this)); + } + } + + private static final TestElement E_1A = new TestElement(1, 'A'); + private static final TestElement E_1B = new TestElement(1, 'B'); + private static final TestElement E_2A = new TestElement(2, 'A'); + private static final TestElement E_3A = new TestElement(3, 'A'); + private static final TestElement E_3B = new TestElement(3, 'B'); + + @Test + public void testEmptyTable() { + SnapshotRegistry registry = new SnapshotRegistry(0); + SnapshottableHashTable<TestElement> table = + new SnapshottableHashTable<>(registry, 1); + assertEquals(0, table.snapshottableSize(Long.MAX_VALUE)); + } + + @Test + public void testAddAndRemove() { + SnapshotRegistry registry = new SnapshotRegistry(0); + SnapshottableHashTable<TestElement> table = + new SnapshottableHashTable<>(registry, 1); + assertTrue(null == table.snapshottableAddOrReplace(E_1B)); + assertEquals(1, table.snapshottableSize(Long.MAX_VALUE)); + registry.createSnapshot(0); + assertTrue(E_1B == table.snapshottableAddOrReplace(E_1A)); + assertTrue(E_1B == table.snapshottableGet(E_1A, 0)); + assertTrue(E_1A == table.snapshottableGet(E_1A, Long.MAX_VALUE)); + assertEquals(null, table.snapshottableAddOrReplace(E_2A)); + assertEquals(null, 
table.snapshottableAddOrReplace(E_3A)); + assertEquals(3, table.snapshottableSize(Long.MAX_VALUE)); + assertEquals(1, table.snapshottableSize(0)); + registry.createSnapshot(1); + assertEquals(E_1A, table.snapshottableRemove(E_1B)); + assertEquals(E_2A, table.snapshottableRemove(E_2A)); + assertEquals(E_3A, table.snapshottableRemove(E_3A)); + assertEquals(0, table.snapshottableSize(Long.MAX_VALUE)); + assertEquals(1, table.snapshottableSize(0)); + assertEquals(3, table.snapshottableSize(1)); + registry.deleteSnapshot(0); + assertEquals(assertThrows(RuntimeException.class, () -> + table.snapshottableSize(0)).getMessage(), "No snapshot for epoch 0"); + registry.deleteSnapshot(1); + assertEquals(0, table.snapshottableSize(Long.MAX_VALUE)); + } + + @Test + public void testIterateOverSnapshot() { + SnapshotRegistry registry = new SnapshotRegistry(0); + SnapshottableHashTable<TestElement> table = + new SnapshottableHashTable<>(registry, 1); + assertTrue(table.snapshottableAddUnlessPresent(E_1B)); + assertFalse(table.snapshottableAddUnlessPresent(E_1A)); + assertTrue(table.snapshottableAddUnlessPresent(E_2A)); + assertTrue(table.snapshottableAddUnlessPresent(E_3A)); + registry.createSnapshot(0); + assertIteratorContains(table.snapshottableIterator(0), E_1B, E_2A, E_3A); + table.snapshottableRemove(E_1B); + assertIteratorContains(table.snapshottableIterator(0), E_1B, E_2A, E_3A); + table.snapshottableRemove(E_1A); + assertIteratorContains(table.snapshottableIterator(Long.MAX_VALUE), E_2A, E_3A); + table.snapshottableRemove(E_2A); + table.snapshottableRemove(E_3A); + assertIteratorContains(table.snapshottableIterator(0), E_1B, E_2A, E_3A); + } + + @Test + public void testIterateOverSnapshotWhileExpandingTable() { + SnapshotRegistry registry = new SnapshotRegistry(0); + SnapshottableHashTable<TestElement> table = + new SnapshottableHashTable<>(registry, 1); + assertEquals(null, table.snapshottableAddOrReplace(E_1A)); + registry.createSnapshot(0); + Iterator<TestElement> iter 
= table.snapshottableIterator(0); + assertTrue(table.snapshottableAddUnlessPresent(E_2A)); + assertTrue(table.snapshottableAddUnlessPresent(E_3A)); + assertIteratorContains(iter, E_1A); + } + + @Test + public void testIterateOverSnapshotWhileDeletingAndReplacing() { + SnapshotRegistry registry = new SnapshotRegistry(0); + SnapshottableHashTable<TestElement> table = + new SnapshottableHashTable<>(registry, 1); + assertEquals(null, table.snapshottableAddOrReplace(E_1A)); + assertEquals(null, table.snapshottableAddOrReplace(E_2A)); + assertEquals(null, table.snapshottableAddOrReplace(E_3A)); + assertEquals(E_1A, table.snapshottableRemove(E_1A)); + assertEquals(null, table.snapshottableAddOrReplace(E_1B)); + registry.createSnapshot(0); + Iterator<TestElement> iter = table.snapshottableIterator(0); + List<TestElement> iterElements = new ArrayList<>(); + iterElements.add(iter.next()); + assertEquals(E_2A, table.snapshottableRemove(E_2A)); + assertEquals(E_3A, table.snapshottableAddOrReplace(E_3B)); + iterElements.add(iter.next()); + assertEquals(E_1B, table.snapshottableRemove(E_1B)); + iterElements.add(iter.next()); + assertFalse(iter.hasNext()); + assertIteratorContains(iterElements.iterator(), E_1B, E_2A, E_3A); + } + + @Test + public void testRevert() { + SnapshotRegistry registry = new SnapshotRegistry(0); + SnapshottableHashTable<TestElement> table = + new SnapshottableHashTable<>(registry, 1); + assertEquals(null, table.snapshottableAddOrReplace(E_1A)); + assertEquals(null, table.snapshottableAddOrReplace(E_2A)); + assertEquals(null, table.snapshottableAddOrReplace(E_3A)); + registry.createSnapshot(0); + assertEquals(E_1A, table.snapshottableAddOrReplace(E_1B)); + assertEquals(E_3A, table.snapshottableAddOrReplace(E_3B)); + registry.createSnapshot(1); + assertEquals(3, table.snapshottableSize(Long.MAX_VALUE)); + assertIteratorContains(table.snapshottableIterator(Long.MAX_VALUE), E_1B, E_2A, E_3B); + table.snapshottableRemove(E_1B); + 
table.snapshottableRemove(E_2A); + table.snapshottableRemove(E_3B); + assertEquals(0, table.snapshottableSize(Long.MAX_VALUE)); + assertEquals(3, table.snapshottableSize(0)); + assertEquals(3, table.snapshottableSize(1)); + registry.revertToSnapshot(0); + assertIteratorContains(table.snapshottableIterator(Long.MAX_VALUE), E_1A, E_2A, E_3A); + } + + /** + * Assert that the given iterator contains the given elements, in any order. + * We compare using reference equality here, rather than object equality. + */ + private static void assertIteratorContains(Iterator<? extends Object> iter, Review comment: This method tests exact set equality, not just containment, so perhaps the method could be named more accurately. ########## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ########## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * SnapshottableHashTable implements a hash table that supports creating point-in-time + * snapshots. 
Each snapshot is immutable once it is created; the past cannot be changed. + * We handle divergences between the current state and historical state by copying a + * reference to elements that have been deleted or overwritten into the snapshot tiers + * in which they still exist. Each tier has its own hash table. + * + * In order to retrieve an object from epoch E, we only have to check two tiers: the + * current tier, and the tier associated with the snapshot from epoch E. This design + * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy + * references into multiple snapshot tiers sometimes when altering the current state. + * In general, we don't expect there to be many snapshots at any given point in time, + * though. We expect to use about 2 snapshots at most. + * + * The current tier's data is stored in the fields inherited from BaseHashTable. It + * would be conceptually simpler to have a separate BaseHashTable object, but since Java + * doesn't have value types, subclassing is the only way to avoid another pointer + * indirection and the associated extra memory cost. + * + * In contrast, the data for snapshot tiers is stored in the Snapshot object itself. + * We access it by looking up our object reference in the Snapshot's IdentityHashMap. + * This design ensures that we can remove snapshots in O(1) time, simply by deleting the + * Snapshot object from the SnapshotRegistry. + * + * As mentioned before, an element only exists in a snapshot tier if the element was + * overwritten or removed from a later tier. If there are no changes between then and + * now, there is no data at all stored for the tier. We don't even store a hash table + * object for a tier unless there is at least one change between then and now. 
+ * + * The class hierarchy looks like this: + * + * Revertable BaseHashTable + * ↑ ↑ + * SnapshottableHashTable → SnapshotRegistry → Snapshot + * ↑ ↑ + * TimelineHashSet TimelineHashMap + * + * BaseHashTable is a simple hash table that uses separate chaining. The interface is + * pretty bare-bones since this class is not intended to be used directly by end-users. + * + * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over + * snapshots. This is the core of the snapshotted hash table code and handles the + * tiering. + * + * TimelineHashSet and TimelineHashMap are mostly wrappers around this + * SnapshottableHashTable class. They implement standard Java APIs for Set and Map, + * respectively. There's a fair amount of boilerplate for this, but it's necessary so + * that timeline data structures can be used while writing idiomatic Java code. + * The accessor APIs have two versions -- one that looks at the current state, and one + * that looks at a historical snapshotted state. Mutation APIs only ever mutate the + * current state. + * + * One very important feature of SnapshottableHashTable is that we support iterating + * over a snapshot even while changes are being made to the current state. See the + * Javadoc for the iterator for more information about how this is accomplished. + * + * All of these classes require external synchronization, and don't support null keys or + * values. + */ +class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch> + extends BaseHashTable<T> implements Revertable { + interface ElementWithStartEpoch { + void setStartEpoch(long startEpoch); + long startEpoch(); + } + + static class HashTier<T> { + private final int size; + private BaseHashTable<T> deltaTable; + + HashTier(int size) { + this.size = size; + } + } + + /** + * Iterate over the values that currently exist in the hash table. + * + * You can use this iterator even if you are making changes to the map. 
+ * The changes may or may not be visible while you are iterating. + */ + class CurrentIterator implements Iterator<T> { + private final Object[] topTier; + private final List<T> ready; + private int slot; + private T lastReturned; + + CurrentIterator(Object[] topTier) { + this.topTier = topTier; + this.ready = new ArrayList<>(); + this.slot = 0; + this.lastReturned = null; + } + + @Override + public boolean hasNext() { + while (ready.isEmpty()) { + if (slot == topTier.length) { + return false; + } + BaseHashTable.unpackSlot(ready, topTier, slot); + slot++; + } + return true; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + lastReturned = ready.remove(ready.size() - 1); + return lastReturned; + } + + @Override + public void remove() { + if (lastReturned == null) { + throw new UnsupportedOperationException("remove"); + } + snapshottableRemove(lastReturned); + lastReturned = null; + } + } + + /** + * Iterate over the values that existed in the hash table during a specific snapshot. + * + * You can use this iterator even if you are making changes to the map. + * The snapshot is immutable and will always show up the same. 
+ */ + class HistoricalIterator implements Iterator<T> { + private final Object[] topTier; + private final Snapshot snapshot; + private final List<T> temp; + private final List<T> ready; + private int slot; + + HistoricalIterator(Object[] topTier, Snapshot snapshot) { + this.topTier = topTier; + this.snapshot = snapshot; + this.temp = new ArrayList<>(); + this.ready = new ArrayList<>(); + this.slot = 0; + } + + @Override + public boolean hasNext() { + while (ready.isEmpty()) { + if (slot == topTier.length) { + return false; + } + BaseHashTable.unpackSlot(temp, topTier, slot); + for (T object : temp) { + if (object.startEpoch() <= snapshot.epoch()) { + ready.add(object); + } + } + temp.clear(); + + /* + * As we iterate over the SnapshottableHashTable, elements may move from + * the top tier into the snapshot tiers. This would happen if something + * were deleted in the top tier, for example, but still retained in the + * snapshot. + * + * We don't want to return any elements twice, though. Therefore, we + * iterate over the top tier and the snapshot tier at the + * same time. The key to understanding how this works is realizing that + * both hash tables use the same hash function, but simply choose a + * different number of significant bits based on their size. + * So if the top tier has size 4 and the snapshot tier has size 2, we have + * the following mapping: + * + * Elements that would be in slot 0 or 1 in the top tier can only be in + * slot 0 in the snapshot tier. + * Elements that would be in slot 2 or 3 in the top tier can only be in + * slot 1 in the snapshot tier. + * + * Therefore, we can do something like this: + * 1. check slot 0 in the top tier and slot 0 in the snapshot tier. + * 2. check slot 1 in the top tier and slot 0 in the snapshot tier. + * 3. check slot 2 in the top tier and slot 1 in the snapshot tier. + * 4. check slot 3 in the top tier and slot 1 in the snapshot tier. 
+ * + * If elements move from the top tier to the snapshot tier, then + * we'll still find them and report them exactly once. + * + * Note that while I used 4 and 2 as example sizes here, the same pattern + * holds for different powers of two. The "snapshot slot" of an element + * will be the top few bits of the top tier slot of that element. + */ + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier != null && tier.deltaTable != null) { + BaseHashTable<T> deltaTable = tier.deltaTable; + int shift = Integer.numberOfLeadingZeros(deltaTable.baseElements().length) - + Integer.numberOfLeadingZeros(topTier.length); + int tierSlot = slot >>> shift; + BaseHashTable.unpackSlot(temp, deltaTable.baseElements(), tierSlot); + for (T object : temp) { + if (BaseHashTable.findSlot(object, topTier.length) == slot) { + ready.add(object); + } else { + } + } + temp.clear(); + } + slot++; + } + return true; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return ready.remove(ready.size() - 1); + } + } + + private final SnapshotRegistry snapshotRegistry; + + SnapshottableHashTable(SnapshotRegistry snapshotRegistry, int expectedSize) { + super(expectedSize); + this.snapshotRegistry = snapshotRegistry; + } + + int snapshottableSize(long epoch) { + if (epoch == Long.MAX_VALUE) { + return baseSize(); + } else { + Snapshot snapshot = snapshotRegistry.get(epoch); + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier == null) { + return baseSize(); + } else { + return tier.size; + } + } + } + + T snapshottableGet(Object key, long epoch) { + T result = baseGet(key); + if (result != null && result.startEpoch() <= epoch) { + return result; + } + if (epoch == Long.MAX_VALUE) { + return null; + } + Snapshot snapshot = snapshotRegistry.get(epoch); + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier == null || tier.deltaTable == null) { + return null; + } + result = 
tier.deltaTable.baseGet(key); + return result; + } + + boolean snapshottableAddUnlessPresent(T object) { + T prev = baseGet(object); + if (prev != null) { + return false; + } + object.setStartEpoch(snapshotRegistry.curEpoch()); + int prevSize = baseSize(); + baseAddOrReplace(object); + updateTierData(prevSize); + return true; + } + + T snapshottableAddOrReplace(T object) { + object.setStartEpoch(snapshotRegistry.curEpoch()); + int prevSize = baseSize(); + T prev = baseAddOrReplace(object); + if (prev == null) { + updateTierData(prevSize); + } else { + updateTierData(prev, prevSize); + } + return prev; + } + + Object snapshottableRemove(Object object) { + T prev = baseRemove(object); + if (prev == null) { + return null; + } else { + updateTierData(prev, baseSize() + 1); + return prev; + } + } + + private void updateTierData(int prevSize) { + Iterator<Snapshot> iter = snapshotRegistry.snapshots(); + while (iter.hasNext()) { + Snapshot snapshot = iter.next(); + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier == null) { + tier = new HashTier<>(prevSize); + snapshot.setData(SnapshottableHashTable.this, tier); + } + } + } + + private void updateTierData(T prev, int prevSize) { + Iterator<Snapshot> iter = snapshotRegistry.snapshots(); + while (iter.hasNext()) { + Snapshot snapshot = iter.next(); + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier == null) { + tier = new HashTier<>(prevSize); + snapshot.setData(SnapshottableHashTable.this, tier); + } + if (prev.startEpoch() <= snapshot.epoch()) { + if (tier.deltaTable == null) { + tier.deltaTable = new BaseHashTable<>(1); + } + tier.deltaTable.baseAddOrReplace(prev); + } + } + } + + Iterator<T> snapshottableIterator(long epoch) { + if (epoch == Long.MAX_VALUE) { + return new CurrentIterator(baseElements()); + } else { + return new HistoricalIterator(baseElements(), snapshotRegistry.get(epoch)); + } + } + + String snapshottableToDebugString() { + StringBuilder bld = new 
StringBuilder(); + bld.append(String.format("SnapshottableHashTable{%n")); + bld.append("top tier: "); + bld.append(baseToDebugString()); + bld.append(String.format(",%nsnapshot tiers: [%n")); + String prefix = ""; + for (Iterator<Snapshot> iter = snapshotRegistry.snapshots(); iter.hasNext(); ) { + Snapshot snapshot = iter.next(); + bld.append(prefix); + bld.append("epoch ").append(snapshot.epoch()).append(": "); + HashTier<T> tier = snapshot.data(this); + if (tier == null) { + bld.append("null"); + } else { + bld.append("HashTier{"); + bld.append("size=").append(tier.size); + bld.append(", deltaTable="); + if (tier.deltaTable == null) { + bld.append("null"); + } else { + bld.append(tier.deltaTable.baseToDebugString()); + } + bld.append("}"); + } + bld.append(String.format("%n")); + } + bld.append(String.format("]}%n")); + return bld.toString(); + } + + @SuppressWarnings("unchecked") + @Override + public void executeRevert(long targetEpoch, Object data) { + HashTier<T> tier = (HashTier<T>) data; + Iterator<T> iter = snapshottableIterator(Long.MAX_VALUE); + while (iter.hasNext()) { + T element = iter.next(); + if (element.startEpoch() > targetEpoch) { + iter.remove(); + } + } + BaseHashTable<T> deltaTable = tier.deltaTable; + if (deltaTable != null) { + List<T> out = new ArrayList<>(); + for (int i = 0; i < deltaTable.baseElements().length; i++) { + BaseHashTable.unpackSlot(out, deltaTable.baseElements(), i); + for (T value : out) { + baseAddOrReplace(value); Review comment: Hmm, why do we need to add the snapshot data to the top tier? It seems that it's ok to just leave it in the snapshot. ########## File path: metadata/src/test/java/org/apache/kafka/timeline/timeline/SnapshottableHashTableTest.java ########## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; + +@Timeout(value = 40) +public class SnapshottableHashTableTest { + + /** + * The class of test elements. + * + * This class is intended to help test how the table handles distinct objects which + * are equal to each other. Therefore, for the purpose of hashing and equality, we + * only check i here, and ignore j. 
+ */ + static class TestElement implements SnapshottableHashTable.ElementWithStartEpoch { + private final int i; + private final char j; + private long startEpoch = Long.MAX_VALUE; + + TestElement(int i, char j) { + this.i = i; + this.j = j; + } + + @Override + public void setStartEpoch(long startEpoch) { + this.startEpoch = startEpoch; + } + + @Override + public long startEpoch() { + return startEpoch; + } + + @Override + public int hashCode() { + return i; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof TestElement)) { + return false; + } + TestElement other = (TestElement) o; + return other.i == i; + } + + @Override + public String toString() { + return String.format("E_%d%c(%s)", i, j, System.identityHashCode(this)); + } + } + + private static final TestElement E_1A = new TestElement(1, 'A'); + private static final TestElement E_1B = new TestElement(1, 'B'); + private static final TestElement E_2A = new TestElement(2, 'A'); + private static final TestElement E_3A = new TestElement(3, 'A'); + private static final TestElement E_3B = new TestElement(3, 'B'); + + @Test + public void testEmptyTable() { + SnapshotRegistry registry = new SnapshotRegistry(0); + SnapshottableHashTable<TestElement> table = + new SnapshottableHashTable<>(registry, 1); + assertEquals(0, table.snapshottableSize(Long.MAX_VALUE)); + } + + @Test + public void testAddAndRemove() { + SnapshotRegistry registry = new SnapshotRegistry(0); + SnapshottableHashTable<TestElement> table = + new SnapshottableHashTable<>(registry, 1); + assertTrue(null == table.snapshottableAddOrReplace(E_1B)); + assertEquals(1, table.snapshottableSize(Long.MAX_VALUE)); + registry.createSnapshot(0); + assertTrue(E_1B == table.snapshottableAddOrReplace(E_1A)); + assertTrue(E_1B == table.snapshottableGet(E_1A, 0)); + assertTrue(E_1A == table.snapshottableGet(E_1A, Long.MAX_VALUE)); + assertEquals(null, table.snapshottableAddOrReplace(E_2A)); + assertEquals(null, 
table.snapshottableAddOrReplace(E_3A)); + assertEquals(3, table.snapshottableSize(Long.MAX_VALUE)); + assertEquals(1, table.snapshottableSize(0)); + registry.createSnapshot(1); + assertEquals(E_1A, table.snapshottableRemove(E_1B)); + assertEquals(E_2A, table.snapshottableRemove(E_2A)); + assertEquals(E_3A, table.snapshottableRemove(E_3A)); + assertEquals(0, table.snapshottableSize(Long.MAX_VALUE)); + assertEquals(1, table.snapshottableSize(0)); + assertEquals(3, table.snapshottableSize(1)); + registry.deleteSnapshot(0); + assertEquals(assertThrows(RuntimeException.class, () -> + table.snapshottableSize(0)).getMessage(), "No snapshot for epoch 0"); + registry.deleteSnapshot(1); + assertEquals(0, table.snapshottableSize(Long.MAX_VALUE)); + } + + @Test + public void testIterateOverSnapshot() { + SnapshotRegistry registry = new SnapshotRegistry(0); + SnapshottableHashTable<TestElement> table = + new SnapshottableHashTable<>(registry, 1); + assertTrue(table.snapshottableAddUnlessPresent(E_1B)); + assertFalse(table.snapshottableAddUnlessPresent(E_1A)); + assertTrue(table.snapshottableAddUnlessPresent(E_2A)); + assertTrue(table.snapshottableAddUnlessPresent(E_3A)); + registry.createSnapshot(0); + assertIteratorContains(table.snapshottableIterator(0), E_1B, E_2A, E_3A); + table.snapshottableRemove(E_1B); Review comment: Should we assert the return value is not null? ########## File path: metadata/src/test/java/org/apache/kafka/timeline/timeline/SnapshottableHashTableTest.java ########## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; + +@Timeout(value = 40) +public class SnapshottableHashTableTest { + + /** + * The class of test elements. + * + * This class is intended to help test how the table handles distinct objects which + * are equal to each other. Therefore, for the purpose of hashing and equality, we + * only check i here, and ignore j. 
+ */ + static class TestElement implements SnapshottableHashTable.ElementWithStartEpoch { + private final int i; + private final char j; + private long startEpoch = Long.MAX_VALUE; + + TestElement(int i, char j) { + this.i = i; + this.j = j; + } + + @Override + public void setStartEpoch(long startEpoch) { + this.startEpoch = startEpoch; + } + + @Override + public long startEpoch() { + return startEpoch; + } + + @Override + public int hashCode() { + return i; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof TestElement)) { + return false; + } + TestElement other = (TestElement) o; + return other.i == i; + } + + @Override + public String toString() { + return String.format("E_%d%c(%s)", i, j, System.identityHashCode(this)); + } + } + + private static final TestElement E_1A = new TestElement(1, 'A'); + private static final TestElement E_1B = new TestElement(1, 'B'); + private static final TestElement E_2A = new TestElement(2, 'A'); + private static final TestElement E_3A = new TestElement(3, 'A'); + private static final TestElement E_3B = new TestElement(3, 'B'); + + @Test + public void testEmptyTable() { + SnapshotRegistry registry = new SnapshotRegistry(0); + SnapshottableHashTable<TestElement> table = + new SnapshottableHashTable<>(registry, 1); + assertEquals(0, table.snapshottableSize(Long.MAX_VALUE)); + } + + @Test + public void testAddAndRemove() { + SnapshotRegistry registry = new SnapshotRegistry(0); + SnapshottableHashTable<TestElement> table = + new SnapshottableHashTable<>(registry, 1); + assertTrue(null == table.snapshottableAddOrReplace(E_1B)); + assertEquals(1, table.snapshottableSize(Long.MAX_VALUE)); + registry.createSnapshot(0); + assertTrue(E_1B == table.snapshottableAddOrReplace(E_1A)); + assertTrue(E_1B == table.snapshottableGet(E_1A, 0)); + assertTrue(E_1A == table.snapshottableGet(E_1A, Long.MAX_VALUE)); + assertEquals(null, table.snapshottableAddOrReplace(E_2A)); + assertEquals(null, 
table.snapshottableAddOrReplace(E_3A)); + assertEquals(3, table.snapshottableSize(Long.MAX_VALUE)); + assertEquals(1, table.snapshottableSize(0)); + registry.createSnapshot(1); + assertEquals(E_1A, table.snapshottableRemove(E_1B)); + assertEquals(E_2A, table.snapshottableRemove(E_2A)); + assertEquals(E_3A, table.snapshottableRemove(E_3A)); + assertEquals(0, table.snapshottableSize(Long.MAX_VALUE)); + assertEquals(1, table.snapshottableSize(0)); + assertEquals(3, table.snapshottableSize(1)); + registry.deleteSnapshot(0); + assertEquals(assertThrows(RuntimeException.class, () -> + table.snapshottableSize(0)).getMessage(), "No snapshot for epoch 0"); + registry.deleteSnapshot(1); + assertEquals(0, table.snapshottableSize(Long.MAX_VALUE)); + } + + @Test + public void testIterateOverSnapshot() { + SnapshotRegistry registry = new SnapshotRegistry(0); + SnapshottableHashTable<TestElement> table = + new SnapshottableHashTable<>(registry, 1); + assertTrue(table.snapshottableAddUnlessPresent(E_1B)); + assertFalse(table.snapshottableAddUnlessPresent(E_1A)); + assertTrue(table.snapshottableAddUnlessPresent(E_2A)); + assertTrue(table.snapshottableAddUnlessPresent(E_3A)); + registry.createSnapshot(0); + assertIteratorContains(table.snapshottableIterator(0), E_1B, E_2A, E_3A); + table.snapshottableRemove(E_1B); + assertIteratorContains(table.snapshottableIterator(0), E_1B, E_2A, E_3A); + table.snapshottableRemove(E_1A); Review comment: Should we assert the return value is null? ########## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ########## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * SnapshottableHashTable implements a hash table that supports creating point-in-time + * snapshots. Each snapshot is immutable once it is created; the past cannot be changed. + * We handle divergences between the current state and historical state by copying a + * reference to elements that have been deleted or overwritten into the snapshot tiers + * in which they still exist. Each tier has its own hash table. + * + * In order to retrieve an object from epoch E, we only have to check two tiers: the + * current tier, and the tier associated with the snapshot from epoch E. This design + * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy + * references into multiple snapshot tiers sometimes when altering the current state. + * In general, we don't expect there to be many snapshots at any given point in time, + * though. We expect to use about 2 snapshots at most. + * + * The current tier's data is stored in the fields inherited from BaseHashTable. It + * would be conceptually simpler to have a separate BaseHashTable object, but since Java + * doesn't have value types, subclassing is the only way to avoid another pointer + * indirection and the associated extra memory cost. 
+ * + * In contrast, the data for snapshot tiers is stored in the Snapshot object itself. + * We access it by looking up our object reference in the Snapshot's IdentityHashMap. + * This design ensures that we can remove snapshots in O(1) time, simply by deleting the + * Snapshot object from the SnapshotRegistry. + * + * As mentioned before, an element only exists in a snapshot tier if the element was + * overwritten or removed from a later tier. If there are no changes between then and + * now, there is no data at all stored for the tier. We don't even store a hash table + * object for a tier unless there is at least one change between then and now. + * + * The class hierarchy looks like this: + * + * Revertable BaseHashTable + * ↑ ↑ + * SnapshottableHashTable → SnapshotRegistry → Snapshot + * ↑ ↑ + * TimelineHashSet TimelineHashMap + * + * BaseHashTable is a simple hash table that uses separate chaining. The interface is + * pretty bare-bones since this class is not intended to be used directly by end-users. + * + * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over + * snapshots. This is the core of the snapshotted hash table code and handles the + * tiering. + * + * TimelineHashSet and TimelineHashMap are mostly wrappers around this + * SnapshottableHashTable class. They implement standard Java APIs for Set and Map, + * respectively. There's a fair amount of boilerplate for this, but it's necessary so + * that timeline data structures can be used while writing idiomatic Java code. + * The accessor APIs have two versions -- one that looks at the current state, and one + * that looks at a historical snapshotted state. Mutation APIs only ever mutate thte + * current state. + * + * One very important feature of SnapshottableHashTable is that we support iterating + * over a snapshot even while changes are being made to the current state. See the + * Javadoc for the iterator for more information about how this is accomplished. 
+ * + * All of these classes require external synchronization, and don't support null keys or + * values. + */ +class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch> + extends BaseHashTable<T> implements Revertable { + interface ElementWithStartEpoch { + void setStartEpoch(long startEpoch); + long startEpoch(); + } + + static class HashTier<T> { + private final int size; + private BaseHashTable<T> deltaTable; + + HashTier(int size) { + this.size = size; + } + } + + /** + * Iterate over the values that currently exist in the hash table. + * + * You can use this iterator even if you are making changes to the map. + * The changes may or may not be visible while you are iterating. + */ + class CurrentIterator implements Iterator<T> { + private final Object[] topTier; + private final List<T> ready; + private int slot; + private T lastReturned; + + CurrentIterator(Object[] topTier) { + this.topTier = topTier; + this.ready = new ArrayList<>(); + this.slot = 0; + this.lastReturned = null; + } + + @Override + public boolean hasNext() { + while (ready.isEmpty()) { + if (slot == topTier.length) { + return false; + } + BaseHashTable.unpackSlot(ready, topTier, slot); + slot++; + } + return true; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + lastReturned = ready.remove(ready.size() - 1); + return lastReturned; + } + + @Override + public void remove() { + if (lastReturned == null) { + throw new UnsupportedOperationException("remove"); + } + snapshottableRemove(lastReturned); + lastReturned = null; + } + } + + /** + * Iterate over the values that existed in the hash table during a specific snapshot. + * + * You can use this iterator even if you are making changes to the map. + * The snapshot is immutable and will always show up the same. 
+ */ + class HistoricalIterator implements Iterator<T> { + private final Object[] topTier; + private final Snapshot snapshot; + private final List<T> temp; + private final List<T> ready; + private int slot; + + HistoricalIterator(Object[] topTier, Snapshot snapshot) { + this.topTier = topTier; + this.snapshot = snapshot; + this.temp = new ArrayList<>(); + this.ready = new ArrayList<>(); + this.slot = 0; + } + + @Override + public boolean hasNext() { + while (ready.isEmpty()) { + if (slot == topTier.length) { + return false; + } + BaseHashTable.unpackSlot(temp, topTier, slot); + for (T object : temp) { + if (object.startEpoch() <= snapshot.epoch()) { + ready.add(object); + } + } + temp.clear(); + + /* + * As we iterate over the SnapshottableHashTable, elements may move from + * the top tier into the snapshot tiers. This would happen if something + * were deleted in the top tier, for example, but still retained in the + * snapshot. + * + * We don't want to return any elements twice, though. Therefore, we + * iterate over the top tier and the snapshot tier at the + * same time. The key to understanding how this works is realizing that + * both hash tables use the same hash function, but simply choose a + * different number of significant bits based on their size. + * So if the top tier has size 4 and the snapshot tier has size 2, we have + * the following mapping: + * + * Elements that would be in slot 0 or 1 in the top tier can only be in + * slot 0 in the snapshot tier. + * Elements that would be in slot 2 or 3 in the top tier can only be in + * slot 1 in the snapshot tier. + * + * Therefore, we can do something like this: + * 1. check slot 0 in the top tier and slot 0 in the snapshot tier. + * 2. check slot 1 in the top tier and slot 0 in the snapshot tier. + * 3. check slot 2 in the top tier and slot 1 in the snapshot tier. + * 4. check slot 3 in the top tier and slot 1 in the snapshot tier. 
+ * + * If elements move from the top tier to the snapshot tier, then + * we'll still find them and report them exactly once. + * + * Note that while I used 4 and 2 as example sizes here, the same pattern + * holds for different powers of two. The "snapshot slot" of an element + * will be the top few bits of the top tier slot of that element. + */ + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier != null && tier.deltaTable != null) { + BaseHashTable<T> deltaTable = tier.deltaTable; + int shift = Integer.numberOfLeadingZeros(deltaTable.baseElements().length) - + Integer.numberOfLeadingZeros(topTier.length); + int tierSlot = slot >>> shift; + BaseHashTable.unpackSlot(temp, deltaTable.baseElements(), tierSlot); + for (T object : temp) { + if (BaseHashTable.findSlot(object, topTier.length) == slot) { + ready.add(object); + } else { + } + } + temp.clear(); + } + slot++; + } + return true; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return ready.remove(ready.size() - 1); + } + } + + private final SnapshotRegistry snapshotRegistry; + + SnapshottableHashTable(SnapshotRegistry snapshotRegistry, int expectedSize) { + super(expectedSize); + this.snapshotRegistry = snapshotRegistry; + } + + int snapshottableSize(long epoch) { + if (epoch == Long.MAX_VALUE) { + return baseSize(); + } else { + Snapshot snapshot = snapshotRegistry.get(epoch); + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier == null) { + return baseSize(); + } else { + return tier.size; + } + } + } + + T snapshottableGet(Object key, long epoch) { + T result = baseGet(key); + if (result != null && result.startEpoch() <= epoch) { + return result; + } + if (epoch == Long.MAX_VALUE) { + return null; + } + Snapshot snapshot = snapshotRegistry.get(epoch); + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier == null || tier.deltaTable == null) { + return null; + } + result = 
tier.deltaTable.baseGet(key); + return result; + } + + boolean snapshottableAddUnlessPresent(T object) { + T prev = baseGet(object); + if (prev != null) { + return false; + } + object.setStartEpoch(snapshotRegistry.curEpoch()); + int prevSize = baseSize(); + baseAddOrReplace(object); + updateTierData(prevSize); + return true; + } + + T snapshottableAddOrReplace(T object) { + object.setStartEpoch(snapshotRegistry.curEpoch()); + int prevSize = baseSize(); + T prev = baseAddOrReplace(object); + if (prev == null) { + updateTierData(prevSize); + } else { + updateTierData(prev, prevSize); + } + return prev; + } + + Object snapshottableRemove(Object object) { + T prev = baseRemove(object); + if (prev == null) { + return null; + } else { + updateTierData(prev, baseSize() + 1); + return prev; + } + } + + private void updateTierData(int prevSize) { + Iterator<Snapshot> iter = snapshotRegistry.snapshots(); + while (iter.hasNext()) { + Snapshot snapshot = iter.next(); + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier == null) { + tier = new HashTier<>(prevSize); + snapshot.setData(SnapshottableHashTable.this, tier); + } + } + } + + private void updateTierData(T prev, int prevSize) { + Iterator<Snapshot> iter = snapshotRegistry.snapshots(); + while (iter.hasNext()) { + Snapshot snapshot = iter.next(); + HashTier<T> tier = snapshot.data(SnapshottableHashTable.this); + if (tier == null) { + tier = new HashTier<>(prevSize); + snapshot.setData(SnapshottableHashTable.this, tier); + } + if (prev.startEpoch() <= snapshot.epoch()) { + if (tier.deltaTable == null) { + tier.deltaTable = new BaseHashTable<>(1); + } + tier.deltaTable.baseAddOrReplace(prev); Review comment: Exposing deltaTable seems to leak the encapsulation of HashTier a bit. Could we add a method like baseAddOrReplace() in HashTier which internally triggers the allocation of the BaseHashTable? 
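For illustration, the encapsulation suggested in the comment above could look roughly like the following. This is only a sketch, not the code in the PR: `DeltaTable` is a hypothetical stand-in for `BaseHashTable` (backed by a `HashMap` so the example compiles on its own), and the method names `addOrReplace`, `get`, and `hasDelta` on `HashTier` are assumptions made for this example. The point is that callers such as `updateTierData` would no longer touch `deltaTable` directly; the tier allocates it on first use.

```java
import java.util.HashMap;
import java.util.Map;

class HashTierSketch {
    /** Hypothetical stand-in for BaseHashTable; only what the sketch needs. */
    static class DeltaTable<T> {
        private final Map<T, T> elements = new HashMap<>();

        /** Store the object and return the equal object it replaced, or null. */
        T addOrReplace(T object) {
            return elements.put(object, object);
        }

        T get(Object key) {
            return elements.get(key);
        }
    }

    static class HashTier<T> {
        private final int size;
        // Allocated lazily, so a tier with no deltas carries no table at all.
        private DeltaTable<T> deltaTable;

        HashTier(int size) {
            this.size = size;
        }

        int size() {
            return size;
        }

        /** Record an element that was overwritten or removed in the top tier. */
        T addOrReplace(T prev) {
            if (deltaTable == null) {
                deltaTable = new DeltaTable<>();
            }
            return deltaTable.addOrReplace(prev);
        }

        /** Look up an element in this tier; null if there is no delta for it. */
        T get(Object key) {
            return deltaTable == null ? null : deltaTable.get(key);
        }

        boolean hasDelta() {
            return deltaTable != null;
        }
    }

    public static void main(String[] args) {
        HashTier<String> tier = new HashTier<>(3);
        System.out.println(tier.hasDelta());
        tier.addOrReplace("a");
        System.out.println(tier.hasDelta());
        System.out.println(tier.get("a"));
    }
}
```

With something like this, the body of `updateTierData(T prev, int prevSize)` would reduce to a single `tier.addOrReplace(prev)` call inside the epoch check, and `snapshottableGet` could call `tier.get(key)` without a separate null check on the delta table.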