This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch iqv2-framework in repository https://gitbox.apache.org/repos/asf/kafka.git
commit da055cc712646d32f02d5bf3ea0305a649cf4151 Author: John Roesler <[email protected]> AuthorDate: Tue Nov 23 16:49:52 2021 -0600 formalize Position --- .../org/apache/kafka/streams/query/Position.java | 120 +++--------- .../apache/kafka/streams/query/PositionTest.java | 215 +++++++++++++++++++++ 2 files changed, 243 insertions(+), 92 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java b/streams/src/main/java/org/apache/kafka/streams/query/Position.java index 5b0e981..4c613df 100644 --- a/streams/src/main/java/org/apache/kafka/streams/query/Position.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java @@ -17,6 +17,8 @@ package org.apache.kafka.streams.query; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; + import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -26,38 +28,45 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +@Evolving public class Position { + private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position; - private final Map<String, Map<Integer, Long>> position; - - private Position(final Map<String, Map<Integer, Long>> position) { + private Position(final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position) { this.position = position; } public static Position emptyPosition() { - return new Position(new HashMap<>()); + return new Position(new ConcurrentHashMap<>()); } - public static Position fromMap(final Map<String, Map<Integer, Long>> map) { + public static Position fromMap(final Map<String, ? extends Map<Integer, Long>> map) { return new Position(deepCopy(map)); } public Position withComponent(final String topic, final int partition, final long offset) { - final Map<String, Map<Integer, Long>> updated = deepCopy(position); - updated.computeIfAbsent(topic, k -> new HashMap<>()).put(partition, offset); - return new Position(updated); + position + .computeIfAbsent(topic, k -> new ConcurrentHashMap<>()) + .put(partition, offset); + return this; + } + + public Position copy() { + return new Position(deepCopy(position)); } public Position merge(final Position other) { if (other == null) { return this; } else { - final Map<String, Map<Integer, Long>> copy = deepCopy(position); - for (final Entry<String, Map<Integer, Long>> entry : other.position.entrySet()) { + final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> copy = + deepCopy(position); + for (final Entry<String, ConcurrentHashMap<Integer, Long>> entry : other.position.entrySet()) { final String topic = entry.getKey(); final Map<Integer, Long> partitionMap = - copy.computeIfAbsent(topic, k -> new HashMap<>()); + copy.computeIfAbsent(topic, k -> new ConcurrentHashMap<>()); for (final Entry<Integer, Long> partitionOffset : entry.getValue().entrySet()) { final Integer partition = partitionOffset.getKey(); final Long offset = partitionOffset.getValue(); @@ -79,88 +88,15 @@ public class Position { return Collections.unmodifiableMap(position.get(topic)); } - public ByteBuffer serialize() { - final byte version = (byte) 0; - - int arraySize = Byte.SIZE; // version - - final int nTopics = position.size(); - arraySize += Integer.SIZE; - - final ArrayList<Entry<String, Map<Integer, Long>>> entries = - new ArrayList<>(position.entrySet()); - final byte[][] topics = new byte[entries.size()][]; - - for (int i = 0; i < nTopics; i++) { - final Entry<String, Map<Integer, Long>> entry = entries.get(i); - final byte[] topicBytes = entry.getKey().getBytes(StandardCharsets.UTF_8); - topics[i] = topicBytes; - arraySize += Integer.SIZE; // topic name length - arraySize += topicBytes.length; // topic name itself - - final Map<Integer, Long> partitionOffsets = entry.getValue(); - arraySize += Integer.SIZE; // Number of PartitionOffset pairs - arraySize += (Integer.SIZE + Long.SIZE) - * partitionOffsets.size(); // partitionOffsets themselves - } - - final ByteBuffer buffer = ByteBuffer.allocate(arraySize); - buffer.put(version); - - buffer.putInt(nTopics); - for (int i = 0; i < nTopics; i++) { - buffer.putInt(topics[i].length); - buffer.put(topics[i]); - - final Entry<String, Map<Integer, Long>> entry = entries.get(i); - final Map<Integer, Long> partitionOffsets = entry.getValue(); - buffer.putInt(partitionOffsets.size()); - for (final Entry<Integer, Long> partitionOffset : partitionOffsets.entrySet()) { - buffer.putInt(partitionOffset.getKey()); - buffer.putLong(partitionOffset.getValue()); - } - } - - buffer.flip(); - return buffer; - } - - public static Position deserialize(final ByteBuffer buffer) { - final byte version = buffer.get(); - - switch (version) { - case (byte) 0: - final int nTopics = buffer.getInt(); - final Map<String, Map<Integer, Long>> position = new HashMap<>(nTopics); - for (int i = 0; i < nTopics; i++) { - final int topicNameLength = buffer.getInt(); - final byte[] topicNameBytes = new byte[topicNameLength]; - buffer.get(topicNameBytes); - final String topic = new String(topicNameBytes, StandardCharsets.UTF_8); - - final int numPairs = buffer.getInt(); - final Map<Integer, Long> partitionOffsets = new HashMap<>(numPairs); - for (int j = 0; j < numPairs; j++) { - partitionOffsets.put(buffer.getInt(), buffer.getLong()); - } - position.put(topic, partitionOffsets); - } - return Position.fromMap(position); - default: - throw new IllegalArgumentException( - "Unknown version " + version + " when deserializing Position" - ); - } - } - - private static Map<String, Map<Integer, Long>> deepCopy( - final Map<String, Map<Integer, Long>> map) { + private static ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> deepCopy( + final Map<String, ? extends Map<Integer, Long>> map) { if (map == null) { - return new HashMap<>(); + return new ConcurrentHashMap<>(); } else { - final Map<String, Map<Integer, Long>> copy = new HashMap<>(map.size()); - for (final Entry<String, Map<Integer, Long>> entry : map.entrySet()) { - copy.put(entry.getKey(), new HashMap<>(entry.getValue())); + final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> copy = + new ConcurrentHashMap<>(map.size()); + for (final Entry<String, ? extends Map<Integer, Long>> entry : map.entrySet()) { + copy.put(entry.getKey(), new ConcurrentHashMap<>(entry.getValue())); } return copy; } @@ -187,6 +123,6 @@ public class Position { @Override public int hashCode() { - return Objects.hash(position); + throw new UnsupportedOperationException("This mutable object is not suitable as a hash key"); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java new file mode 100644 index 0000000..0a0cf5c --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.query; + +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThrows; + +public class PositionTest { + + @Test + public void shouldCreateFromMap() { + final Map<String, Map<Integer, Long>> map = mkMap( + mkEntry("topic", mkMap(mkEntry(0, 5L))), + mkEntry("topic1", mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L) + )) + ); + + final Position position = Position.fromMap(map); + assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1"))); + assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L)))); + + // Should be a copy of the constructor map + + map.get("topic1").put(99, 99L); + + // so the position is still the original one + assertThat(position.getBound("topic1"), equalTo(mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L) + ))); + } + + @Test + public void shouldCreateFromNullMap() { + final Position position = Position.fromMap(null); + assertThat(position.getTopics(), equalTo(Collections.emptySet())); + } + + @Test + public void shouldMerge() { + final Position position = Position.fromMap(mkMap( + mkEntry("topic", mkMap(mkEntry(0, 5L))), + mkEntry("topic1", mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L) + )) + )); + + final Position position1 = Position.fromMap(mkMap( + mkEntry("topic", mkMap(mkEntry(0, 7L))), // update offset + mkEntry("topic1", mkMap(mkEntry(8, 1L))), // add partition + mkEntry("topic2", mkMap(mkEntry(9, 5L))) // add topic + )); + + final Position merged = position.merge(position1); + + assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1", "topic2"))); + assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 7L)))); + assertThat(merged.getBound("topic1"), equalTo(mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L), + mkEntry(8, 1L) + ))); + assertThat(merged.getBound("topic2"), equalTo(mkMap(mkEntry(9, 5L)))); + } + + @Test + public void shouldCopy() { + final Position position = Position.fromMap(mkMap( + mkEntry("topic", mkMap(mkEntry(0, 5L))), + mkEntry("topic1", mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L) + )) + )); + + final Position copy = position.copy(); + + // mutate original + position.withComponent("topic", 0, 6L); + position.withComponent("topic1", 8, 1L); + position.withComponent("topic2", 2, 4L); + + // copy has not changed + assertThat(copy.getTopics(), equalTo(mkSet("topic", "topic1"))); + assertThat(copy.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L)))); + assertThat(copy.getBound("topic1"), equalTo(mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L) + ))); + + // original has changed + assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1", "topic2"))); + assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 6L)))); + assertThat(position.getBound("topic1"), equalTo(mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L), + mkEntry(8, 1L) + ))); + assertThat(position.getBound("topic2"), equalTo(mkMap(mkEntry(2, 4L)))); + } + + @Test + public void shouldMergeNull() { + final Position position = Position.fromMap(mkMap( + mkEntry("topic", mkMap(mkEntry(0, 5L))), + mkEntry("topic1", mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L) + )) + )); + + final Position merged = position.merge(null); + + assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1"))); + assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L)))); + assertThat(merged.getBound("topic1"), equalTo(mkMap( + mkEntry(0, 5L), + mkEntry(7, 0L) + ))); + } + + @Test + public void shouldMatchOnEqual() { + final Position position1 = Position.emptyPosition(); + final Position position2 = Position.emptyPosition(); + position1.withComponent("topic1", 0, 1); + position2.withComponent("topic1", 0, 1); + + position1.withComponent("topic1", 1, 2); + position2.withComponent("topic1", 1, 2); + + position1.withComponent("topic1", 2, 1); + position2.withComponent("topic1", 2, 1); + + position1.withComponent("topic2", 0, 0); + position2.withComponent("topic2", 0, 0); + + assertEquals(position1, position2); + } + + @Test + public void shouldNotMatchOnUnEqual() { + final Position position1 = Position.emptyPosition(); + final Position position2 = Position.emptyPosition(); + position1.withComponent("topic1", 0, 1); + position2.withComponent("topic1", 0, 1); + + position1.withComponent("topic1", 1, 2); + + position1.withComponent("topic1", 2, 1); + position2.withComponent("topic1", 2, 1); + + position1.withComponent("topic2", 0, 0); + position2.withComponent("topic2", 0, 0); + + assertNotEquals(position1, position2); + } + + @Test + public void shouldNotMatchNull() { + final Position position = Position.emptyPosition(); + assertNotEquals(position, null); + } + + @Test + public void shouldMatchSelf() { + final Position position = Position.emptyPosition(); + assertEquals(position, position); + } + + @Test + public void shouldNotHash() { + final Position position = Position.emptyPosition(); + assertThrows(UnsupportedOperationException.class, position::hashCode); + + // going overboard... + final HashSet<Position> set = new HashSet<>(); + assertThrows(UnsupportedOperationException.class, () -> set.add(position)); + + final HashMap<Position, Integer> map = new HashMap<>(); + assertThrows(UnsupportedOperationException.class, () -> map.put(position, 5)); + } +}
