cmccabe commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r568250754
########## File path: metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java ########## @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + + +/** + * SnapshottableHashTable implements a hash table that supports creating point-in-time + * snapshots. Each snapshot is immutable once it is created; the past cannot be changed. + * We handle divergences between the current state and historical state by copying a + * reference to elements that have been deleted or overwritten into the most recent + * snapshot tier. + * + * Note that there are no keys in SnapshottableHashTable, only values. So it is more similar + * to a hash set than a hash map. The subclasses implement full-featured maps and sets + * using this class as a building block. + * + * Each snapshot tier contains a size and a hash table. The size reflects the size at + * the time the snapshot was taken. Note that, as an optimization, snapshot tiers will + * be null if they don't contain anything. So for example, if snapshot 20 of Object O Review comment: 1. 
The HashTier could be null, 2. or the HashTier could be non-null but the deltaTable could be null, 3. or the HashTier could be non-null with a non-null delta table. You would end up in situation #2 if you deleted something but it was still present in a snapshot. The HashTier would record the old size but not have any delta entries. ########## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.jmh.timeline; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 3) +@Measurement(iterations = 10) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) + +public class TimelineHashMapBenchmark { + private final static int NUM_ENTRIES = 1_000_000; + + @Benchmark + public void testAddEntriesInHashMap() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + HashMap<Integer, String> map = new HashMap<>(NUM_ENTRIES); + for (int i = 0; i < NUM_ENTRIES; i++) { + int key = (int)(0xffffffff & ((i * 2862933555777941757L) + 3037000493L)); + map.put(key, String.valueOf(key)); + } + } + + @Benchmark + public void testAddEntriesInTimelineMap() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + TimelineHashMap<Integer, String> map = + new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES); + for (int i = 0; i < NUM_ENTRIES; i++) { + int key = (int)(0xffffffff & ((i * 2862933555777941757L) + 3037000493L)); + map.put(key, String.valueOf(key)); + } + } + + @Benchmark + public void testAddEntriesWithSnapshots() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + TimelineHashMap<Integer, String> map = + new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES); + long epoch = 0; + int j = 0; + for (int i = 0; i < 
NUM_ENTRIES; i++) { + int key = (int)(0xffffffff & ((i * 2862933555777941757L) + 3037000493L)); + if (j > 10 && key % 3 == 0) { Review comment: the intention is to have each snapshot have around 10-13 entries on average ########## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.jmh.timeline; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 3) +@Measurement(iterations = 10) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) + +public class TimelineHashMapBenchmark { + private final static int NUM_ENTRIES = 1_000_000; + + @Benchmark + public void testAddEntriesInHashMap() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + HashMap<Integer, String> map = new HashMap<>(NUM_ENTRIES); + for (int i = 0; i < NUM_ENTRIES; i++) { + int key = (int)(0xffffffff & ((i * 2862933555777941757L) + 3037000493L)); Review comment: I liked the idea of having it be deterministic but I guess performance results vary each time anyway... not sure. ########## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.timeline; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 3) +@Measurement(iterations = 10) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) + +public class TimelineHashMapBenchmark { + private final static int NUM_ENTRIES = 1_000_000; + + @Benchmark + public void testAddEntriesInHashMap() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + HashMap<Integer, String> map = new HashMap<>(NUM_ENTRIES); + for (int i = 0; i < NUM_ENTRIES; i++) { + int key = (int)(0xffffffff & ((i * 2862933555777941757L) + 3037000493L)); + map.put(key, String.valueOf(key)); + } + } + + @Benchmark + public void testAddEntriesInTimelineMap() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + TimelineHashMap<Integer, String> map = + new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES); + for (int i = 0; i < 
NUM_ENTRIES; i++) { + int key = (int)(0xffffffff & ((i * 2862933555777941757L) + 3037000493L)); + map.put(key, String.valueOf(key)); + } + } + + @Benchmark + public void testAddEntriesWithSnapshots() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + TimelineHashMap<Integer, String> map = + new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES); + long epoch = 0; + int j = 0; + for (int i = 0; i < NUM_ENTRIES; i++) { + int key = (int)(0xffffffff & ((i * 2862933555777941757L) + 3037000493L)); + if (j > 10 && key % 3 == 0) { Review comment: oh, sorry, I put the snapshot creation / deleting in the wrong branch of the "if" ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org