This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch iqv2-framework
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit da055cc712646d32f02d5bf3ea0305a649cf4151
Author: John Roesler <[email protected]>
AuthorDate: Tue Nov 23 16:49:52 2021 -0600

    formalize Position
---
 .../org/apache/kafka/streams/query/Position.java   | 120 +++---------
 .../apache/kafka/streams/query/PositionTest.java   | 215 +++++++++++++++++++++
 2 files changed, 243 insertions(+), 92 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java 
b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
index 5b0e981..4c613df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/Position.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.query;
 
 
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -26,38 +28,45 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
+@Evolving
 public class Position {
+    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> 
position;
 
-    private final Map<String, Map<Integer, Long>> position;
-
-    private Position(final Map<String, Map<Integer, Long>> position) {
+    private Position(final ConcurrentHashMap<String, 
ConcurrentHashMap<Integer, Long>> position) {
         this.position = position;
     }
 
     public static Position emptyPosition() {
-        return new Position(new HashMap<>());
+        return new Position(new ConcurrentHashMap<>());
     }
 
-    public static Position fromMap(final Map<String, Map<Integer, Long>> map) {
+    public static Position fromMap(final Map<String, ? extends Map<Integer, 
Long>> map) {
         return new Position(deepCopy(map));
     }
 
     public Position withComponent(final String topic, final int partition, 
final long offset) {
-        final Map<String, Map<Integer, Long>> updated = deepCopy(position);
-        updated.computeIfAbsent(topic, k -> new HashMap<>()).put(partition, 
offset);
-        return new Position(updated);
+        position
+            .computeIfAbsent(topic, k -> new ConcurrentHashMap<>())
+            .put(partition, offset);
+        return this;
+    }
+
+    public Position copy() {
+        return new Position(deepCopy(position));
     }
 
     public Position merge(final Position other) {
         if (other == null) {
             return this;
         } else {
-            final Map<String, Map<Integer, Long>> copy = deepCopy(position);
-            for (final Entry<String, Map<Integer, Long>> entry : 
other.position.entrySet()) {
+            final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> 
copy =
+                deepCopy(position);
+            for (final Entry<String, ConcurrentHashMap<Integer, Long>> entry : 
other.position.entrySet()) {
                 final String topic = entry.getKey();
                 final Map<Integer, Long> partitionMap =
-                    copy.computeIfAbsent(topic, k -> new HashMap<>());
+                    copy.computeIfAbsent(topic, k -> new 
ConcurrentHashMap<>());
                 for (final Entry<Integer, Long> partitionOffset : 
entry.getValue().entrySet()) {
                     final Integer partition = partitionOffset.getKey();
                     final Long offset = partitionOffset.getValue();
@@ -79,88 +88,15 @@ public class Position {
         return Collections.unmodifiableMap(position.get(topic));
     }
 
-    public ByteBuffer serialize() {
-        final byte version = (byte) 0;
-
-        int arraySize = Byte.SIZE; // version
-
-        final int nTopics = position.size();
-        arraySize += Integer.SIZE;
-
-        final ArrayList<Entry<String, Map<Integer, Long>>> entries =
-            new ArrayList<>(position.entrySet());
-        final byte[][] topics = new byte[entries.size()][];
-
-        for (int i = 0; i < nTopics; i++) {
-            final Entry<String, Map<Integer, Long>> entry = entries.get(i);
-            final byte[] topicBytes = 
entry.getKey().getBytes(StandardCharsets.UTF_8);
-            topics[i] = topicBytes;
-            arraySize += Integer.SIZE; // topic name length
-            arraySize += topicBytes.length; // topic name itself
-
-            final Map<Integer, Long> partitionOffsets = entry.getValue();
-            arraySize += Integer.SIZE; // Number of PartitionOffset pairs
-            arraySize += (Integer.SIZE + Long.SIZE)
-                * partitionOffsets.size(); // partitionOffsets themselves
-        }
-
-        final ByteBuffer buffer = ByteBuffer.allocate(arraySize);
-        buffer.put(version);
-
-        buffer.putInt(nTopics);
-        for (int i = 0; i < nTopics; i++) {
-            buffer.putInt(topics[i].length);
-            buffer.put(topics[i]);
-
-            final Entry<String, Map<Integer, Long>> entry = entries.get(i);
-            final Map<Integer, Long> partitionOffsets = entry.getValue();
-            buffer.putInt(partitionOffsets.size());
-            for (final Entry<Integer, Long> partitionOffset : 
partitionOffsets.entrySet()) {
-                buffer.putInt(partitionOffset.getKey());
-                buffer.putLong(partitionOffset.getValue());
-            }
-        }
-
-        buffer.flip();
-        return buffer;
-    }
-
-    public static Position deserialize(final ByteBuffer buffer) {
-        final byte version = buffer.get();
-
-        switch (version) {
-            case (byte) 0:
-                final int nTopics = buffer.getInt();
-                final Map<String, Map<Integer, Long>> position = new 
HashMap<>(nTopics);
-                for (int i = 0; i < nTopics; i++) {
-                    final int topicNameLength = buffer.getInt();
-                    final byte[] topicNameBytes = new byte[topicNameLength];
-                    buffer.get(topicNameBytes);
-                    final String topic = new String(topicNameBytes, 
StandardCharsets.UTF_8);
-
-                    final int numPairs = buffer.getInt();
-                    final Map<Integer, Long> partitionOffsets = new 
HashMap<>(numPairs);
-                    for (int j = 0; j < numPairs; j++) {
-                        partitionOffsets.put(buffer.getInt(), 
buffer.getLong());
-                    }
-                    position.put(topic, partitionOffsets);
-                }
-                return Position.fromMap(position);
-            default:
-                throw new IllegalArgumentException(
-                    "Unknown version " + version + " when deserializing 
Position"
-                );
-        }
-    }
-
-    private static Map<String, Map<Integer, Long>> deepCopy(
-        final Map<String, Map<Integer, Long>> map) {
+    private static ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> 
deepCopy(
+        final Map<String, ? extends Map<Integer, Long>> map) {
         if (map == null) {
-            return new HashMap<>();
+            return new ConcurrentHashMap<>();
         } else {
-            final Map<String, Map<Integer, Long>> copy = new 
HashMap<>(map.size());
-            for (final Entry<String, Map<Integer, Long>> entry : 
map.entrySet()) {
-                copy.put(entry.getKey(), new HashMap<>(entry.getValue()));
+            final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> 
copy =
+                new ConcurrentHashMap<>(map.size());
+            for (final Entry<String, ? extends Map<Integer, Long>> entry : 
map.entrySet()) {
+                copy.put(entry.getKey(), new 
ConcurrentHashMap<>(entry.getValue()));
             }
             return copy;
         }
@@ -187,6 +123,6 @@ public class Position {
 
     @Override
     public int hashCode() {
-        return Objects.hash(position);
+        throw new UnsupportedOperationException("This mutable object is not 
suitable as a hash key");
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java 
b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
new file mode 100644
index 0000000..0a0cf5c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+
+public class PositionTest {
+
+    @Test
+    public void shouldCreateFromMap() {
+        final Map<String, Map<Integer, Long>> map = mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        );
+
+        final Position position = Position.fromMap(map);
+        assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+
+        // Should be a copy of the constructor map
+
+        map.get("topic1").put(99, 99L);
+
+        // so the position is still the original one
+        assertThat(position.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+    }
+
+    @Test
+    public void shouldCreateFromNullMap() {
+        final Position position = Position.fromMap(null);
+        assertThat(position.getTopics(), equalTo(Collections.emptySet()));
+    }
+
+    @Test
+    public void shouldMerge() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position position1 = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 7L))), // update offset
+            mkEntry("topic1", mkMap(mkEntry(8, 1L))), // add partition
+            mkEntry("topic2", mkMap(mkEntry(9, 5L))) // add topic
+        ));
+
+        final Position merged = position.merge(position1);
+
+        assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1", 
"topic2")));
+        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 7L))));
+        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L),
+            mkEntry(8, 1L)
+        )));
+        assertThat(merged.getBound("topic2"), equalTo(mkMap(mkEntry(9, 5L))));
+    }
+
+    @Test
+    public void shouldCopy() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position copy = position.copy();
+
+        // mutate original
+        position.withComponent("topic", 0, 6L);
+        position.withComponent("topic1", 8, 1L);
+        position.withComponent("topic2", 2, 4L);
+
+        // copy has not changed
+        assertThat(copy.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(copy.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(copy.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+
+        // original has changed
+        assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1", 
"topic2")));
+        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 6L))));
+        assertThat(position.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L),
+            mkEntry(8, 1L)
+        )));
+        assertThat(position.getBound("topic2"), equalTo(mkMap(mkEntry(2, 
4L))));
+    }
+
+    @Test
+    public void shouldMergeNull() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position merged = position.merge(null);
+
+        assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+    }
+
+    @Test
+    public void shouldMatchOnEqual() {
+        final Position position1 = Position.emptyPosition();
+        final Position position2 = Position.emptyPosition();
+        position1.withComponent("topic1", 0, 1);
+        position2.withComponent("topic1", 0, 1);
+
+        position1.withComponent("topic1", 1, 2);
+        position2.withComponent("topic1", 1, 2);
+
+        position1.withComponent("topic1", 2, 1);
+        position2.withComponent("topic1", 2, 1);
+
+        position1.withComponent("topic2", 0, 0);
+        position2.withComponent("topic2", 0, 0);
+
+        assertEquals(position1, position2);
+    }
+
+    @Test
+    public void shouldNotMatchOnUnEqual() {
+        final Position position1 = Position.emptyPosition();
+        final Position position2 = Position.emptyPosition();
+        position1.withComponent("topic1", 0, 1);
+        position2.withComponent("topic1", 0, 1);
+
+        position1.withComponent("topic1", 1, 2);
+
+        position1.withComponent("topic1", 2, 1);
+        position2.withComponent("topic1", 2, 1);
+
+        position1.withComponent("topic2", 0, 0);
+        position2.withComponent("topic2", 0, 0);
+
+        assertNotEquals(position1, position2);
+    }
+
+    @Test
+    public void shouldNotMatchNull() {
+        final Position position = Position.emptyPosition();
+        assertNotEquals(position, null);
+    }
+
+    @Test
+    public void shouldMatchSelf() {
+        final Position position = Position.emptyPosition();
+        assertEquals(position, position);
+    }
+
+    @Test
+    public void shouldNotHash() {
+        final Position position = Position.emptyPosition();
+        assertThrows(UnsupportedOperationException.class, position::hashCode);
+
+        // going overboard...
+        final HashSet<Position> set = new HashSet<>();
+        assertThrows(UnsupportedOperationException.class, () -> 
set.add(position));
+
+        final HashMap<Position, Integer> map = new HashMap<>();
+        assertThrows(UnsupportedOperationException.class, () -> 
map.put(position, 5));
+    }
+}

Reply via email to