[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-02 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568213962



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the most recent
+ * snapshot tier.
+ *
+ * Note that there are no keys in SnapshottableHashTable, only values.  So it more similar
+ * to a hash set than a hash map.  The subclasses implement full-featured maps and sets
+ * using this class as a building block.
+ *
+ * Each snapshot tier contains a size and a hash table.  The size reflects the size at
+ * the time the snapshot was taken.  Note that, as an optimization, snapshot tiers will
+ * be null if they don't contain anything.  So for example, if snapshot 20 of Object O

Review comment:
   To be more precise, it's the snapshot tier's hash table that will be null, right?

##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.timeline;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 10)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+
+public class TimelineHashMapBenchmark {
+private final static int NUM_ENTRIES = 1_000_000;
+
+@Benchmark
+public void testAddEntriesInHashMap() {
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
+HashMap map = new HashMap<>(NUM_ENTRIES);
+for (int i = 0; i < NUM_ENTRIES; i++) {
+int key = (int)(0x & ((i * 2862933555777941757L) + 
3037000493L));

Review comment:
   Should we use randomly generated keys with a common seed?
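
   As a point of reference for this suggestion, here is a minimal sketch of seeded key generation. It is not code from the PR: the class name, the `generateKeys` helper and the seed value are all hypothetical. It only shows that `java.util.Random` constructed with a fixed seed yields the same key sequence on every run, so a plain `HashMap` benchmark and a `TimelineHashMap` benchmark could be fed identical inputs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

final class SeededKeys {
    // Hypothetical constant; any fixed value gives a repeatable sequence.
    private static final long SEED = 12345L;

    /** Returns numKeys pseudo-random ints; the same seed always yields the same array. */
    static int[] generateKeys(int numKeys, long seed) {
        Random random = new Random(seed);
        int[] keys = new int[numKeys];
        for (int i = 0; i < numKeys; i++) {
            keys[i] = random.nextInt();
        }
        return keys;
    }

    public static void main(String[] args) {
        // Generate the keys once, outside the measured loop, then insert them.
        int[] keys = generateKeys(1_000, SEED);
        Map<Integer, String> map = new HashMap<>(keys.length);
        for (int key : keys) {
            map.put(key, String.valueOf(key));
        }
        System.out.println("distinct keys inserted: " + map.size());
    }
}
```

   Generating the array up front would also keep the cost of key generation out of the measured section, if that is desired.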


[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568287239



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.timeline;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 10)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+
+public class TimelineHashMapBenchmark {
+private final static int NUM_ENTRIES = 1_000_000;
+
+@Benchmark
+public void testAddEntriesInHashMap() {
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
+HashMap map = new HashMap<>(NUM_ENTRIES);
+for (int i = 0; i < NUM_ENTRIES; i++) {
+int key = (int)(0x & ((i * 2862933555777941757L) + 
3037000493L));
+map.put(key, String.valueOf(key));
+}
+}
+
+@Benchmark
+public void testAddEntriesInTimelineMap() {
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
+TimelineHashMap map =
+new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES);
+for (int i = 0; i < NUM_ENTRIES; i++) {
+int key = (int)(0x & ((i * 2862933555777941757L) + 
3037000493L));
+map.put(key, String.valueOf(key));
+}
+}
+
+@Benchmark
+public void testAddEntriesWithSnapshots() {
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
+TimelineHashMap map =
+new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES);
+long epoch = 0;
+int j = 0;
+for (int i = 0; i < NUM_ENTRIES; i++) {
+int key = (int)(0x & ((i * 2862933555777941757L) + 
3037000493L));
+if (j > 10 && key % 3 == 0) {

Review comment:
   Hmm, does the code achieve this? It seems that at least 2/3 of the iterations will trigger the creation of a new snapshot?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568259273



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.timeline;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 10)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+
+public class TimelineHashMapBenchmark {
+private final static int NUM_ENTRIES = 1_000_000;
+
+@Benchmark
+public void testAddEntriesInHashMap() {
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
+HashMap map = new HashMap<>(NUM_ENTRIES);
+for (int i = 0; i < NUM_ENTRIES; i++) {
+int key = (int)(0x & ((i * 2862933555777941757L) + 
3037000493L));

Review comment:
   Should we use randomly generated keys with a common seed?


[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568213962



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the most recent
+ * snapshot tier.
+ *
+ * Note that there are no keys in SnapshottableHashTable, only values.  So it more similar
+ * to a hash set than a hash map.  The subclasses implement full-featured maps and sets
+ * using this class as a building block.
+ *
+ * Each snapshot tier contains a size and a hash table.  The size reflects the size at
+ * the time the snapshot was taken.  Note that, as an optimization, snapshot tiers will
+ * be null if they don't contain anything.  So for example, if snapshot 20 of Object O

Review comment:
   To be more precise, it's the snapshot tier's hash table that will be null, right?









[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568172947



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the most recent
+ * snapshot tier.
+ *
+ * Note that there are no keys in SnapshottableHashTable, only values.  So it more similar
+ * to a hash set than a hash map.  The subclasses implement full-featured maps and sets
+ * using this class as a building block.
+ *
+ * Each snapshot tier contains a size and a hash table.  The size reflects the size at
+ * the time the snapshot was taken.  Note that, as an optimization, snapshot tiers will
+ * be null if they don't contain anything.  So for example, if snapshot 20 of Object O
+ * contains the same entries as snapshot 10 of that object, the snapshot 20 tier for

Review comment:
   It should be that the snapshot 10 tier will be null, right?


[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568039786



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
##
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+
+/**
+ * A registry containing snapshots of timeline data structures.
+ * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
+ * Therefore, we use ArrayLists here rather than a data structure with higher 
overhead.
+ */
+public class SnapshotRegistry {
+    class SnapshotIterator implements Iterator {

Review comment:
   Could we add a comment on the ordering for SnapshotIterator and 
ReverseSnapshotIterator? 
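
   To illustrate the kind of ordering such a comment would pin down, here is a small sketch. It assumes, purely for illustration, that snapshots are kept in an ArrayList in ascending epoch order, so that a forward iterator yields oldest-to-newest and a reverse iterator yields newest-to-oldest. `EpochOrderingSketch`, `ReverseIterator` and `drain` are hypothetical names, not the PR's classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class EpochOrderingSketch {
    /** Iterates a list from its last element to its first. */
    static final class ReverseIterator<T> implements Iterator<T> {
        private final List<T> list;
        private int index;

        ReverseIterator(List<T> list) {
            this.list = list;
            this.index = list.size() - 1;
        }

        @Override
        public boolean hasNext() {
            return index >= 0;
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return list.get(index--);
        }
    }

    /** Collects everything an iterator returns, in the order returned. */
    static <T> List<T> drain(Iterator<T> it) {
        List<T> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        // Assumed invariant: epochs are appended in ascending order.
        List<Long> epochs = new ArrayList<>(Arrays.asList(10L, 20L, 30L));
        System.out.println(drain(epochs.iterator()));             // ascending epochs
        System.out.println(drain(new ReverseIterator<>(epochs))); // descending epochs
    }
}
```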

##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the snapshot tiers

Review comment:
   "snapshot tiers" needs to be adjusted, since we only push the deleted or overwritten elements into the tier of the largest previous epoch.
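
   The behaviour described here can be sketched with a toy structure. This is not the PR's SnapshottableHashTable: it is a simplified model with hypothetical names (`TieredMapSketch`, `preserve`, `tierIsEmpty`) that uses ordinary JDK maps and marks "absent at snapshot time" with `Optional.empty()`. It shows two points from the discussion: a displaced value is copied only into the most recent snapshot tier, and a tier's table stays null until something actually changes after that snapshot.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class TieredMapSketch {
    private final Map<String, String> current = new HashMap<>();
    // epoch -> values displaced since that snapshot; a null table means "no changes yet".
    private final TreeMap<Long, Map<String, Optional<String>>> tiers = new TreeMap<>();

    void createSnapshot(long epoch) {
        tiers.put(epoch, null); // lazily allocated on first change
    }

    void put(String key, String value) {
        preserve(key);
        current.put(key, value);
    }

    void remove(String key) {
        preserve(key);
        current.remove(key);
    }

    /** Copies the pre-change value of key into the most recent tier only. */
    private void preserve(String key) {
        if (tiers.isEmpty()) {
            return;
        }
        Map.Entry<Long, Map<String, Optional<String>>> last = tiers.lastEntry();
        Map<String, Optional<String>> table = last.getValue();
        if (table == null) {
            table = new HashMap<>();
            tiers.put(last.getKey(), table);
        }
        // Only the first change after the snapshot is recorded.
        table.putIfAbsent(key, Optional.ofNullable(current.get(key)));
    }

    String get(String key) {
        return current.get(key);
    }

    /** Reads key as of epoch: the first tier at or after epoch that holds key wins. */
    String get(String key, long epoch) {
        for (Map<String, Optional<String>> table : tiers.tailMap(epoch, true).values()) {
            if (table != null && table.containsKey(key)) {
                return table.get(key).orElse(null);
            }
        }
        return current.get(key);
    }

    boolean tierIsEmpty(long epoch) {
        return tiers.get(epoch) == null;
    }

    public static void main(String[] args) {
        TieredMapSketch map = new TieredMapSketch();
        map.put("a", "1");
        map.createSnapshot(10L);
        map.createSnapshot(20L);
        map.put("a", "2"); // old value lands in tier 20 only
        System.out.println(map.get("a", 10L) + " " + map.get("a", 20L) + " " + map.get("a"));
        System.out.println(map.tierIsEmpty(10L) + " " + map.tierIsEmpty(20L));
    }
}
```

   The trade-off in this model is that a read at an old epoch may have to walk forward through later tiers, while a write touches only one tier.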


[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-29 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r567181341



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy
+ * references into multiple snapshot tiers sometimes when altering the current state.
+ * In general, we don't expect there to be many snapshots at any given point in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but since Java
+ * doesn't have value types, subclassing is the only way to avoid another pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object itself.
+ * We access it by looking up our object reference in the Snapshot's IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the element was
+ * overwritten or removed from a later tier.  If there are no changes between then and
+ * now, there is no data at all stored for the tier.  We don't even store a hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *        Revertable       BaseHashTable
+ *              ↑              ↑
+ *           SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *               ↑             ↑
+ *   TimelineHashSet       TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The interface is
+ * pretty bare-bones since this class is not intended to be used directly by end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support iterating
+ * over a snapshot even while changes are being made to the current state.  See the
+ * Javadoc for the iterator for more information about how this is accomplished.
+ *
+ * All of these classes require external synchronization, and don't support null keys or
+ * values.
+ */
+class SnapshottableHashTable
+extends BaseHashTable implements Revertable {
+
+/**
+ * A special epoch value that represents the latest data.
+ */
+final static long LATEST_EPOCH = Long.MAX_VALUE;
+
+interface ElementWithStartEpoch {
+void setStartEpoch(long 

[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-29 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r567020932



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
##
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+
+/**
+ * A registry containing snapshots of timeline data structures.
+ * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
+ * Therefore, we use ArrayLists here rather than a data structure with higher 
overhead.
+ */
+public class SnapshotRegistry {
+class SnapshotIterator implements ListIterator<Snapshot> {
+private Snapshot cur;
+private Snapshot lastResult = null;
+
+SnapshotIterator(Snapshot startAfter) {
+this.cur = startAfter;
+}
+
+@Override
+public boolean hasNext() {
+return cur.next() != head;
+}
+
+@Override
+public Snapshot next() {
+if (!hasNext()) {
+throw new NoSuchElementException();
+}
+cur = cur.next();
+lastResult = cur;
+return cur;
+}
+
+@Override
+public boolean hasPrevious() {
+return cur != head;
+}
+
+@Override
+public Snapshot previous() {
+if (!hasPrevious()) {
+throw new NoSuchElementException();
+}
+Snapshot result = cur;

Review comment:
   In that case, we should not return the snapshot at startAfter, right?


[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-29 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r567004415



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
##
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+
+/**
+ * A registry containing snapshots of timeline data structures.
+ * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
+ * Therefore, we use ArrayLists here rather than a data structure with higher 
overhead.
+ */
+public class SnapshotRegistry {
+class SnapshotIterator implements ListIterator<Snapshot> {
+private Snapshot cur;
+private Snapshot lastResult = null;
+
+SnapshotIterator(Snapshot startAfter) {
+this.cur = startAfter;
+}
+
+@Override
+public boolean hasNext() {
+return cur.next() != head;
+}
+
+@Override
+public Snapshot next() {
+if (!hasNext()) {
+throw new NoSuchElementException();
+}
+cur = cur.next();
+lastResult = cur;
+return cur;
+}
+
+@Override
+public boolean hasPrevious() {
+return cur != head;
+}
+
+@Override
+public Snapshot previous() {
+if (!hasPrevious()) {
+throw new NoSuchElementException();
+}
+Snapshot result = cur;

Review comment:
   This is a bit weird with the input param startAfter, which suggests 
iterating all items after startAfter. With previous, we are returning items 
before and including startAfter.
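
   A minimal sketch of the asymmetry, assuming a circular doubly-linked list
   with a sentinel head. `Node`, `Cursor`, and `append` are hypothetical
   stand-ins rather than the PR's classes, and the body of `previous()` after
   `result = cur` is a guess, since the quoted diff is cut off at that point:

```java
import java.util.NoSuchElementException;

public class StartAfterSketch {
    /** Hypothetical stand-in for Snapshot: a node in a circular doubly-linked list. */
    static final class Node {
        final long epoch;
        Node prev = this;
        Node next = this;

        Node(long epoch) {
            this.epoch = epoch;
        }
    }

    /** Sentinel node; the list is empty when head.next == head. */
    final Node head = new Node(-1);

    /** Links a new node in just before the sentinel, i.e. at the tail. */
    Node append(long epoch) {
        Node n = new Node(epoch);
        n.prev = head.prev;
        n.next = head;
        head.prev.next = n;
        head.prev = n;
        return n;
    }

    /** Cursor that follows the quoted hasNext/next/hasPrevious logic. */
    final class Cursor {
        private Node cur;

        Cursor(Node startAfter) {
            this.cur = startAfter;
        }

        boolean hasNext() {
            return cur.next != head;
        }

        long next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            cur = cur.next;
            return cur.epoch;
        }

        boolean hasPrevious() {
            return cur != head;
        }

        long previous() {
            if (!hasPrevious()) {
                throw new NoSuchElementException();
            }
            Node result = cur;
            cur = cur.prev; // assumed: not visible in the quoted diff
            return result.epoch;
        }
    }

    public static void main(String[] args) {
        StartAfterSketch list = new StartAfterSketch();
        list.append(1);
        Node two = list.append(2);
        list.append(3);
        // Forward iteration skips startAfter and yields epoch 3.
        System.out.println(list.new Cursor(two).next());
        // Backward iteration yields startAfter itself, epoch 2.
        System.out.println(list.new Cursor(two).previous());
    }
}
```

   Under these assumptions, the same constructor argument is excluded when
   walking forward but included when walking backward, which is the mismatch
   with the name `startAfter`.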


[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566532815



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *Revertable   BaseHashTable
+ *  ↑  ↑
+ *   SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *   ↑ ↑
+ *   TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate thte
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support 
iterating
+ * over a snapshot even while changes are being made to the current state.  
See the
+ * Javadoc for the iterator for more information about how this is 
accomplished.
+ *
+ * All of these classes require external synchronization, and don't support 
null keys or
+ * values.
+ */
+class SnapshottableHashTable
+extends BaseHashTable implements Revertable {
+interface ElementWithStartEpoch {
+void setStartEpoch(long startEpoch);
+long startEpoch();
+}
+
+static class HashTier {
+private final int size;
+private 

[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566532467



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *Revertable   BaseHashTable
+ *  ↑  ↑
+ *   SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *   ↑ ↑
+ *   TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate thte
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support 
iterating
+ * over a snapshot even while changes are being made to the current state.  
See the
+ * Javadoc for the iterator for more information about how this is 
accomplished.
+ *
+ * All of these classes require external synchronization, and don't support 
null keys or
+ * values.
+ */
+class SnapshottableHashTable
+extends BaseHashTable implements Revertable {
+interface ElementWithStartEpoch {
+void setStartEpoch(long startEpoch);
+long startEpoch();
+}
+
+static class HashTier {
+private final int size;
+private 

[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566532372



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java
##
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This is a hash map which can be snapshotted.
+ *
+ * See {@SnapshottableHashTable} for more details about the implementation.
+ *
+ * This class requires external synchronization.  Null keys and values are not 
supported.
+ *
+ * @param <K> The key type of the set.
+ * @param <V> The value type of the set.
+ */
+public class TimelineHashMap<K, V>
+extends SnapshottableHashTable<TimelineHashMap.TimelineHashMapEntry<K, V>>
+implements Map<K, V> {
+static class TimelineHashMapEntry<K, V>
+implements SnapshottableHashTable.ElementWithStartEpoch, Map.Entry<K, V> {
+private final K key;
+private final V value;
+private long startEpoch;
+
+TimelineHashMapEntry(K key, V value) {
+this.key = key;
+this.value = value;
+this.startEpoch = Long.MAX_VALUE;
+}
+
+@Override
+public K getKey() {
+return key;
+}
+
+@Override
+public V getValue() {
+return value;
+}
+
+@Override
+public V setValue(V value) {
+// This would be inefficient to support since we'd need a 
back-reference
+// to the enclosing map in each Entry object.  There would also be
+// complications if this entry object was sourced from a 
historical iterator;
+// we don't support modifying the past.  Since we don't really 
need this API,
+// let's just not support it.
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void setStartEpoch(long startEpoch) {
+this.startEpoch = startEpoch;
+}
+
+@Override
+public long startEpoch() {
+return startEpoch;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public boolean equals(Object o) {
+if (!(o instanceof TimelineHashMapEntry)) return false;
+TimelineHashMapEntry<K, V> other = (TimelineHashMapEntry<K, V>) o;
+return key.equals(other.key);
+}
+
+@Override
+public int hashCode() {
+return key.hashCode();
+}
+}
+
+public TimelineHashMap(SnapshotRegistry snapshotRegistry, int 
expectedSize) {
+super(snapshotRegistry, expectedSize);
+}
+
+@Override
+public int size() {
+return size(Long.MAX_VALUE);
+}
+
+public int size(long epoch) {
+return snapshottableSize(epoch);
+}
+
+@Override
+public boolean isEmpty() {
+return isEmpty(Long.MAX_VALUE);
+}
+
+public boolean isEmpty(long epoch) {
+return snapshottableSize(epoch) == 0;
+}
+
+@Override
+public boolean containsKey(Object key) {
+return containsKey(key, Long.MAX_VALUE);
+}
+
+public boolean containsKey(Object key, long epoch) {
+return snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch) 
!= null;
+}
+
+@Override
+public boolean containsValue(Object value) {
+Iterator<Entry<K, V>> iter = entrySet().iterator();
+while (iter.hasNext()) {
+Entry<K, V> e = iter.next();
+if (value.equals(e.getValue())) {
+return true;
+}
+}
+return false;
+}
+
+@Override
+public V get(Object key) {
+return get(key, Long.MAX_VALUE);
+}
+
+public V get(Object key, long epoch) {
+Entry<K, V> entry =
+snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch);
+if (entry == null) {
+return null;
+}
+return entry.getValue();
+}
+
+@Override
+public V put(K key, V value) {
+Objects.requireNonNull(key);
+

[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-27 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r564872645



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *Revertable   BaseHashTable
+ *  ↑  ↑
+ *   SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *   ↑ ↑
+ *   TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate thte

Review comment:
   typo thte

##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by