bkonold commented on a change in pull request #1404:
URL: https://github.com/apache/samza/pull/1404#discussion_r461230217



##########
File path: 
samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
##########
@@ -19,66 +19,501 @@
 
 package org.apache.samza.storage.kv.inmemory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
-import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.Counter;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueSnapshot;
 import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
 
 public class TestInMemoryKeyValueStore {
+  private static final String DEFAULT_KEY_PREFIX = "key_prefix";
+  private static final String OTHER_KEY_PREFIX = "other_key_prefix";
+  /**
+   * Keep the lengths of the values longer so that metrics validations for key 
and value sizes don't collide.
+   */
+  private static final String DEFAULT_VALUE_PREFIX = 
"value_prefix_value_prefix";
+  private static final String OTHER_VALUE_PREFIX = 
"other_value_prefix_value_prefix";
+
+  @Mock
+  private KeyValueStoreMetrics keyValueStoreMetrics;
+  @Mock
+  private Counter getsCounter;
+  @Mock
+  private Counter bytesReadCounter;
+  @Mock
+  private Counter putsCounter;
+  @Mock
+  private Counter bytesWrittenCounter;
+  @Mock
+  private Counter deletesCounter;
+
+  private InMemoryKeyValueStore inMemoryKeyValueStore;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    when(this.keyValueStoreMetrics.gets()).thenReturn(this.getsCounter);
+    
when(this.keyValueStoreMetrics.bytesRead()).thenReturn(this.bytesReadCounter);
+    when(this.keyValueStoreMetrics.puts()).thenReturn(this.putsCounter);
+    
when(this.keyValueStoreMetrics.bytesWritten()).thenReturn(this.bytesWrittenCounter);
+    when(this.keyValueStoreMetrics.deletes()).thenReturn(this.deletesCounter);
+    this.inMemoryKeyValueStore = new 
InMemoryKeyValueStore(this.keyValueStoreMetrics);
+  }
+
+  @Test
+  public void testGet() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), 
value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), 
this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.getsCounter, times(2)).inc();
+    verify(this.bytesReadCounter).inc(value(0).length);
+    verify(this.bytesReadCounter).inc(value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testGetEmpty() {
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testGetAfterDelete() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.delete(key(0));
+
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testPut() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), 
value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), 
this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(OTHER_KEY_PREFIX, 1).length + 
value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutExistingEntry() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(0), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), 
this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(0).length + 
value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutEmpty() {
+    byte[] emptyValue = new byte[0];
+    this.inMemoryKeyValueStore.put(key(0), emptyValue);
+
+    assertEquals(0, this.inMemoryKeyValueStore.get(key(0)).length);
+    verify(this.putsCounter).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length);
+  }

Review comment:
       Is there an edge case this is intended to test that `testPut` doesn't 
cover?

##########
File path: 
samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
##########
@@ -19,66 +19,501 @@
 
 package org.apache.samza.storage.kv.inmemory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
-import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.Counter;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueSnapshot;
 import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
 
 public class TestInMemoryKeyValueStore {
+  private static final String DEFAULT_KEY_PREFIX = "key_prefix";
+  private static final String OTHER_KEY_PREFIX = "other_key_prefix";
+  /**
+   * Keep the lengths of the values longer so that metrics validations for key 
and value sizes don't collide.
+   */
+  private static final String DEFAULT_VALUE_PREFIX = 
"value_prefix_value_prefix";
+  private static final String OTHER_VALUE_PREFIX = 
"other_value_prefix_value_prefix";
+
+  @Mock
+  private KeyValueStoreMetrics keyValueStoreMetrics;
+  @Mock
+  private Counter getsCounter;
+  @Mock
+  private Counter bytesReadCounter;
+  @Mock
+  private Counter putsCounter;
+  @Mock
+  private Counter bytesWrittenCounter;
+  @Mock
+  private Counter deletesCounter;
+
+  private InMemoryKeyValueStore inMemoryKeyValueStore;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    when(this.keyValueStoreMetrics.gets()).thenReturn(this.getsCounter);
+    
when(this.keyValueStoreMetrics.bytesRead()).thenReturn(this.bytesReadCounter);
+    when(this.keyValueStoreMetrics.puts()).thenReturn(this.putsCounter);
+    
when(this.keyValueStoreMetrics.bytesWritten()).thenReturn(this.bytesWrittenCounter);
+    when(this.keyValueStoreMetrics.deletes()).thenReturn(this.deletesCounter);
+    this.inMemoryKeyValueStore = new 
InMemoryKeyValueStore(this.keyValueStoreMetrics);
+  }
+
+  @Test
+  public void testGet() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), 
value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), 
this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.getsCounter, times(2)).inc();
+    verify(this.bytesReadCounter).inc(value(0).length);
+    verify(this.bytesReadCounter).inc(value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testGetEmpty() {
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testGetAfterDelete() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.delete(key(0));
+
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testPut() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), 
value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), 
this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(OTHER_KEY_PREFIX, 1).length + 
value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutExistingEntry() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(0), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), 
this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(0).length + 
value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutEmpty() {
+    byte[] emptyValue = new byte[0];
+    this.inMemoryKeyValueStore.put(key(0), emptyValue);
+
+    assertEquals(0, this.inMemoryKeyValueStore.get(key(0)).length);
+    verify(this.putsCounter).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length);
+  }
+
+  @Test
+  public void testPutNull() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(0), null);
+
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.deletesCounter).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+  }
+
+  @Test
+  public void testPutAll() {
+    List<Entry<byte[], byte[]>> entries = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      entries.add(new Entry<>(key(i), value(i)));
+    }
+    this.inMemoryKeyValueStore.putAll(entries);
+
+    for (int i = 0; i < 10; i++) {
+      assertArrayEquals(value(i), this.inMemoryKeyValueStore.get(key(i)));
+    }
+    verify(this.putsCounter, times(10)).inc();
+    verify(this.bytesWrittenCounter, times(10)).inc(key(0).length + 
value(0).length);
+  }
+
+  @Test
+  public void testPutAllUpdate() {
+    // check that an existing value is overridden
+    this.inMemoryKeyValueStore.put(key(0), value(1234));
+    List<Entry<byte[], byte[]>> entries = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      entries.add(new Entry<>(key(i), value(i)));
+    }
+    this.inMemoryKeyValueStore.putAll(entries);
+
+    for (int i = 0; i < 10; i++) {
+      assertArrayEquals(value(i), this.inMemoryKeyValueStore.get(key(i)));
+    }
+    // 1 time for initial value to be overridden, 10 times for "regular" puts
+    verify(this.putsCounter, times(11)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(1234).length);
+    verify(this.bytesWrittenCounter, times(10)).inc(key(0).length + 
value(0).length);

Review comment:
       Doesn't this assume that the lengths of `key(0)` and `value(0)` are the 
same as those of `key(i)` and `value(i)` from the loop? Can we rewrite this so 
that it more obviously matches the 10 writes that happen in the loop, or add a 
comment indicating this assumption? Or perhaps define a constant that 
represents the length of an integer key/value, just to make this more readable.
   
   The same comment applies to the other occurrences of loops in your tests.

##########
File path: 
samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.inmemory;
+
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedBytes;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.storage.kv.KeyValueSnapshot;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+
+
+/**
+ * In-memory implementation of a {@link KeyValueStore}.
+ *
+ * This uses a {@link ConcurrentSkipListMap} to store the keys in order.
+ */
+public class InMemoryKeyValueStore implements KeyValueStore<byte[], byte[]> {
+  private final KeyValueStoreMetrics metrics;
+  private final ConcurrentSkipListMap<byte[], byte[]> underlying;
+
+  /**
+   * @param metrics A metrics instance to publish key-value store related 
statistics
+   */
+  public InMemoryKeyValueStore(KeyValueStoreMetrics metrics) {
+    this.metrics = metrics;
+    this.underlying = new 
ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator());
+  }
+
+  @Override
+  public byte[] get(byte[] key) {
+    this.metrics.gets().inc();
+    Preconditions.checkArgument(key != null, "Null argument 'key' not 
allowed");
+    byte[] found = this.underlying.get(key);
+    if (found != null) {
+      metrics.bytesRead().inc(found.length);
+    }
+    return found;
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) {
+    this.metrics.puts().inc();
+    Preconditions.checkArgument(key != null, "Null argument 'key' not 
allowed");
+    if (value == null) {
+      this.metrics.deletes().inc();
+      this.underlying.remove(key);
+    } else {
+      this.metrics.bytesWritten().inc(key.length + value.length);
+      this.underlying.put(key, value);
+    }
+  }
+
+  @Override
+  public void putAll(List<Entry<byte[], byte[]>> entries) {
+    // TreeMap's putAll requires a map, so we'd need to iterate over all the 
entries anyway
+    // to use it, in order to putAll here.  Therefore, just iterate here.
+    for (Entry<byte[], byte[]> next : entries) {
+      put(next.getKey(), next.getValue());
+    }
+  }
+
+  @Override
+  public void delete(byte[] key) {
+    // TODO Bug: This double counts deletes for metrics, because put also 
counts a delete
+    metrics.deletes().inc();
+    put(key, null);
+  }
+
+  @Override
+  public KeyValueIterator<byte[], byte[]> range(byte[] from, byte[] to) {
+    this.metrics.ranges().inc();
+    Preconditions.checkArgument(from != null, "Null argument 'from' not 
allowed");
+    Preconditions.checkArgument(to != null, "Null argument 'to' not allowed");
+    return new InMemoryIterator(this.underlying.subMap(from, 
to).entrySet().iterator(), this.metrics);
+  }
+
+  @Override
+  public KeyValueSnapshot<byte[], byte[]> snapshot(byte[] from, byte[] to) {
+    // TODO: Bug: This does not satisfy the immutability constraint, since the 
entrySet is backed by the underlying map.

Review comment:
       Could you link the Jira ticket that tracks this bug?

##########
File path: 
samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
##########
@@ -19,66 +19,501 @@
 
 package org.apache.samza.storage.kv.inmemory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
-import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.Counter;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueSnapshot;
 import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
 
 public class TestInMemoryKeyValueStore {
+  private static final String DEFAULT_KEY_PREFIX = "key_prefix";
+  private static final String OTHER_KEY_PREFIX = "other_key_prefix";
+  /**
+   * Keep the lengths of the values longer so that metrics validations for key 
and value sizes don't collide.
+   */
+  private static final String DEFAULT_VALUE_PREFIX = 
"value_prefix_value_prefix";
+  private static final String OTHER_VALUE_PREFIX = 
"other_value_prefix_value_prefix";
+
+  @Mock
+  private KeyValueStoreMetrics keyValueStoreMetrics;
+  @Mock
+  private Counter getsCounter;
+  @Mock
+  private Counter bytesReadCounter;
+  @Mock
+  private Counter putsCounter;
+  @Mock
+  private Counter bytesWrittenCounter;
+  @Mock
+  private Counter deletesCounter;
+
+  private InMemoryKeyValueStore inMemoryKeyValueStore;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    when(this.keyValueStoreMetrics.gets()).thenReturn(this.getsCounter);
+    
when(this.keyValueStoreMetrics.bytesRead()).thenReturn(this.bytesReadCounter);
+    when(this.keyValueStoreMetrics.puts()).thenReturn(this.putsCounter);
+    
when(this.keyValueStoreMetrics.bytesWritten()).thenReturn(this.bytesWrittenCounter);
+    when(this.keyValueStoreMetrics.deletes()).thenReturn(this.deletesCounter);
+    this.inMemoryKeyValueStore = new 
InMemoryKeyValueStore(this.keyValueStoreMetrics);
+  }
+
+  @Test
+  public void testGet() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), 
value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), 
this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.getsCounter, times(2)).inc();
+    verify(this.bytesReadCounter).inc(value(0).length);
+    verify(this.bytesReadCounter).inc(value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testGetEmpty() {
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testGetAfterDelete() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.delete(key(0));
+
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testPut() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), 
value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), 
this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(OTHER_KEY_PREFIX, 1).length + 
value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutExistingEntry() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(0), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), 
this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(0).length + 
value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutEmpty() {
+    byte[] emptyValue = new byte[0];
+    this.inMemoryKeyValueStore.put(key(0), emptyValue);
+
+    assertEquals(0, this.inMemoryKeyValueStore.get(key(0)).length);
+    verify(this.putsCounter).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length);
+  }
+
+  @Test
+  public void testPutNull() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(0), null);
+
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.deletesCounter).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+  }
+
+  @Test
+  public void testPutAll() {
+    List<Entry<byte[], byte[]>> entries = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      entries.add(new Entry<>(key(i), value(i)));
+    }
+    this.inMemoryKeyValueStore.putAll(entries);
+
+    for (int i = 0; i < 10; i++) {
+      assertArrayEquals(value(i), this.inMemoryKeyValueStore.get(key(i)));
+    }
+    verify(this.putsCounter, times(10)).inc();
+    verify(this.bytesWrittenCounter, times(10)).inc(key(0).length + 
value(0).length);
+  }
+
+  @Test
+  public void testPutAllUpdate() {
+    // check that an existing value is overridden
+    this.inMemoryKeyValueStore.put(key(0), value(1234));
+    List<Entry<byte[], byte[]>> entries = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      entries.add(new Entry<>(key(i), value(i)));
+    }
+    this.inMemoryKeyValueStore.putAll(entries);
+
+    for (int i = 0; i < 10; i++) {
+      assertArrayEquals(value(i), this.inMemoryKeyValueStore.get(key(i)));
+    }
+    // 1 time for initial value to be overridden, 10 times for "regular" puts
+    verify(this.putsCounter, times(11)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(1234).length);
+    verify(this.bytesWrittenCounter, times(10)).inc(key(0).length + 
value(0).length);
+  }
+
+  @Test
+  public void testPutAllWithNull() {
+    List<Entry<byte[], byte[]>> entries = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      entries.add(new Entry<>(key(i), value(i)));
+    }
+    this.inMemoryKeyValueStore.putAll(entries);
+
+    List<Entry<byte[], byte[]>> deleteEntries = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      deleteEntries.add(new Entry<>(key(i), null));
+    }
+    this.inMemoryKeyValueStore.putAll(deleteEntries);
+
+    for (int i = 0; i < 10; i++) {
+      if (i < 3) {
+        assertNull(this.inMemoryKeyValueStore.get(key(i)));
+      } else {
+        assertArrayEquals(value(i), this.inMemoryKeyValueStore.get(key(i)));
+      }
+    }
+    // 10 times for "regular" puts, 3 times for deletion puts
+    verify(this.putsCounter, times(13)).inc();
+    // 10 "regular" puts all have same size for key/value
+    verify(this.bytesWrittenCounter, times(10)).inc(key(0).length + 
value(0).length);
+    verifyNoMoreInteractions(this.bytesWrittenCounter);
+    verify(this.deletesCounter, times(3)).inc();
+  }
+
+  @Test
+  public void testDelete() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.delete(key(0));
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+
+    /*
+     * There is a bug in which deletes are double counted in metrics. This 
deletesCounter should only be invoked once

Review comment:
       Thank you for noting the bug; this will make it easy to fix the tests 
when they fail once the bug is fixed.

##########
File path: 
samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.inmemory;
+
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedBytes;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.storage.kv.KeyValueSnapshot;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+
+
+/**
+ * In-memory implementation of a {@link KeyValueStore}.
+ *
+ * This uses a {@link ConcurrentSkipListMap} to store the keys in order.
+ */
+public class InMemoryKeyValueStore implements KeyValueStore<byte[], byte[]> {
+  private final KeyValueStoreMetrics metrics;
+  private final ConcurrentSkipListMap<byte[], byte[]> underlying;
+
+  /**
+   * @param metrics A metrics instance to publish key-value store related 
statistics
+   */
+  public InMemoryKeyValueStore(KeyValueStoreMetrics metrics) {
+    this.metrics = metrics;
+    this.underlying = new 
ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator());
+  }
+
+  @Override
+  public byte[] get(byte[] key) {
+    this.metrics.gets().inc();
+    Preconditions.checkArgument(key != null, "Null argument 'key' not 
allowed");
+    byte[] found = this.underlying.get(key);
+    if (found != null) {
+      metrics.bytesRead().inc(found.length);
+    }
+    return found;
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) {
+    this.metrics.puts().inc();
+    Preconditions.checkArgument(key != null, "Null argument 'key' not 
allowed");
+    if (value == null) {
+      this.metrics.deletes().inc();
+      this.underlying.remove(key);
+    } else {
+      this.metrics.bytesWritten().inc(key.length + value.length);
+      this.underlying.put(key, value);
+    }
+  }
+
+  @Override
+  public void putAll(List<Entry<byte[], byte[]>> entries) {
+    // TreeMap's putAll requires a map, so we'd need to iterate over all the 
entries anyway
+    // to use it, in order to putAll here.  Therefore, just iterate here.
+    for (Entry<byte[], byte[]> next : entries) {
+      put(next.getKey(), next.getValue());
+    }
+  }
+
+  @Override
+  public void delete(byte[] key) {
+    // TODO Bug: This double counts deletes for metrics, because put also 
counts a delete

Review comment:
       Could you link the Jira ticket that tracks this bug?

##########
File path: 
samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
##########
@@ -19,66 +19,501 @@
 
 package org.apache.samza.storage.kv.inmemory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
-import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.Counter;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueSnapshot;
 import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
 
 public class TestInMemoryKeyValueStore {
+  private static final String DEFAULT_KEY_PREFIX = "key_prefix";
+  private static final String OTHER_KEY_PREFIX = "other_key_prefix";
+  /**
+   * Keep the lengths of the values longer so that metrics validations for key 
and value sizes don't collide.
+   */
+  private static final String DEFAULT_VALUE_PREFIX = 
"value_prefix_value_prefix";
+  private static final String OTHER_VALUE_PREFIX = 
"other_value_prefix_value_prefix";
+
+  @Mock
+  private KeyValueStoreMetrics keyValueStoreMetrics;
+  @Mock
+  private Counter getsCounter;
+  @Mock
+  private Counter bytesReadCounter;
+  @Mock
+  private Counter putsCounter;
+  @Mock
+  private Counter bytesWrittenCounter;
+  @Mock
+  private Counter deletesCounter;
+
+  private InMemoryKeyValueStore inMemoryKeyValueStore;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    when(this.keyValueStoreMetrics.gets()).thenReturn(this.getsCounter);
+    
when(this.keyValueStoreMetrics.bytesRead()).thenReturn(this.bytesReadCounter);
+    when(this.keyValueStoreMetrics.puts()).thenReturn(this.putsCounter);
+    
when(this.keyValueStoreMetrics.bytesWritten()).thenReturn(this.bytesWrittenCounter);
+    when(this.keyValueStoreMetrics.deletes()).thenReturn(this.deletesCounter);
+    this.inMemoryKeyValueStore = new 
InMemoryKeyValueStore(this.keyValueStoreMetrics);
+  }
+
+  @Test
+  public void testGet() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), 
value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), 
this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.getsCounter, times(2)).inc();
+    verify(this.bytesReadCounter).inc(value(0).length);
+    verify(this.bytesReadCounter).inc(value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testGetEmpty() {
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testGetAfterDelete() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.delete(key(0));
+
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testPut() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), 
value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), 
this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(OTHER_KEY_PREFIX, 1).length + 
value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutExistingEntry() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(0), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), 
this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(0).length + 
value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutEmpty() {
+    byte[] emptyValue = new byte[0];
+    this.inMemoryKeyValueStore.put(key(0), emptyValue);
+
+    assertEquals(0, this.inMemoryKeyValueStore.get(key(0)).length);
+    verify(this.putsCounter).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length);
+  }

Review comment:
       Sorry, I should have been clearer. What code path is this testing that 
`testPut` isn't? Does the implementation behave differently when the length is 
0?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to