[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-02 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568250754



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the most recent
+ * snapshot tier.
+ *
+ * Note that there are no keys in SnapshottableHashTable, only values.  So it is more similar
+ * to a hash set than a hash map.  The subclasses implement full-featured maps and sets
+ * using this class as a building block.
+ *
+ * Each snapshot tier contains a size and a hash table.  The size reflects the size at
+ * the time the snapshot was taken.  Note that, as an optimization, snapshot tiers will
+ * be null if they don't contain anything.  So for example, if snapshot 20 of Object O

Review comment:
   1. The HashTier could be null
   2. or the HashTier could be non-null but the deltaTable could be null
   3. or the HashTier could be non-null with a non-null delta table
   
   You would end up in situation #2 if you deleted something but it was still present in a snapshot.  The HashTier would record the old size but not have any delta entries.
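A minimal sketch of those three states, assuming a tier holds the snapshot-time size plus a lazily allocated delta table. The review comment names `HashTier` and `deltaTable`; the simplified fields, `addDelta`, `State`, and `classify` below are hypothetical and are not the PR's code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the per-snapshot tier discussed above (not the PR's class).
class TierStates {
    enum State { NO_TIER, SIZE_ONLY, SIZE_AND_DELTAS }

    static final class HashTier<T> {
        final int size;             // size of the structure when the snapshot was taken
        Map<Object, T> deltaTable;  // assumption: allocated lazily, null until an entry is copied in

        HashTier(int size) {
            this.size = size;
        }

        void addDelta(Object key, T oldValue) {
            if (deltaTable == null) {
                deltaTable = new HashMap<>();
            }
            deltaTable.putIfAbsent(key, oldValue);
        }
    }

    static State classify(HashTier<?> tier) {
        if (tier == null) {
            return State.NO_TIER;          // case 1: nothing recorded for this snapshot
        } else if (tier.deltaTable == null) {
            return State.SIZE_ONLY;        // case 2: old size recorded, no delta entries
        } else {
            return State.SIZE_AND_DELTAS;  // case 3: old size plus copied-out elements
        }
    }

    public static void main(String[] args) {
        HashTier<String> tier = new HashTier<>(3);
        System.out.println(classify(null));  // NO_TIER
        System.out.println(classify(tier));  // SIZE_ONLY
        tier.addDelta("k", "old");
        System.out.println(classify(tier));  // SIZE_AND_DELTAS
    }
}
```

Allocating the delta table only on the first copied element is what makes case 2 possible: a tier can exist just to remember a size.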

##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
##
@@ -0,0 +1,87 @@
+
+package org.apache.kafka.jmh.timeline;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 10)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+
+public class TimelineHashMapBenchmark {
+    private final static int NUM_ENTRIES = 1_000_000;
+
+    @Benchmark
+    public void testAddEntriesInHashMap() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        HashMap<Integer, String> map = new HashMap<>(NUM_ENTRIES);
+        for (int i = 0; i < NUM_ENTRIES; i++) {
+            int key = (int)(0x & ((i * 2862933555777941757L) + 3037000493L));
+            map.put(key, String.valueOf(key));
+        }
+    }
+
+    @Benchmark
+    public void testAddEntriesInTimelineMap() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        TimelineHashMap<Integer, String> map =
+n

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568323142



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
##
@@ -0,0 +1,87 @@
+
+package org.apache.kafka.jmh.timeline;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 10)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+
+public class TimelineHashMapBenchmark {
+    private final static int NUM_ENTRIES = 1_000_000;
+
+    @Benchmark
+    public void testAddEntriesInHashMap() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        HashMap<Integer, String> map = new HashMap<>(NUM_ENTRIES);
+        for (int i = 0; i < NUM_ENTRIES; i++) {
+            int key = (int)(0x & ((i * 2862933555777941757L) + 3037000493L));
+            map.put(key, String.valueOf(key));
+        }
+    }
+
+    @Benchmark
+    public void testAddEntriesInTimelineMap() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        TimelineHashMap<Integer, String> map =
+            new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES);
+        for (int i = 0; i < NUM_ENTRIES; i++) {
+            int key = (int)(0x & ((i * 2862933555777941757L) + 3037000493L));
+            map.put(key, String.valueOf(key));
+        }
+    }
+
+    @Benchmark
+    public void testAddEntriesWithSnapshots() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        TimelineHashMap<Integer, String> map =
+            new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES);
+        long epoch = 0;
+        int j = 0;
+        for (int i = 0; i < NUM_ENTRIES; i++) {
+            int key = (int)(0x & ((i * 2862933555777941757L) + 3037000493L));
+            if (j > 10 && key % 3 == 0) {

Review comment:
   oh, sorry, I put the snapshot creation / deleting in the wrong branch of the "if"
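A minimal sketch of the loop with the snapshot bookkeeping in the branch where `j > 10 && key % 3 == 0` holds. The rest of the quoted loop is truncated, so several things here are assumptions: resetting `j` on each snapshot, keeping at most two live snapshots (borrowed from the "about 2 snapshots at most" remark in the SnapshottableHashTable Javadoc), and using a plain `HashMap` plus a `Deque` of epochs in place of `TimelineHashMap` and `SnapshotRegistry`. `SnapshotLoopSketch` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical shape of the snapshot-creating benchmark loop. HashMap and a Deque of
// epochs stand in for TimelineHashMap and SnapshotRegistry (assumption, not Kafka's API).
class SnapshotLoopSketch {
    static final int MAX_LIVE_SNAPSHOTS = 2;  // assumption: keep only a couple of snapshots alive

    final Map<Integer, String> map = new HashMap<>();
    final Deque<Long> liveSnapshots = new ArrayDeque<>();
    long snapshotsCreated = 0;

    void run(int numEntries, long seed) {
        Random random = new Random(seed);
        long epoch = 0;
        int j = 0;
        for (int i = 0; i < numEntries; i++) {
            int key = random.nextInt();
            if (j > 10 && key % 3 == 0) {
                // Snapshot creation and deletion live in this branch: take a new
                // snapshot, then retire the oldest one once too many are alive.
                liveSnapshots.addLast(epoch++);
                snapshotsCreated++;
                if (liveSnapshots.size() > MAX_LIVE_SNAPSHOTS) {
                    liveSnapshots.removeFirst();
                }
                j = 0;
            } else {
                j++;
            }
            map.put(key, String.valueOf(key));
        }
    }
}
```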





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568263923



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
##
@@ -0,0 +1,87 @@
+
+package org.apache.kafka.jmh.timeline;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 10)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+
+public class TimelineHashMapBenchmark {
+    private final static int NUM_ENTRIES = 1_000_000;
+
+    @Benchmark
+    public void testAddEntriesInHashMap() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        HashMap<Integer, String> map = new HashMap<>(NUM_ENTRIES);
+        for (int i = 0; i < NUM_ENTRIES; i++) {
+            int key = (int)(0x & ((i * 2862933555777941757L) + 3037000493L));

Review comment:
   I liked the idea of having it be deterministic, but I guess performance results vary each time anyway... not sure.
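On the determinism question, both generators below return the same key sequence on every run. `mixedKeys` reuses the multiply-and-add from the benchmark. Because the mask in the quoted line is garbled (`0x`), it simply keeps the low 32 bits; that is an assumption. Since the multiplier is odd, that map is a bijection modulo 2^32, so in this sketch the first NUM_ENTRIES keys are all distinct. `seededKeys` is the fixed-seed `java.util.Random` alternative. Both class and method names are hypothetical.

```java
import java.util.Random;

// Two reproducible key generators for a benchmark (hypothetical helper class).
class DeterministicKeys {
    // Same multiply-and-add as the quoted benchmark. The quoted mask is garbled,
    // so this sketch (assumption) just keeps the low 32 bits via the int cast.
    // i is widened to long, the multiply wraps modulo 2^64, and the cast truncates.
    static int[] mixedKeys(int n) {
        int[] keys = new int[n];
        for (int i = 0; i < n; i++) {
            keys[i] = (int) ((i * 2862933555777941757L) + 3037000493L);
        }
        return keys;
    }

    // Alternative: a fixed seed makes java.util.Random reproducible as well.
    static int[] seededKeys(int n, long seed) {
        Random random = new Random(seed);
        int[] keys = new int[n];
        for (int i = 0; i < n; i++) {
            keys[i] = random.nextInt();
        }
        return keys;
    }
}
```

The multiply-and-add version also guarantees no duplicate keys, whereas random keys can occasionally collide and shrink the map slightly.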









[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568263694



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
##
@@ -0,0 +1,87 @@
+
+package org.apache.kafka.jmh.timeline;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 10)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+
+public class TimelineHashMapBenchmark {
+    private final static int NUM_ENTRIES = 1_000_000;
+
+    @Benchmark
+    public void testAddEntriesInHashMap() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        HashMap<Integer, String> map = new HashMap<>(NUM_ENTRIES);
+        for (int i = 0; i < NUM_ENTRIES; i++) {
+            int key = (int)(0x & ((i * 2862933555777941757L) + 3037000493L));
+            map.put(key, String.valueOf(key));
+        }
+    }
+
+    @Benchmark
+    public void testAddEntriesInTimelineMap() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        TimelineHashMap<Integer, String> map =
+            new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES);
+        for (int i = 0; i < NUM_ENTRIES; i++) {
+            int key = (int)(0x & ((i * 2862933555777941757L) + 3037000493L));
+            map.put(key, String.valueOf(key));
+        }
+    }
+
+    @Benchmark
+    public void testAddEntriesWithSnapshots() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        TimelineHashMap<Integer, String> map =
+            new TimelineHashMap<>(snapshotRegistry, NUM_ENTRIES);
+        long epoch = 0;
+        int j = 0;
+        for (int i = 0; i < NUM_ENTRIES; i++) {
+            int key = (int)(0x & ((i * 2862933555777941757L) + 3037000493L));
+            if (j > 10 && key % 3 == 0) {

Review comment:
   the intention is to have each snapshot have around 10-13 entries on average
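A back-of-the-envelope check of the spacing, under stated assumptions because the rest of the loop is truncated: `j` is reset to 0 when a snapshot is taken and incremented on every other iteration, and `key % 3 == 0` holds for roughly one key in three. With threshold T = 10, the first T + 1 iterations after a reset cannot trigger (j runs from 0 to T), and after that the number of tries up to and including the trigger is geometric with mean 1/p:

```latex
\mathbb{E}[\text{entries per snapshot}]
  = (T + 1) + \frac{1}{p}
  = 11 + 3 = 14,
  \qquad T = 10,\quad p \approx \tfrac{1}{3}
```

Under these assumptions the mean lands slightly above the 10-13 target; a threshold of `j > 8` would give about 9 + 3 = 12. How `j` is actually updated in the truncated code determines the real figure.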









[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568250754



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,455 @@
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the most recent
+ * snapshot tier.
+ *
+ * Note that there are no keys in SnapshottableHashTable, only values.  So it is more similar
+ * to a hash set than a hash map.  The subclasses implement full-featured maps and sets
+ * using this class as a building block.
+ *
+ * Each snapshot tier contains a size and a hash table.  The size reflects the size at
+ * the time the snapshot was taken.  Note that, as an optimization, snapshot tiers will
+ * be null if they don't contain anything.  So for example, if snapshot 20 of Object O

Review comment:
   1. The HashTier could be null
   2. or the HashTier could be non-null but the deltaTable could be null
   3. or the HashTier could be non-null with a non-null delta table
   
   You would end up in situation #2 if you deleted something but it was still present in a snapshot.  The HashTier would record the old size but not have any delta entries.









[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568201202



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,453 @@
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the most recent
+ * snapshot tier.
+ *
+ * Note that there are no keys in SnapshottableHashTable, only values.  So it is more similar
+ * to a hash set than a hash map.  The subclasses implement full-featured maps and sets
+ * using this class as a building block.
+ *
+ * Each snapshot tier contains a size and a hash table.  The size reflects the size at
+ * the time the snapshot was taken.  Note that, as an optimization, snapshot tiers will
+ * be null if they don't contain anything.  So for example, if snapshot 20 of Object O
+ * contains the same entries as snapshot 10 of that object, the snapshot 20 tier for

Review comment:
   oh, the assumption here was that 20 came after 10, but was a no-op for this data structure in particular (so was null)
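To make the no-op case concrete, here is a toy model of the scheme this revision's Javadoc describes: displaced elements are copied into the most recent snapshot tier, and a tier stays null until something is copied into it. It is a sketch under simplifying assumptions, not the PR's SnapshottableHashTable. It keeps keys (like a map rather than a value-only table), omits tier sizes and snapshot deletion, and records on each value the first epoch that can see it. `TieredMapModel` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model (hypothetical, not the PR's code): overwritten or removed values are copied
// into the most recent snapshot tier; a tier stays null until something lands in it.
class TieredMapModel<K, V> {
    private static final class Entry<T> {
        final T value;
        final long startEpoch;  // first snapshot epoch that can see this value

        Entry(T value, long startEpoch) {
            this.value = value;
            this.startEpoch = startEpoch;
        }
    }

    private final Map<K, Entry<V>> current = new HashMap<>();
    // snapshot epoch -> tier; the value is null while nothing has been copied into it
    private final TreeMap<Long, Map<K, Entry<V>>> tiers = new TreeMap<>();
    private long latestEpoch = -1;

    void createSnapshot(long epoch) {
        if (epoch <= latestEpoch) {
            throw new IllegalArgumentException("snapshot epochs must increase");
        }
        tiers.put(epoch, null);  // no-op snapshot until a later change touches it
        latestEpoch = epoch;
    }

    void put(K key, V value) {
        preserve(current.put(key, new Entry<>(value, latestEpoch + 1)), key);
    }

    void remove(K key) {
        preserve(current.remove(key), key);
    }

    // Copy a displaced entry into the newest tier if some snapshot can still see it.
    private void preserve(Entry<V> old, K key) {
        if (old == null || tiers.isEmpty() || old.startEpoch > latestEpoch) {
            return;
        }
        Map<K, Entry<V>> tier = tiers.get(latestEpoch);
        if (tier == null) {
            tier = new HashMap<>();
            tiers.put(latestEpoch, tier);
        }
        tier.putIfAbsent(key, old);
    }

    // Read as of a snapshot epoch: current state first, then tiers from that epoch forward.
    V get(K key, long epoch) {
        Entry<V> e = current.get(key);
        if (e != null && e.startEpoch <= epoch) {
            return e.value;
        }
        for (Map<K, Entry<V>> tier : tiers.tailMap(epoch, true).values()) {
            if (tier != null && tier.containsKey(key)) {
                Entry<V> old = tier.get(key);
                return old.startEpoch <= epoch ? old.value : null;  // first copy found decides
            }
        }
        return null;
    }
}
```

For example, after `put("k", "v1")`, `createSnapshot(10)`, `put("k", "v2")`, `createSnapshot(20)`, `createSnapshot(30)`, and `remove("k")`, the tier for snapshot 20 is still null. A read at epoch 20 skips it and finds `v2` in tier 30, while a read at epoch 10 finds `v1` in tier 10.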









[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568157113



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,449 @@
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the snapshot tiers

Review comment:
   good catch.  I updated the JavaDoc.

##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,449 @@
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two tiers: the

Review comment:
   updated









[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568134210



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,449 @@
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy
+ * references into multiple snapshot tiers sometimes when altering the current state.
+ * In general, we don't expect there to be many snapshots at any given point in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but since Java
+ * doesn't have value types, subclassing is the only way to avoid another pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object itself.
+ * We access it by looking up our object reference in the Snapshot's IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the element was
+ * overwritten or removed from a later tier.  If there are no changes between then and
+ * now, there is no data at all stored for the tier.  We don't even store a hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *    Revertable  BaseHashTable
+ *         ↑            ↑
+ *   SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *         ↑            ↑
+ * TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The interface is
+ * pretty bare-bones since this class is not intended to be used directly by end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support iterating
+ * over a snapshot even while changes are being made to the current state.  See the
+ * Javadoc for the iterator for more information about how this is accomplished.
+ *
+ * All of these classes require external synchronization, and don't support null keys or
+ * values.
+ */
+class SnapshottableHashTable
+    extends BaseHashTable implements Revertable {
+
+    /**
+     * A special epoch value that represents the latest data.
+     */
+    final static long LATEST_EPOCH = Long.MAX_VALUE;
+
+    interface ElementWithStartEpoch {
+        void setStartEpoch(long st
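The Javadoc above says each Snapshot stores tier data for every structure that changed after it, keyed by the structure's object reference in an IdentityHashMap, so dropping the Snapshot releases all of that data at once. A minimal sketch of that layout follows. `IdentityTierSketch`, its nested `Snapshot`, and the `Object` tier values are hypothetical simplifications, not the PR's classes.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical sketch: per-snapshot tier data keyed by data-structure identity.
class IdentityTierSketch {
    static final class Snapshot {
        final long epoch;
        // Keyed by reference, not equals(): two distinct but equal structures
        // (for example, two empty sets) must keep separate tiers.
        final IdentityHashMap<Object, Object> tiers = new IdentityHashMap<>();

        Snapshot(long epoch) {
            this.epoch = epoch;
        }
    }

    final Map<Long, Snapshot> snapshots = new HashMap<>();

    Snapshot create(long epoch) {
        Snapshot s = new Snapshot(epoch);
        snapshots.put(epoch, s);
        return s;
    }

    // Deleting a snapshot is a single map removal (expected constant time); every tier it
    // held becomes unreachable without visiting the individual data structures.
    void delete(long epoch) {
        snapshots.remove(epoch);
    }
}
```

With a plain `HashMap` in place of `IdentityHashMap`, two different empty `HashSet`s registered in the same snapshot would collide on one key, because `HashSet.equals` compares contents.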

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568030679



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
##
@@ -0,0 +1,247 @@
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+
+/**
+ * A registry containing snapshots of timeline data structures.
+ * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
+ * Therefore, we use ArrayLists here rather than a data structure with higher overhead.
+ */
+public class SnapshotRegistry {
+    class SnapshotIterator implements Iterator<Snapshot> {
+        Snapshot cur;
+        Snapshot result = null;
+
+        SnapshotIterator(Snapshot start) {
+            cur = start;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return cur != head;
+        }
+
+        @Override
+        public Snapshot next() {
+            result = cur;
+            cur = cur.next();
+            return result;
+        }
+
+        @Override
+        public void remove() {
+            if (result == null) {
+                throw new IllegalStateException();
+            }
+            deleteSnapshot(result);
+            result = null;
+        }
+    }
+
+    class ReverseSnapshotIterator implements Iterator<Snapshot> {
+        Snapshot cur;
+
+        ReverseSnapshotIterator() {
+            cur = head.prev();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return cur != head;
+        }
+
+        @Override
+        public Snapshot next() {
+            Snapshot result = cur;
+            cur = cur.prev();
+            return result;
+        }
+    }
+
+    private final Logger log;
+
+    /**
+     * A map from snapshot epochs to snapshot data structures.
+     */
+    private final HashMap<Long, Snapshot> snapshots = new HashMap<>();
+
+    /**
+     * The head of a list of snapshots, sorted by epoch.
+     */
+    private final Snapshot head = new Snapshot(Long.MIN_VALUE);
+
+    public SnapshotRegistry(LogContext logContext) {
+        this.log = logContext.logger(SnapshotRegistry.class);
+    }
+
+    /**
+     * Returns a snapshot iterator that iterates from the snapshots with the
+     * lowest epoch to those with the highest.
+     */
+    public Iterator<Snapshot> iterator() {
+        return new SnapshotIterator(head.next());
+    }
+
+    /**
+     * Returns a snapshot iterator that iterates from the snapshots with the
+     * lowest epoch to those with the highest, starting at the snapshot with the
+     * given epoch.
+     */
+    public Iterator<Snapshot> iterator(long epoch) {
+        return iterator(getSnapshot(epoch));
+    }
+
+    /**
+     * Returns a snapshot iterator that iterates from the snapshots with the
+     * lowest epoch to those with the highest, starting at the given snapshot.
+     */
+    public Iterator<Snapshot> iterator(Snapshot snapshot) {
+        return new SnapshotIterator(snapshot);
+    }
+
+    /**
+     * Returns a reverse snapshot iterator that iterates from the snapshots with the
+     * highest epoch to those with the lowest.
+     */
+    public Iterator<Snapshot> reverseIterator() {
+        return new ReverseSnapshotIterator();
+    }
+
+    /**
+     * Returns a sorted list of snapshot epochs.
+     */
+    public List<Long> epochsList() {
+        List<Long> result = new ArrayList<>();
+        for (Iterator<Snapshot> iterator = iterator(); iterator.hasNext(); ) {
+            result.add(iterator.next().epoch());
+        }
+        return result;
+    }
+
+    /**
+     * Gets the snapshot for a specific epoch.
+     */
+    public Snapshot getSnapshot(long epoch) {
+        Snapshot snapshot = snapshots.get(epoch);
+        if (snapshot == null) {
+            throw new RuntimeException("No snapshot for epoch " + epoch + ". Snapshot " +
+                "epochs are: " + epochsList().stream().map(e -> e.toString()).
+                collect(Collectors.joining(", ")));
+        }
+        return sn

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568030332



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
##
@@ -0,0 +1,247 @@

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568029191



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,449 @@
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy
+ * references into multiple snapshot tiers sometimes when altering the current state.
+ * In general, we don't expect there to be many snapshots at any given point in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but since Java
+ * doesn't have value types, subclassing is the only way to avoid another pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object itself.
+ * We access it by looking up our object reference in the Snapshot's IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the element was
+ * overwritten or removed from a later tier.  If there are no changes between then and
+ * now, there is no data at all stored for the tier.  We don't even store a hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *        Revertable       BaseHashTable
+ *            ↑                 ↑
+ *           SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *               ↑             ↑
+ *   TimelineHashSet       TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The interface is
+ * pretty bare-bones since this class is not intended to be used directly by end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support iterating
+ * over a snapshot even while changes are being made to the current state.  See the
+ * Javadoc for the iterator for more information about how this is accomplished.
+ *
+ * All of these classes require external synchronization, and don't support null keys or
+ * values.
+ */
+class SnapshottableHashTable
+extends BaseHashTable implements Revertable {
+
+/**
+ * A special epoch value that represents the latest data.
+ */
+final static long LATEST_EPOCH = Long.MAX_VALUE;
+
+interface ElementWithStartEpoch {
+void setStartEpoch(long st

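The two-tier read path described in the Javadoc above (check the tier for snapshot E, then fall back to the current tier, ignoring elements written after E) can be illustrated with a small model. This is a minimal, hypothetical sketch, not the PR's code: the names Versioned, SketchSnapshot, SketchRegistry and TieredMapSketch are invented for illustration, it stores key/value pairs in java.util.HashMap instead of extending BaseHashTable, and it keeps snapshots in a TreeMap rather than the registry's linked list. It does borrow two ideas from the Javadoc: each snapshot owns an IdentityHashMap from a data structure to that structure's tier, and an overwritten or removed element is copied into every tier (newest first) whose epoch is at or after the element's start epoch. It also assumes that every write uses an epoch greater than the latest snapshot epoch.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical value holder: an element plus the epoch at which it was written.
final class Versioned {
    final String value;
    final long startEpoch;
    Versioned(String value, long startEpoch) { this.value = value; this.startEpoch = startEpoch; }
}

// Hypothetical snapshot: owns one tier per data structure, keyed by object identity.
final class SketchSnapshot {
    final long epoch;
    final IdentityHashMap<Object, Map<String, Versioned>> tiers = new IdentityHashMap<>();
    SketchSnapshot(long epoch) { this.epoch = epoch; }
}

// Hypothetical registry: snapshots sorted by epoch.
final class SketchRegistry {
    final TreeMap<Long, SketchSnapshot> snapshots = new TreeMap<>();

    void createSnapshot(long epoch) { snapshots.put(epoch, new SketchSnapshot(epoch)); }

    // Dropping the snapshot drops its tiers for every data structure at once.
    void deleteSnapshot(long epoch) { snapshots.remove(epoch); }
}

final class TieredMapSketch {
    static final long LATEST_EPOCH = Long.MAX_VALUE;

    private final SketchRegistry registry;
    private final Map<String, Versioned> current = new HashMap<>(); // the current tier

    TieredMapSketch(SketchRegistry registry) { this.registry = registry; }

    // Assumption: writeEpoch is greater than every existing snapshot epoch.
    void put(String key, String value, long writeEpoch) {
        checkWriteEpoch(writeEpoch);
        Versioned old = current.put(key, new Versioned(value, writeEpoch));
        if (old != null) {
            preserve(key, old);
        }
    }

    void remove(String key, long writeEpoch) {
        checkWriteEpoch(writeEpoch);
        Versioned old = current.remove(key);
        if (old != null) {
            preserve(key, old);
        }
    }

    // A read checks at most two tiers: snapshot E's tier, then the current tier.
    String get(String key, long epoch) {
        if (epoch != LATEST_EPOCH) {
            SketchSnapshot snapshot = registry.snapshots.get(epoch);
            if (snapshot == null) {
                throw new IllegalArgumentException("No snapshot for epoch " + epoch);
            }
            Map<String, Versioned> tier = snapshot.tiers.get(this);
            if (tier != null && tier.containsKey(key)) {
                return tier.get(key).value;
            }
        }
        Versioned v = current.get(key);
        // Elements written after epoch E are invisible to a read at E.
        return (v != null && v.startEpoch <= epoch) ? v.value : null;
    }

    private void checkWriteEpoch(long writeEpoch) {
        if (!registry.snapshots.isEmpty() && writeEpoch <= registry.snapshots.lastKey()) {
            throw new IllegalArgumentException("Write epoch must follow the latest snapshot");
        }
    }

    // Copy the displaced element into every tier that still contains it, newest first.
    private void preserve(String key, Versioned old) {
        for (SketchSnapshot snapshot : registry.snapshots.descendingMap().values()) {
            if (snapshot.epoch < old.startEpoch) {
                break; // this snapshot and all older ones predate the element
            }
            snapshot.tiers.computeIfAbsent(this, k -> new HashMap<>()).putIfAbsent(key, old);
        }
    }
}
```

In this model, deleting a snapshot leaves reads at the other epochs unaffected because each tier is self-contained. Note that removal here is a TreeMap removal (O(log n)), not the O(1) unlink that the Javadoc describes for the real registry.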
[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568028493



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,449 @@

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568028002



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,449 @@

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-02-01 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r568027824



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,449 @@

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-29 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r567034077



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
##
@@ -0,0 +1,251 @@
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+
+/**
+ * A registry containing snapshots of timeline data structures.
+ * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
+ * Therefore, we use ArrayLists here rather than a data structure with higher overhead.
+ */
+public class SnapshotRegistry {
+    class SnapshotIterator implements ListIterator<Snapshot> {
+        private Snapshot cur;
+        private Snapshot lastResult = null;
+
+        SnapshotIterator(Snapshot startAfter) {
+            this.cur = startAfter;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return cur.next() != head;
+        }
+
+        @Override
+        public Snapshot next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            cur = cur.next();
+            lastResult = cur;
+            return cur;
+        }
+
+        @Override
+        public boolean hasPrevious() {
+            return cur != head;
+        }
+
+        @Override
+        public Snapshot previous() {
+            if (!hasPrevious()) {
+                throw new NoSuchElementException();
+            }
+            Snapshot result = cur;

Review comment:
   Hmm, I'm not sure I follow the question.  I might be missing something.
   
   startAfter just means start after X.  So if your list is A, B, C, D and you pass startAfter = B, the first call to next() will return C.
   
   If, instead, you choose to iterate backwards using previous(), then of course you will get B and then A.  But you still "started after" B; you just chose to go backwards.
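The same "start after" convention is used by the JDK's java.util.ListIterator. List.listIterator(index) places the cursor so that the first next() returns the element at index and the first previous() returns the element at index - 1. The small illustration below uses that standard API. StartAfterDemo and its helper methods are made up for this example and are not part of the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;

class StartAfterDemo {
    // Everything returned by next() from a cursor placed just before cursorIndex.
    static List<String> forwardFrom(List<String> list, int cursorIndex) {
        List<String> out = new ArrayList<>();
        for (ListIterator<String> it = list.listIterator(cursorIndex); it.hasNext(); ) {
            out.add(it.next());
        }
        return out;
    }

    // Everything returned by previous() from the same cursor position.
    static List<String> backwardFrom(List<String> list, int cursorIndex) {
        List<String> out = new ArrayList<>();
        for (ListIterator<String> it = list.listIterator(cursorIndex); it.hasPrevious(); ) {
            out.add(it.previous());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("A", "B", "C", "D");
        // Index 2 puts the cursor between B and C, i.e. "after B".
        System.out.println(forwardFrom(list, 2));  // [C, D]
        System.out.println(backwardFrom(list, 2)); // [B, A]
    }
}
```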





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-29 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r567032323



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
##
@@ -0,0 +1,249 @@
+public class SnapshotRegistry {
+    class SnapshotIterator implements ListIterator<Snapshot> {
+        private Snapshot cur;
+        private Snapshot lastResult = null;
+
+        SnapshotIterator(Snapshot startAfter) {
+            this.cur = startAfter;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return cur.next() != head;
+        }
+
+        @Override
+        public Snapshot next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            cur = cur.next();
+            lastResult = cur;
+            return cur;
+        }
+
+        @Override
+        public boolean hasPrevious() {
+            return cur != head;
+        }
+
+        @Override
+        public Snapshot previous() {
+            if (!hasPrevious()) {
+                throw new NoSuchElementException();
+            }
+            Snapshot result = cur;
+            cur = cur.prev();
+            lastResult = result;

Review comment:
   ListIterator#remove is kind of annoying to implement because it can 
delete things that were returned by next() OR by previous().
   
   Imagine that we have the list 1, 2, 3 and the user called next() and got 
back 2.  Then the user calls remove() and removes 2.  A subsequent call to next 
must return 3.  That means cur needs to end up as 1 after remove() is called.
   
   Then imagine a different scenario, where the user called previous() and got 
back 2.  Then the user calls remove() and removes 2.  A subsequent call to 
previous() must return 1.  Since previous() has already moved cur back to 1, 
cur needs to stay at 1 after remove() is called.
   
   So it is not sufficient to just look at cur when implementing remove() and 
always do it the same way.  That's why lastResult exists.  (Another reason 
lastResult exists is to enforce the invariant that next() or previous() must 
be called before each remove().)
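Both cases can be checked against a stripped-down model of the iterator. This is a sketch, not Kafka code: `LinkedEpochs`, `Node` and `EpochIterator` are hypothetical names, plain `long` epochs stand in for `Snapshot` objects, and the `HashMap` bookkeeping in `SnapshotRegistry` is omitted. In this model, as in the quoted code, `previous()` returns the node at the cursor and then moves the cursor back, so `remove()` has to compare `cur` with `lastResult` to know whether to step back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-alone model of the SnapshotIterator cursor logic.
final class LinkedEpochs {
    static final class Node {
        final long epoch;
        Node prev = this;
        Node next = this;
        Node(long epoch) { this.epoch = epoch; }
    }

    // Sentinel head of a circular doubly-linked list; head.next is the first real node.
    final Node head = new Node(-1);

    void append(long epoch) {
        Node n = new Node(epoch);
        n.prev = head.prev;
        n.next = head;
        head.prev.next = n;
        head.prev = n;
    }

    final class EpochIterator {
        private Node cur = head;        // node just before the cursor (head at the start)
        private Node lastResult = null; // node returned by the last next()/previous()

        boolean hasNext() { return cur.next != head; }

        boolean hasPrevious() { return cur != head; }

        long next() {
            if (!hasNext()) throw new NoSuchElementException();
            cur = cur.next;
            lastResult = cur;
            return cur.epoch;
        }

        long previous() {
            if (!hasPrevious()) throw new NoSuchElementException();
            Node result = cur;
            cur = cur.prev;
            lastResult = result;
            return result.epoch;
        }

        void remove() {
            if (lastResult == null) {
                throw new IllegalStateException("call next() or previous() first");
            }
            // After next(), cur is the node being removed, so step back one node.
            // After previous(), cur is already the node before it, so leave cur alone.
            if (cur == lastResult) cur = cur.prev;
            lastResult.prev.next = lastResult.next;
            lastResult.next.prev = lastResult.prev;
            lastResult = null;
        }
    }

    EpochIterator iterator() { return new EpochIterator(); }

    List<Long> epochs() {
        List<Long> out = new ArrayList<>();
        for (Node n = head.next; n != head; n = n.next) out.add(n.epoch);
        return out;
    }
}
```

Clearing `lastResult` after each removal is what enforces the "next() or previous() before each remove()" rule: a second `remove()` in a row throws `IllegalStateException`.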





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-29 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r567028392



##
File path: metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
##
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This is a hash map which can be snapshotted.
+ *
+ * See {@link SnapshottableHashTable} for more details about the implementation.
+ *
+ * This class requires external synchronization.  Null keys and values are not supported.
+ *
+ * @param <K>   The key type of the map.
+ * @param <V>   The value type of the map.
+ */
+public class TimelineHashMap<K, V>
+extends SnapshottableHashTable<TimelineHashMap.TimelineHashMapEntry<K, V>>
+implements Map<K, V> {
+static class TimelineHashMapEntry<K, V>
+implements SnapshottableHashTable.ElementWithStartEpoch, Map.Entry<K, V> {
+private final K key;
+private final V value;
+private long startEpoch;
+
+TimelineHashMapEntry(K key, V value) {
+this.key = key;
+this.value = value;
+this.startEpoch = SnapshottableHashTable.LATEST_EPOCH;
+}
+
+@Override
+public K getKey() {
+return key;
+}
+
+@Override
+public V getValue() {
+return value;
+}
+
+@Override
+public V setValue(V value) {
+// This would be inefficient to support since we'd need a back-reference
+// to the enclosing map in each Entry object.  There would also be
+// complications if this entry object was sourced from a historical iterator;
+// we don't support modifying the past.  Since we don't really need this API,
+// let's just not support it.
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void setStartEpoch(long startEpoch) {
+this.startEpoch = startEpoch;
+}
+
+@Override
+public long startEpoch() {
+return startEpoch;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public boolean equals(Object o) {
+if (!(o instanceof TimelineHashMapEntry)) return false;
+TimelineHashMapEntry<K, V> other = (TimelineHashMapEntry<K, V>) o;
+return key.equals(other.key);
+}
+
+@Override
+public int hashCode() {
+return key.hashCode();
+}
+}
+
+public TimelineHashMap(SnapshotRegistry snapshotRegistry, int expectedSize) {
+super(snapshotRegistry, expectedSize);
+}
+
+@Override
+public int size() {
+return size(SnapshottableHashTable.LATEST_EPOCH);
+}
+
+public int size(long epoch) {
+return snapshottableSize(epoch);
+}
+
+@Override
+public boolean isEmpty() {
+return isEmpty(SnapshottableHashTable.LATEST_EPOCH);
+}
+
+public boolean isEmpty(long epoch) {
+return snapshottableSize(epoch) == 0;
+}
+
+@Override
+public boolean containsKey(Object key) {
+return containsKey(key, SnapshottableHashTable.LATEST_EPOCH);
+}
+
+public boolean containsKey(Object key, long epoch) {
+return snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch) != null;
+}
+
+@Override
+public boolean containsValue(Object value) {
+Iterator<Entry<K, V>> iter = entrySet().iterator();
+while (iter.hasNext()) {
+Entry<K, V> e = iter.next();
+if (value.equals(e.getValue())) {
+return true;
+}
+}
+return false;
+}
+
+@Override
+public V get(Object key) {
+return get(key, SnapshottableHashTable.LATEST_EPOCH);
+}
+
+public V get(Object key, long epoch) {
+Entry<K, V> entry =
+snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch);
+if (entry == null) {
+return null;
+}
+return entry.getValue();
+}
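Both `containsKey()` and `get()` look entries up by constructing a throwaway `TimelineHashMapEntry` with a `null` value. That works because the entry's `equals()` and `hashCode()` consider only the key, so the probe hashes to the same bucket as, and compares equal to, the stored entry. A self-contained sketch of the same idea, using a `java.util.HashMap` in place of the timeline table; `KeyedEntry` and `EntryTable` are hypothetical names, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical entry whose identity (equals/hashCode) is defined by its key alone.
final class KeyedEntry<K, V> {
    final K key;
    final V value;

    KeyedEntry(K key, V value) {
        this.key = Objects.requireNonNull(key);
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof KeyedEntry)) return false;
        return key.equals(((KeyedEntry<?, ?>) o).key);
    }

    @Override
    public int hashCode() {
        return key.hashCode();
    }
}

// Hypothetical table that stores only entries (each entry maps to itself).
final class EntryTable<K, V> {
    private final Map<KeyedEntry<K, V>, KeyedEntry<K, V>> table = new HashMap<>();

    void put(K key, V value) {
        KeyedEntry<K, V> e = new KeyedEntry<>(key, value);
        table.remove(e); // drop the old entry so the stored key is the new entry too
        table.put(e, e);
    }

    V get(Object key) {
        // The probe carries no value; it only needs to equal the stored entry.
        KeyedEntry<Object, Object> probe = new KeyedEntry<>(key, null);
        KeyedEntry<K, V> found = table.get(probe);
        return found == null ? null : found.value;
    }
}
```

The trade-off is one short-lived allocation per lookup, in exchange for a table that only ever has to store one kind of object.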

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-29 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r567027495



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This design
+ * makes snapshot reads a little faster and simpler, at the cost of sometimes having to
+ * copy references into multiple snapshot tiers when altering the current state.
+ * In general, we don't expect there to be many snapshots at any given point in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but since Java
+ * doesn't have value types, subclassing is the only way to avoid another pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object itself.
+ * We access it by looking up our object reference in the Snapshot's IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the element was
+ * overwritten or removed from a later tier.  If there are no changes between then and
+ * now, there is no data at all stored for the tier.  We don't even store a hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *    Revertable       BaseHashTable
+ *         ↑                 ↑
+ *      SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *         ↑                 ↑
+ *   TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The interface is
+ * pretty bare-bones since this class is not intended to be used directly by end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions: one that looks at the current state, and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support iterating
+ * over a snapshot even while changes are being made to the current state.  See the
+ * Javadoc for the iterator for more information about how this is accomplished.
+ *
+ * All of these classes require external synchronization, and don't support null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch>
+extends BaseHashTable<T> implements Revertable {
+interface ElementWithStartEpoch {
+void setStartEpoch(long startEpoch);
+long startEpoch();
+}
+
+static class HashTier<T> {
+private final int size;
+private Base
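To make the tiering rules concrete, here is a much-simplified model. It is a sketch under stated assumptions, not the Kafka implementation: `TieredMap`, `Element`, `createSnapshot` and `deleteSnapshot` are hypothetical names; `java.util.HashMap` and `TreeMap` replace `BaseHashTable` and `SnapshotRegistry`; it stores key/value pairs rather than bare elements; and a new element's start epoch is assumed to be one past the latest snapshot epoch. It shows three rules from the Javadoc: a snapshot tier stays empty until something changes, a displaced element is copied into every tier in which it was still visible, and a read at epoch E consults only the current tier and tier E.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of snapshot tiers (not the Kafka classes).
final class TieredMap<K, V> {
    static final long LATEST_EPOCH = Long.MAX_VALUE;

    // An element is visible at every epoch >= startEpoch until it is displaced.
    private static final class Element<T> {
        final T value;
        final long startEpoch;
        Element(T value, long startEpoch) {
            this.value = value;
            this.startEpoch = startEpoch;
        }
    }

    // Current tier: the latest state.
    private final Map<K, Element<V>> current = new HashMap<>();
    // Snapshot epoch -> tier holding only elements displaced since that snapshot.
    private final TreeMap<Long, Map<K, Element<V>>> tiers = new TreeMap<>();

    void createSnapshot(long epoch) {
        if (!tiers.isEmpty() && tiers.lastKey() >= epoch) {
            throw new IllegalArgumentException("snapshot epochs must increase");
        }
        tiers.put(epoch, new HashMap<>()); // empty until something changes
    }

    // Each tier is self-contained, so deleting one never touches the others.
    void deleteSnapshot(long epoch) {
        tiers.remove(epoch);
    }

    V put(K key, V value) {
        // Assumption: new elements become visible just after the latest snapshot.
        long start = tiers.isEmpty() ? Long.MIN_VALUE : tiers.lastKey() + 1;
        return displaced(key, current.put(key, new Element<>(value, start)));
    }

    V remove(K key) {
        return displaced(key, current.remove(key));
    }

    // Copies a reference to the displaced element into every snapshot tier whose
    // epoch is >= its startEpoch, unless that tier already holds an entry for the key.
    private V displaced(K key, Element<V> prev) {
        if (prev == null) return null;
        for (Map<K, Element<V>> tier : tiers.tailMap(prev.startEpoch, true).values()) {
            tier.putIfAbsent(key, prev);
        }
        return prev.value;
    }

    // Checks the current tier, then the tier for `epoch`. Only LATEST_EPOCH and
    // epochs that have a snapshot are meaningful here.
    V get(K key, long epoch) {
        Element<V> e = current.get(key);
        if (e != null && e.startEpoch <= epoch) return e.value;
        if (epoch == LATEST_EPOCH) return null;
        Map<K, Element<V>> tier = tiers.get(epoch);
        if (tier == null) throw new IllegalArgumentException("no snapshot at epoch " + epoch);
        Element<V> old = tier.get(key);
        return old == null ? null : old.value;
    }

    V get(K key) {
        return get(key, LATEST_EPOCH);
    }
}
```

Copying the displaced reference into every qualifying tier, rather than only the newest one, is what keeps reads down to two lookups and lets any single snapshot be dropped without rewriting its neighbours.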

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-29 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r567014747



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
##
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+
+/**
+ * A registry containing snapshots of timeline data structures.
+ * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
+ * Therefore, we use ArrayLists here rather than a data structure with higher overhead.
+ */
+public class SnapshotRegistry {
+class SnapshotIterator implements ListIterator<Snapshot> {
+private Snapshot cur;
+private Snapshot lastResult = null;
+
+SnapshotIterator(Snapshot startAfter) {
+this.cur = startAfter;
+}
+
+@Override
+public boolean hasNext() {
+return cur.next() != head;
+}
+
+@Override
+public Snapshot next() {
+if (!hasNext()) {
+throw new NoSuchElementException();
+}
+cur = cur.next();
+lastResult = cur;
+return cur;
+}
+
+@Override
+public boolean hasPrevious() {
+return cur != head;
+}
+
+@Override
+public Snapshot previous() {
+if (!hasPrevious()) {
+throw new NoSuchElementException();
+}
+Snapshot result = cur;

Review comment:
   ListIterator supports going backwards.  startAfter just indicates that 
the iteration starts after the given point.
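For reference, the JDK's own `java.util.ListIterator` contract places the cursor between elements, so calling `previous()` right after `next()` returns the same element again. The `SnapshotIterator` above behaves the same way: `next()` advances `cur` and returns it, and `previous()` returns `cur` before moving it back. Starting "after" a point corresponds to `List.listIterator(index + 1)`. A small illustration with a plain list; `forwardThenBack` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

// Illustrates java.util.ListIterator cursor semantics on an ordinary List.
final class ListIteratorDemo {
    // Walks forward from just after index `startAfter`, then all the way back.
    static List<Integer> forwardThenBack(List<Integer> list, int startAfter) {
        List<Integer> seen = new ArrayList<>();
        // listIterator(i) puts the cursor before element i, i.e. just after element i - 1.
        ListIterator<Integer> it = list.listIterator(startAfter + 1);
        while (it.hasNext()) seen.add(it.next());
        while (it.hasPrevious()) seen.add(it.previous());
        return seen;
    }
}
```

For the list `10, 20, 30, 40` and `startAfter = 1`, the forward pass yields `30, 40`, and the backward pass then yields `40, 30, 20, 10`: the last element appears twice because the cursor sits between elements, and iteration can go back past the starting point.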









[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-29 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r567014310



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
##
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+
+/**
+ * A registry containing snapshots of timeline data structures.
+ * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
+ * Therefore, we use ArrayLists here rather than a data structure with higher overhead.
+ */
+public class SnapshotRegistry {
+class SnapshotIterator implements ListIterator<Snapshot> {
+private Snapshot cur;
+private Snapshot lastResult = null;
+
+SnapshotIterator(Snapshot startAfter) {
+this.cur = startAfter;
+}
+
+@Override
+public boolean hasNext() {
+return cur.next() != head;
+}
+
+@Override
+public Snapshot next() {
+if (!hasNext()) {
+throw new NoSuchElementException();
+}
+cur = cur.next();
+lastResult = cur;
+return cur;
+}
+
+@Override
+public boolean hasPrevious() {
+return cur != head;
+}
+
+@Override
+public Snapshot previous() {
+if (!hasPrevious()) {
+throw new NoSuchElementException();
+}
+Snapshot result = cur;
+cur = cur.prev();
+lastResult = result;
+return result;
+}
+
+@Override
+public int nextIndex() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public int previousIndex() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void remove() {
+if (lastResult == null) {
+throw new IllegalStateException();
+}
+if (cur == lastResult) {
+cur = cur.prev();
+}
+snapshots.remove(lastResult.epoch());
+lastResult.removeSelfFromList();
+lastResult = null;
+}
+
+@Override
+public void set(Snapshot snapshot) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void add(Snapshot snapshot) {
+if (cur.epoch() >= snapshot.epoch()) {
+throw new IllegalArgumentException("Can't add snapshot " +
+snapshot.epoch() + " after snapshot " + cur.epoch());
+}
+cur.add(snapshot);
+cur = snapshot;
+}
+}
+
+private final Logger log;
+
+/**
+ * A map from snapshot epochs to snapshot data structures.
+ */
+private final HashMap<Long, Snapshot> snapshots = new HashMap<>();
+
+/**
+ * The head of a list of snapshots, sorted by epoch.
+ */
+private final Snapshot head = new Snapshot(-1);
+
+public SnapshotRegistry(LogContext logContext) {
+this.log = logContext.logger(SnapshotRegistry.class);
+}
+
+/**
+ * Returns an iterator that moves through snapshots from the lowest to the highest epoch.
+ */
+public ListIterator<Snapshot> iterator() {
+return new SnapshotIterator(head);
+}
+
+/**
+ * Returns an iterator that moves through snapshots from the lowest to the highest epoch,
+ * starting just after the given snapshot.
+ *
+ * @param startAfter  A snapshot that we should start after.
+ */
+ListIterator<Snapshot> iterator(Snapshot startAfter) {
+return new SnapshotIterator(startAfter);
+}
+
+/**
+ * Returns a sorted list of snapshot epochs.
+ */
+List<Long> epochsList() {
+List<Long> result = new ArrayList<>();
+for (ListIterator<Snapshot> iterator = iterator(); iterator.hasNext(); ) {
+result.add(iterator.next().e
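The registry keeps two views of the same snapshots: a `HashMap` for lookup and removal by epoch, and a list sorted by epoch for ordered iteration. Because `SnapshotIterator.add()` rejects a snapshot whose epoch is not larger than the insertion point, appending at the tail keeps the list sorted. Purely as a point of comparison, the sketch below expresses those two invariants with `java.util.LinkedHashMap`; `EpochRegistry` and its methods are hypothetical. One plausible reason the PR uses its own linked list instead is that a `LinkedHashMap` can neither start iterating from an arbitrary snapshot, as `iterator(startAfter)` does, nor walk backwards.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

// Hypothetical comparison sketch, not the PR's design: LinkedHashMap gives expected O(1)
// lookup and removal by epoch, plus iteration in insertion order.
final class EpochRegistry<S> {
    private final LinkedHashMap<Long, S> snapshots = new LinkedHashMap<>();
    // Highest epoch ever created; starts at -1 like the quoted head sentinel.
    // Unlike the PR, it is not lowered when the newest snapshot is deleted.
    private long latestEpoch = -1;

    void create(long epoch, S snapshot) {
        // Mirrors the rule in SnapshotIterator.add(): epochs must strictly increase,
        // so insertion order is also epoch order.
        if (epoch <= latestEpoch) {
            throw new IllegalArgumentException("Can't add snapshot " + epoch +
                " after snapshot " + latestEpoch);
        }
        snapshots.put(epoch, snapshot);
        latestEpoch = epoch;
    }

    S get(long epoch) {
        return snapshots.get(epoch);
    }

    // Removal by epoch does not walk the ordered view.
    boolean delete(long epoch) {
        return snapshots.remove(epoch) != null;
    }

    // Sorted, because insertion order is epoch order.
    List<Long> epochsList() {
        return new ArrayList<>(snapshots.keySet());
    }
}
```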

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566481305



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashSet.java
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This is a hash set which can be snapshotted.
+ *
+ * See {@link SnapshottableHashTable} for more details about the implementation.
+ *
+ * This class requires external synchronization.  Null values are not supported.
+ *
+ * @param <T>   The value type of the set.
+ */
+public class TimelineHashSet<T>
+extends SnapshottableHashTable<TimelineHashSet.TimelineHashSetEntry<T>>
+implements Set<T> {
+static class TimelineHashSetEntry<T>
+implements SnapshottableHashTable.ElementWithStartEpoch {
+private final T value;
+private long startEpoch;
+
+TimelineHashSetEntry(T value) {
+this.value = value;
+this.startEpoch = Long.MAX_VALUE;
+}
+
+public T getValue() {
+return value;
+}
+
+@Override
+public void setStartEpoch(long startEpoch) {
+this.startEpoch = startEpoch;
+}
+
+@Override
+public long startEpoch() {
+return startEpoch;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public boolean equals(Object o) {
+if (!(o instanceof TimelineHashSetEntry)) return false;
+TimelineHashSetEntry<T> other = (TimelineHashSetEntry<T>) o;
+return value.equals(other.value);
+}
+
+@Override
+public int hashCode() {
+return value.hashCode();
+}
+}
+
+public TimelineHashSet(SnapshotRegistry snapshotRegistry, int expectedSize) {
+super(snapshotRegistry, expectedSize);
+}
+
+@Override
+public int size() {
+return size(Long.MAX_VALUE);
+}
+
+public int size(long epoch) {
+return snapshottableSize(epoch);
+}
+
+@Override
+public boolean isEmpty() {
+return isEmpty(Long.MAX_VALUE);
+}
+
+public boolean isEmpty(long epoch) {
+return snapshottableSize(epoch) == 0;
+}
+
+@Override
+public boolean contains(Object key) {
+return contains(key, Long.MAX_VALUE);
+}
+
+public boolean contains(Object object, long epoch) {
+return snapshottableGet(new TimelineHashSetEntry<>(object), epoch) != null;
+}
+
+final class ValueIterator implements Iterator<T> {
+private final Iterator<TimelineHashSetEntry<T>> iter;
+
+ValueIterator(long epoch) {
+this.iter = snapshottableIterator(epoch);
+}
+
+@Override
+public boolean hasNext() {
+return iter.hasNext();
+}
+
+@Override
+public T next() {
+return iter.next().value;
+}
+
+@Override
+public void remove() {
+iter.remove();
+}
+}
+
+@Override
+public Iterator<T> iterator() {
+return iterator(Long.MAX_VALUE);
+}
+
+public Iterator<T> iterator(long epoch) {
+return new ValueIterator(epoch);
+}
+
+@Override
+public Object[] toArray() {
+Object[] result = new Object[size()];
+Iterator<T> iter = iterator();
+int i = 0;
+while (iter.hasNext()) {
+result[i++] = iter.next();
+}
+return result;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public <R> R[] toArray(R[] a) {
+int size = size();
+if (size <= a.length) {
+Iterator<T> iter = iterator();
+int i = 0;
+while (iter.hasNext()) {
+a[i++] = (R) iter.next();
+}
+while (i < a.length) {
+a[i++] = null;
+}
+return a;
+} else {
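+// Allocate an array with the caller's component type; casting the Object[]
+// returned by toArray() to R[] would throw ClassCastException at the call site
+// unless the caller passed an Object[].
+return toArray((R[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), size));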
+}
+}
+
+@Override
+public boolean add(T newValue) {
+Objects.requireNonNull(newValue);
+return snapshottableAddUnlessPresent(new TimelineHa
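`ValueIterator` is a small adapter: it iterates over the `TimelineHashSetEntry` wrappers, hands back only the wrapped values, and forwards `remove()` so that removing through the set's iterator removes the underlying entry. The same shape as a reusable generic class, sketched with `java.util.function.Function`; `MappingIterator` is a hypothetical name, not part of the PR.

```java
import java.util.Iterator;
import java.util.function.Function;

// Hypothetical generic form of the ValueIterator pattern: adapts an iterator over
// wrapper objects into an iterator over the values they hold.
final class MappingIterator<E, T> implements Iterator<T> {
    private final Iterator<E> inner;
    private final Function<? super E, ? extends T> unwrap;

    MappingIterator(Iterator<E> inner, Function<? super E, ? extends T> unwrap) {
        this.inner = inner;
        this.unwrap = unwrap;
    }

    @Override
    public boolean hasNext() {
        return inner.hasNext();
    }

    @Override
    public T next() {
        // inner.next() throws NoSuchElementException once the source is exhausted.
        return unwrap.apply(inner.next());
    }

    @Override
    public void remove() {
        // Removes the wrapper that produced the most recent value.
        inner.remove();
    }
}
```

Forwarding `remove()` rather than re-looking-up the value keeps removal at the cost of the underlying iterator's own `remove()`.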

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566481190



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java
##
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This is a hash map which can be snapshotted.
+ *
+ * See {@link SnapshottableHashTable} for more details about the implementation.
+ *
+ * This class requires external synchronization.  Null keys and values are not supported.
+ *
+ * @param <K>   The key type of the map.
+ * @param <V>   The value type of the map.
+ */
+public class TimelineHashMap<K, V>
+extends SnapshottableHashTable<TimelineHashMap.TimelineHashMapEntry<K, V>>
+implements Map<K, V> {
+static class TimelineHashMapEntry<K, V>
+implements SnapshottableHashTable.ElementWithStartEpoch, Map.Entry<K, V> {
+private final K key;
+private final V value;
+private long startEpoch;
+
+TimelineHashMapEntry(K key, V value) {
+this.key = key;
+this.value = value;
+this.startEpoch = Long.MAX_VALUE;
+}
+
+@Override
+public K getKey() {
+return key;
+}
+
+@Override
+public V getValue() {
+return value;
+}
+
+@Override
+public V setValue(V value) {
+// This would be inefficient to support since we'd need a back-reference
+// to the enclosing map in each Entry object.  There would also be
+// complications if this entry object was sourced from a historical iterator;
+// we don't support modifying the past.  Since we don't really need this API,
+// let's just not support it.
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void setStartEpoch(long startEpoch) {
+this.startEpoch = startEpoch;
+}
+
+@Override
+public long startEpoch() {
+return startEpoch;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public boolean equals(Object o) {
+if (!(o instanceof TimelineHashMapEntry)) return false;
+TimelineHashMapEntry<K, V> other = (TimelineHashMapEntry<K, V>) o;
+return key.equals(other.key);
+}
+
+@Override
+public int hashCode() {
+return key.hashCode();
+}
+}
+
+public TimelineHashMap(SnapshotRegistry snapshotRegistry, int expectedSize) {
+super(snapshotRegistry, expectedSize);
+}
+
+@Override
+public int size() {
+return size(Long.MAX_VALUE);
+}
+
+public int size(long epoch) {
+return snapshottableSize(epoch);
+}
+
+@Override
+public boolean isEmpty() {
+return isEmpty(Long.MAX_VALUE);
+}
+
+public boolean isEmpty(long epoch) {
+return snapshottableSize(epoch) == 0;
+}
+
+@Override
+public boolean containsKey(Object key) {
+return containsKey(key, Long.MAX_VALUE);
+}
+
+public boolean containsKey(Object key, long epoch) {
+return snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch) != null;
+}
+
+@Override
+public boolean containsValue(Object value) {
+Iterator<Entry<K, V>> iter = entrySet().iterator();
+while (iter.hasNext()) {
+Entry<K, V> e = iter.next();
+if (value.equals(e.getValue())) {
+return true;
+}
+}
+return false;
+}
+
+@Override
+public V get(Object key) {
+return get(key, Long.MAX_VALUE);
+}
+
+public V get(Object key, long epoch) {
+Entry<K, V> entry =
+snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch);
+if (entry == null) {
+return null;
+}
+return entry.getValue();
+}
+
+@Override
+public V put(K key, V value) {
+Objects.requireNonNull(key);
+   

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566481017



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This design
+ * makes snapshot reads a little faster and simpler, at the cost of sometimes having to
+ * copy references into multiple snapshot tiers when altering the current state.
+ * In general, we don't expect there to be many snapshots at any given point in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but since Java
+ * doesn't have value types, subclassing is the only way to avoid another pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object itself.
+ * We access it by looking up our object reference in the Snapshot's IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the element was
+ * overwritten or removed from a later tier.  If there are no changes between then and
+ * now, there is no data at all stored for the tier.  We don't even store a hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *    Revertable       BaseHashTable
+ *         ↑                 ↑
+ *      SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *         ↑                 ↑
+ *   TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The interface is
+ * pretty bare-bones since this class is not intended to be used directly by end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions: one that looks at the current state, and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support iterating
+ * over a snapshot even while changes are being made to the current state.  See the
+ * Javadoc for the iterator for more information about how this is accomplished.
+ *
+ * All of these classes require external synchronization, and don't support null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch>
+extends BaseHashTable<T> implements Revertable {
+interface ElementWithStartEpoch {
+void setStartEpoch(long startEpoch);
+long startEpoch();
+}
+
+static class HashTier<T> {
+private final int size;
+private Base
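The Javadoc's O(1) snapshot removal follows from where the tier data lives: inside the `Snapshot`, in an `IdentityHashMap` keyed by the owning data structure, so deleting the `Snapshot` from the registry discards every structure's tier at once without visiting any of them. A minimal sketch with hypothetical names (`SnapshotSketch`, `RegistrySketch`, `getDelta`, `setDelta`). It also shows why identity keys are a sensible choice: `Map` and `Set` implementations conventionally define content-based `equals()`, so two different tables with equal contents would collide in an ordinary `HashMap`.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical model: each snapshot owns one delta per data structure, keyed by identity.
final class SnapshotSketch {
    final long epoch;
    private final Map<Object, Object> deltas = new IdentityHashMap<>();

    SnapshotSketch(long epoch) {
        this.epoch = epoch;
    }

    Object getDelta(Object owner) {
        return deltas.get(owner);
    }

    void setDelta(Object owner, Object delta) {
        deltas.put(owner, delta);
    }
}

final class RegistrySketch {
    private final Map<Long, SnapshotSketch> snapshots = new HashMap<>();

    SnapshotSketch create(long epoch) {
        SnapshotSketch s = new SnapshotSketch(epoch);
        snapshots.put(epoch, s);
        return s;
    }

    SnapshotSketch get(long epoch) {
        return snapshots.get(epoch);
    }

    // A single expected-O(1) removal; the data structures that stored deltas are untouched.
    void delete(long epoch) {
        snapshots.remove(epoch);
    }
}
```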

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566469662



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java
##
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This is a hash map which can be snapshotted.
+ *
+ * See {@link SnapshottableHashTable} for more details about the implementation.
+ *
+ * This class requires external synchronization.  Null keys and values are not supported.
+ *
+ * @param <K>   The key type of the map.
+ * @param <V>   The value type of the map.
+ */
+public class TimelineHashMap<K, V>
+        extends SnapshottableHashTable<TimelineHashMap.TimelineHashMapEntry<K, V>>
+        implements Map<K, V> {
+    static class TimelineHashMapEntry<K, V>
+            implements SnapshottableHashTable.ElementWithStartEpoch, Map.Entry<K, V> {
+        private final K key;
+        private final V value;
+        private long startEpoch;
+
+        TimelineHashMapEntry(K key, V value) {
+            this.key = key;
+            this.value = value;
+            this.startEpoch = Long.MAX_VALUE;
+        }
+
+        @Override
+        public K getKey() {
+            return key;
+        }
+
+        @Override
+        public V getValue() {
+            return value;
+        }
+
+        @Override
+        public V setValue(V value) {
+            // This would be inefficient to support since we'd need a back-reference
+            // to the enclosing map in each Entry object.  There would also be
+            // complications if this entry object was sourced from a historical iterator;
+            // we don't support modifying the past.  Since we don't really need this API,
+            // let's just not support it.
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void setStartEpoch(long startEpoch) {
+            this.startEpoch = startEpoch;
+        }
+
+        @Override
+        public long startEpoch() {
+            return startEpoch;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof TimelineHashMapEntry)) return false;
+            TimelineHashMapEntry<K, V> other = (TimelineHashMapEntry<K, V>) o;
+            return key.equals(other.key);
+        }
+
+        @Override
+        public int hashCode() {
+            return key.hashCode();
+        }
+    }
+
+    public TimelineHashMap(SnapshotRegistry snapshotRegistry, int expectedSize) {
+        super(snapshotRegistry, expectedSize);
+    }
+
+    @Override
+    public int size() {
+        return size(Long.MAX_VALUE);
+    }
+
+    public int size(long epoch) {
+        return snapshottableSize(epoch);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return isEmpty(Long.MAX_VALUE);
+    }
+
+    public boolean isEmpty(long epoch) {
+        return snapshottableSize(epoch) == 0;
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return containsKey(key, Long.MAX_VALUE);
+    }
+
+    public boolean containsKey(Object key, long epoch) {
+        return snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch) != null;
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        Iterator<Entry<K, V>> iter = entrySet().iterator();
+        while (iter.hasNext()) {
+            Entry<K, V> e = iter.next();
+            if (value.equals(e.getValue())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public V get(Object key) {
+        return get(key, Long.MAX_VALUE);
+    }
+
+    public V get(Object key, long epoch) {
+        Entry<K, V> entry =
+            snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch);
+        if (entry == null) {
+            return null;
+        }
+        return entry.getValue();
+    }
+
+    @Override
+    public V put(K key, V value) {
+        Objects.requireNonNull(key);
+   
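In the diff above, `TimelineHashMapEntry.equals` and `hashCode` look only at the key. That is what lets `containsKey` and `get` find a stored entry by building a throwaway probe, `new TimelineHashMapEntry<>(key, null)`. The sketch below shows that idea on its own. `KeyedEntry` and `EntryTable` are hypothetical names, and `java.util.HashMap` stands in for the custom `BaseHashTable`, so this is an illustration rather than the Kafka implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical entry type: equality and hashing depend on the key only,
// like TimelineHashMapEntry in the diff above.
final class KeyedEntry<K, V> {
    final K key;
    final V value;

    KeyedEntry(K key, V value) {
        this.key = Objects.requireNonNull(key); // null keys are not supported
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof KeyedEntry)) return false;
        return key.equals(((KeyedEntry<?, ?>) o).key);
    }

    @Override
    public int hashCode() {
        return key.hashCode();
    }
}

// Hypothetical table that stores only entries (no separate key objects),
// in the spirit of SnapshottableHashTable holding "values only".
class EntryTable<K, V> {
    private final Map<KeyedEntry<K, V>, KeyedEntry<K, V>> entries = new HashMap<>();

    V put(K key, V value) {
        KeyedEntry<K, V> entry = new KeyedEntry<>(key, value);
        KeyedEntry<K, V> previous = entries.put(entry, entry);
        return previous == null ? null : previous.value;
    }

    V get(Object key) {
        // The probe's value is irrelevant: equality only looks at the key.
        KeyedEntry<Object, Object> probe = new KeyedEntry<>(key, null);
        KeyedEntry<K, V> found = entries.get(probe);
        return found == null ? null : found.value;
    }
}
```

`HashMap.put` keeps the existing key object and replaces only the mapped value. Storing each entry as both key and value therefore means `get` always returns the newest entry.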

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566469311



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java
##

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566468092



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshotRegistry.java
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+/**
+ * A registry containing snapshots of timeline data structures.
+ * We generally expect a small number of snapshots, perhaps 1 or 2 at a time.
+ * Therefore, we use ArrayLists here rather than a data structure with higher overhead.
+ */
+public class SnapshotRegistry {
+    private final Logger log;
+
+    /**
+     * The current epoch.  All snapshot epochs are lower than this number.
+     */
+    private long curEpoch;
+
+    /**
+     * An ArrayList of snapshots, kept in sorted order.
+     */
+    private final ArrayList<Snapshot> snapshots;
+
+    public SnapshotRegistry(long startEpoch) {
+        this(new LogContext(), startEpoch);
+    }
+
+    public SnapshotRegistry(LogContext logContext, long startEpoch) {
+        this.log = logContext.logger(SnapshotRegistry.class);
+        this.curEpoch = startEpoch;
+        this.snapshots = new ArrayList<>(5);
+    }
+
+    /**
+     * Returns an iterator that moves through snapshots from the lowest to the highest epoch.
+     */
+    public Iterator<Snapshot> snapshots() {
+        return snapshots.iterator();
+    }
+
+    /**
+     * Gets the snapshot for a specific epoch.
+     */
+    public Snapshot get(long epoch) {
+        for (Snapshot snapshot : snapshots) {
+            if (snapshot.epoch() == epoch) {
+                return snapshot;
+            }
+        }
+        throw new RuntimeException("No snapshot for epoch " + epoch);
+    }
+
+    /**
+     * Creates a new snapshot at the given epoch.
+     *
+     * @param epoch     The epoch to create the snapshot at.  The current epoch
+     *                  will be advanced to one past this epoch.
+     */
+    public Snapshot createSnapshot(long epoch) {
+        if (epoch < curEpoch) {
+            throw new RuntimeException("Can't create a new snapshot at epoch " + epoch +
+                " because the current epoch is " + curEpoch);
+        }
+        Snapshot snapshot = new Snapshot(epoch);
+        snapshots.add(snapshot);
+        curEpoch = epoch + 1;
+        log.debug("Creating snapshot {}", epoch);
+        return snapshot;
+    }
+
+    /**
+     * Deletes the snapshot with the given epoch.
+     *
+     * @param epoch     The epoch of the snapshot to delete.
+     */
+    public void deleteSnapshot(long epoch) {
+        Iterator<Snapshot> iter = snapshots.iterator();
+        while (iter.hasNext()) {
+            Snapshot snapshot = iter.next();
+            if (snapshot.epoch() == epoch) {
+                log.debug("Deleting snapshot {}", epoch);
+                iter.remove();
+                return;
+            }
+        }
+        throw new RuntimeException(String.format(
+            "No snapshot at epoch %d found. Snapshot epochs are %s.", epoch,
+                snapshots.stream().map(snapshot -> String.valueOf(snapshot.epoch())).
+                    collect(Collectors.joining(", "))));
+    }
+
+    /**
+     * Reverts the state of all data structures to the state at the given epoch.
+     *
+     * @param epoch     The epoch of the snapshot to revert to.
+     */
+    public void revertToSnapshot(long epoch) {

Review comment:
   It does do that.  I'll add a comment.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
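The epoch bookkeeping in `createSnapshot` and `deleteSnapshot` can be exercised on its own. The sketch below is a simplified, hypothetical `EpochRegistrySketch`. It tracks bare epoch numbers instead of `Snapshot` objects and omits logging, but it mirrors the checks in the quoted code, including the `Collectors.joining` error message. It does not attempt to model `revertToSnapshot`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified model of SnapshotRegistry's epoch handling:
// snapshots are plain epoch numbers here (no Snapshot objects, no logger).
class EpochRegistrySketch {
    private long curEpoch;
    // Stays sorted because createSnapshot only accepts epoch >= curEpoch.
    private final List<Long> epochs = new ArrayList<>();

    EpochRegistrySketch(long startEpoch) {
        this.curEpoch = startEpoch;
    }

    long curEpoch() {
        return curEpoch;
    }

    void createSnapshot(long epoch) {
        if (epoch < curEpoch) {
            throw new RuntimeException("Can't create a new snapshot at epoch " + epoch +
                " because the current epoch is " + curEpoch);
        }
        epochs.add(epoch);
        curEpoch = epoch + 1; // keeps every snapshot epoch strictly below curEpoch
    }

    void deleteSnapshot(long epoch) {
        Iterator<Long> iter = epochs.iterator();
        while (iter.hasNext()) {
            if (iter.next() == epoch) { // unboxes the Long for a numeric comparison
                iter.remove();
                return;
            }
        }
        throw new RuntimeException(String.format(
            "No snapshot at epoch %d found. Snapshot epochs are %s.", epoch,
                epochs.stream().map(e -> String.valueOf(e)).
                    collect(Collectors.joining(", "))));
    }
}
```

A plain `ArrayList` with linear scans fits the registry's stated assumption that only one or two snapshots are live at a time.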




[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566468092



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshotRegistry.java
##

Review comment:
   It does do that.  I'll add a line to the JavaDoc.









[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566467584



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This design
+ * makes snapshot reads a little faster and simpler, at the cost of sometimes requiring
+ * us to copy references into multiple snapshot tiers when altering the current state.
+ * In general, though, we don't expect there to be many snapshots at any given point in
+ * time.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but since Java
+ * doesn't have value types, subclassing is the only way to avoid another pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object itself.
+ * We access it by looking up our object reference in the Snapshot's IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the element was
+ * overwritten or removed from a later tier.  If there are no changes between then and
+ * now, there is no data at all stored for the tier.  We don't even store a hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *    Revertable    BaseHashTable
+ *         ↑               ↑
+ *        SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *         ↑               ↑
+ *  TimelineHashSet  TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The interface is
+ * pretty bare-bones since this class is not intended to be used directly by end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions: one that looks at the current state, and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support iterating
+ * over a snapshot even while changes are being made to the current state.  See the
+ * Javadoc for the iterator for more information about how this is accomplished.
+ *
+ * All of these classes require external synchronization, and don't support null keys or
+ * values.
+ */
+class SnapshottableHashTable
+        extends BaseHashTable implements Revertable {
+    interface ElementWithStartEpoch {
+        void setStartEpoch(long startEpoch);
+        long startEpoch();
+    }
+
+    static class HashTier {
+        private final int size;
+        private Base
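The two-tier read path described in the Javadoc above can be illustrated with a small toy. It is a simplified sketch, and all three classes (`ToySnapshot`, `ToyRegistry`, `ToyTieredMap`) are hypothetical. The toy stores every tier in a `java.util.HashMap`. It also has to hide keys that were added after a snapshot; the real class appears to use element start epochs (`ElementWithStartEpoch`) for that, whereas the toy records an explicit "absent" marker (`Optional.empty()`). The following points do follow the Javadoc:

- each snapshot keeps its tier data in an `IdentityHashMap` keyed by the owning structure;
- a tier is created lazily on the first change after the snapshot;
- a read at epoch E checks only E's tier and then the current tier.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical toy, not the Kafka implementation. A snapshot owns the tier data of
// every structure changed since it was taken, keyed by the structure's identity.
class ToySnapshot {
    final long epoch;
    final IdentityHashMap<Object, Map<Object, Optional<Object>>> tiers = new IdentityHashMap<>();

    ToySnapshot(long epoch) {
        this.epoch = epoch;
    }
}

class ToyRegistry {
    final List<ToySnapshot> snapshots = new ArrayList<>();

    ToySnapshot createSnapshot(long epoch) {
        ToySnapshot snapshot = new ToySnapshot(epoch);
        snapshots.add(snapshot);
        return snapshot;
    }

    // Dropping the snapshot object discards its tiers for every structure at once.
    void deleteSnapshot(long epoch) {
        snapshots.removeIf(s -> s.epoch == epoch);
    }

    ToySnapshot get(long epoch) {
        for (ToySnapshot s : snapshots) {
            if (s.epoch == epoch) return s;
        }
        throw new IllegalArgumentException("No snapshot for epoch " + epoch);
    }
}

class ToyTieredMap<K, V> {
    private final ToyRegistry registry;
    private final Map<K, V> current = new HashMap<>(); // the current tier

    ToyTieredMap(ToyRegistry registry) {
        this.registry = registry;
    }

    V put(K key, V value) {
        preserve(key);
        return current.put(key, value);
    }

    V remove(K key) {
        preserve(key);
        return current.remove(key);
    }

    V get(K key) {
        return current.get(key);
    }

    // Before changing `key`, copy its current value (or its absence) into every
    // snapshot tier that has not recorded it yet. A tier lacking the key means the
    // key is unchanged since that snapshot, so the current value is the old value.
    private void preserve(K key) {
        Optional<Object> before = Optional.ofNullable(current.get(key));
        for (ToySnapshot s : registry.snapshots) {
            s.tiers.computeIfAbsent(this, owner -> new HashMap<>()).putIfAbsent(key, before);
        }
    }

    // A read at epoch E checks at most two tiers: E's tier, then the current tier.
    @SuppressWarnings("unchecked")
    V get(K key, long epoch) {
        Map<Object, Optional<Object>> tier = registry.get(epoch).tiers.get(this);
        Optional<Object> saved = tier == null ? null : tier.get(key);
        return saved != null ? (V) saved.orElse(null) : current.get(key);
    }
}
```

Writes pay for this design: a single `put` may touch every live snapshot's tier. A read at any epoch, however, does at most two lookups, which matches the trade-off the Javadoc describes.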

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566457016



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566454521



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566452138



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy
+ * references into multiple snapshot tiers sometimes when altering the current state.
+ * In general, we don't expect there to be many snapshots at any given point in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but since Java
+ * doesn't have value types, subclassing is the only way to avoid another pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object itself.
+ * We access it by looking up our object reference in the Snapshot's IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the element was
+ * overwritten or removed from a later tier.  If there are no changes between then and
+ * now, there is no data at all stored for the tier.  We don't even store a hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *         Revertable     BaseHashTable
+ *             ↑               ↑
+ *        SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *          ↑                 ↑
+ *  TimelineHashSet     TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The interface is
+ * pretty bare-bones since this class is not intended to be used directly by end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support iterating
+ * over a snapshot even while changes are being made to the current state.  See the
+ * Javadoc for the iterator for more information about how this is accomplished.
+ *
+ * All of these classes require external synchronization, and don't support null keys or
+ * values.
+ */
+class SnapshottableHashTable
+        extends BaseHashTable implements Revertable {
+    interface ElementWithStartEpoch {
+        void setStartEpoch(long startEpoch);
+        long startEpoch();
+    }
+
+    static class HashTier {
+        private final int size;
+        private Base
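The two-tier read path described in the Javadoc above can be sketched with ordinary Java collections. This is a hypothetical illustration, not the code under review: `TieredSnapshotMap` and its methods are invented names, it stores key/value pairs rather than key-less elements, and it keeps its snapshot tiers in a TreeMap it owns rather than inside Snapshot objects. Each value records the epoch at which it was written; when a value is overwritten or removed, a reference to it is copied into every snapshot tier taken at or after that epoch, so a read at epoch E needs at most one lookup in the current tier and one in tier E.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of two-tier snapshot reads; not Kafka's SnapshottableHashTable.
class TieredSnapshotMap<K, V> {
    // A value together with the epoch at which it was written.
    private static final class Versioned<T> {
        final T value;
        final long startEpoch;

        Versioned(T value, long startEpoch) {
            this.value = value;
            this.startEpoch = startEpoch;
        }
    }

    private long curEpoch;
    // Current tier: the latest version of every live key.
    private final Map<K, Versioned<V>> current = new HashMap<>();
    // Snapshot tiers keyed by snapshot epoch. A tier only holds versions that were
    // overwritten or removed after its snapshot was taken.
    private final TreeMap<Long, Map<K, Versioned<V>>> tiers = new TreeMap<>();

    TieredSnapshotMap(long startEpoch) {
        this.curEpoch = startEpoch;
    }

    // Snapshot epochs are supplied by the caller; the current epoch moves past them.
    void createSnapshot(long epoch) {
        if (epoch < curEpoch) {
            throw new IllegalArgumentException("Epoch " + epoch + " is below current epoch " + curEpoch);
        }
        tiers.put(epoch, new HashMap<>());
        curEpoch = epoch + 1;
    }

    void deleteSnapshot(long epoch) {
        tiers.remove(epoch);
    }

    // Mutations only ever touch the current state.
    V put(K key, V value) {
        return displaced(key, current.put(key, new Versioned<>(value, curEpoch)));
    }

    V remove(K key) {
        return displaced(key, current.remove(key));
    }

    // Copies a reference to the displaced version into every snapshot tier in which
    // it still exists, i.e. every snapshot taken at or after its start epoch.
    private V displaced(K key, Versioned<V> prev) {
        if (prev == null) {
            return null;
        }
        for (Map<K, Versioned<V>> tier : tiers.tailMap(prev.startEpoch, true).values()) {
            tier.putIfAbsent(key, prev);
        }
        return prev.value;
    }

    // Read of the current state.
    V get(K key) {
        Versioned<V> v = current.get(key);
        return v == null ? null : v.value;
    }

    // Read as of snapshot 'epoch': checks at most the current tier and tier 'epoch'.
    V get(K key, long epoch) {
        Map<K, Versioned<V>> tier = tiers.get(epoch);
        if (tier == null) {
            throw new IllegalArgumentException("No snapshot for epoch " + epoch);
        }
        Versioned<V> v = current.get(key);
        if (v != null && v.startEpoch <= epoch) {
            return v.value; // unchanged since the snapshot was taken
        }
        v = tier.get(key);
        return v == null ? null : v.value;
    }
}
```

Because a displaced version is only copied into tiers whose epoch is at or after its start epoch, a value written after snapshot 10 never lands in tier 10, and deleting one tier never affects the others.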

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566451017



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshotRegistry.java
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+/**
+ * A registry containing snapshots of timeline data structures.
+ * We generally expect a small number of snapshots -- perhaps 1 or 2 at a time.
+ * Therefore, we use ArrayLists here rather than a data structure with higher overhead.
+ */
+public class SnapshotRegistry {
+    private final Logger log;
+
+    /**
+     * The current epoch.  All snapshot epochs are lower than this number.
+     */
+    private long curEpoch;
+
+    /**
+     * An ArrayList of snapshots, kept in sorted order.
+     */
+    private final ArrayList<Snapshot> snapshots;
+
+    public SnapshotRegistry(long startEpoch) {
+        this(new LogContext(), startEpoch);
+    }
+
+    public SnapshotRegistry(LogContext logContext, long startEpoch) {
+        this.log = logContext.logger(SnapshotRegistry.class);
+        this.curEpoch = startEpoch;
+        this.snapshots = new ArrayList<>(5);
+    }
+
+    /**
+     * Returns an iterator that moves through snapshots from the lowest to the highest epoch.
+     */
+    public Iterator<Snapshot> snapshots() {
+        return snapshots.iterator();
+    }
+
+    /**
+     * Gets the snapshot for a specific epoch.
+     */
+    public Snapshot get(long epoch) {
+        for (Snapshot snapshot : snapshots) {
+            if (snapshot.epoch() == epoch) {
+                return snapshot;
+            }
+        }
+        throw new RuntimeException("No snapshot for epoch " + epoch);
+    }
+
+    /**
+     * Creates a new snapshot at the given epoch.
+     *
+     * @param epoch     The epoch to create the snapshot at.  The current epoch
+     *                  will be advanced to one past this epoch.
+     */
+    public Snapshot createSnapshot(long epoch) {

Review comment:
   We want the epoch to be equal to the offset in the metadata log.  So, it needs to be provided externally.
   
   (For example, a snapshot at epoch 123 would have the state of the metadata data structures after replaying all the metadata records from 0 to 123.)
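To make the epoch-equals-offset convention concrete, here is a hypothetical sketch (`OffsetEpochReplayer` is an invented name, not part of the PR) in which the caller supplies each record's log offset and takes a snapshot at the offset of the last replayed record. For clarity it copies the whole state per snapshot, which is exactly the cost the timeline data structures are designed to avoid, and it assumes snapshots are only ever taken at the most recently replayed offset.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: snapshot epochs are metadata log offsets supplied by the caller.
// It naively copies the whole state per snapshot, which timeline structures avoid.
class OffsetEpochReplayer {
    private final Map<String, String> state = new HashMap<>();
    private final TreeMap<Long, Map<String, String>> snapshots = new TreeMap<>();
    // Offset of the next record to replay; plays the role of the current epoch.
    private long nextOffset;

    OffsetEpochReplayer(long startOffset) {
        this.nextOffset = startOffset;
    }

    // Applies the record at 'offset'; a null value models a deletion record.
    void replay(long offset, String key, String value) {
        if (offset != nextOffset) {
            throw new IllegalStateException("Expected offset " + nextOffset + " but got " + offset);
        }
        if (value == null) {
            state.remove(key);
        } else {
            state.put(key, value);
        }
        nextOffset = offset + 1;
    }

    // Snapshot at 'epoch' = state after replaying every record up to and including 'epoch'.
    // Assumption: snapshots are only taken at the most recently replayed offset.
    void createSnapshot(long epoch) {
        if (epoch != nextOffset - 1) {
            throw new IllegalArgumentException("Can only snapshot at offset " + (nextOffset - 1));
        }
        snapshots.put(epoch, new HashMap<>(state));
    }

    String get(String key) {
        return state.get(key);
    }

    String get(String key, long epoch) {
        Map<String, String> snapshot = snapshots.get(epoch);
        if (snapshot == null) {
            throw new IllegalArgumentException("No snapshot for epoch " + epoch);
        }
        return snapshot.get(key);
    }

    long currentEpoch() {
        return nextOffset;
    }
}
```

After replaying offsets 0 through 123 and calling `createSnapshot(123)`, the current epoch is 124, so the record at offset 124 is the first change that snapshot 123 does not see.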









[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566450389



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy
+ * references into multiple snapshot tiers sometimes when altering the current state.
+ * In general, we don't expect there to be many snapshots at any given point in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but since Java
+ * doesn't have value types, subclassing is the only way to avoid another pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object itself.
+ * We access it by looking up our object reference in the Snapshot's IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the element was
+ * overwritten or removed from a later tier.  If there are no changes between then and
+ * now, there is no data at all stored for the tier.  We don't even store a hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *         Revertable     BaseHashTable
+ *             ↑               ↑
+ *        SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *          ↑                 ↑
+ *  TimelineHashSet     TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The interface is
+ * pretty bare-bones since this class is not intended to be used directly by end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support iterating
+ * over a snapshot even while changes are being made to the current state.  See the
+ * Javadoc for the iterator for more information about how this is accomplished.
+ *
+ * All of these classes require external synchronization, and don't support null keys or
+ * values.
+ */
+class SnapshottableHashTable
+        extends BaseHashTable implements Revertable {
+    interface ElementWithStartEpoch {
+        void setStartEpoch(long startEpoch);
+        long startEpoch();
+    }
+
+    static class HashTier {
+        private final int size;
+        private Base
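The O(1) snapshot removal mentioned in the Javadoc follows from where the tiers live: each Snapshot owns the per-structure delta tables, keyed by the structure's object identity, so dropping a Snapshot from the registry releases every structure's tier without visiting any of them. A hypothetical sketch of that ownership pattern follows; `SnapshotSketch` and `RegistrySketch` are invented names, not the PR's classes, and the delta tables are plain HashMaps rather than BaseHashTables. Tiers are allocated lazily on the first change, matching the note that no hash table is stored for a tier until something changes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not Kafka's Snapshot/SnapshotRegistry classes.
final class SnapshotSketch {
    final long epoch;
    // Keyed by the owning data structure's identity, not equals(): two distinct
    // tables that happen to be equal must still get separate tiers.
    private final Map<Object, Map<Object, Object>> deltas = new IdentityHashMap<>();

    SnapshotSketch(long epoch) {
        this.epoch = epoch;
    }

    // Returns the owner's tier, or null if the owner has not changed since this snapshot.
    Map<Object, Object> getDelta(Object owner) {
        return deltas.get(owner);
    }

    // Allocates the owner's tier lazily, on the first change after this snapshot.
    Map<Object, Object> getOrCreateDelta(Object owner) {
        return deltas.computeIfAbsent(owner, o -> new HashMap<>());
    }

    int numDeltas() {
        return deltas.size();
    }
}

final class RegistrySketch {
    // Expected to hold only a handful of snapshots, so a list is enough.
    private final List<SnapshotSketch> snapshots = new ArrayList<>();

    SnapshotSketch createSnapshot(long epoch) {
        SnapshotSketch snapshot = new SnapshotSketch(epoch);
        snapshots.add(snapshot);
        return snapshot;
    }

    // Dropping the snapshot drops every data structure's tier with it;
    // no data structure is visited.
    boolean deleteSnapshot(long epoch) {
        return snapshots.removeIf(s -> s.epoch == epoch);
    }

    int size() {
        return snapshots.size();
    }
}
```

Removing an entry from the list is linear in the number of snapshots, but that number is expected to stay tiny; the point of the design is that the cost does not depend on how many data structures or elements exist.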