Repository: samza
Updated Branches:
  refs/heads/master e204d298e -> 15e0ababb


SAMZA-1696: Delete TestKeyValueStores flaky test

`testParallelReadWriteSameKey` was flaky because of a race between the main
test thread and the worker threads writing to the key-value store under test.
Specifically, the main test thread could assert on the stored value before a
worker thread had written it. The hard-coded `join(1000)` timeouts only bound
how long the main test thread waits; they do not guarantee that the worker
threads have finished, so the assertion could run prematurely.

This change deletes the test: the underlying `RocksDB` store wrapped by
`RocksDbKeyValueStore` is thread-safe by design, which makes the test
redundant. The rest of the diff is formatting-only cleanup (grouped imports,
Scaladoc indentation, trailing whitespace, brace layout).

Author: Ahmed Abdul Hamid <ahabd...@ahabdulh-ld1.linkedin.biz>

Reviewers: Shanthoosh Venkataraman <svenk...@linkedin.com>

Closes #500 from ahmedahamid/dev/fix-1696


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/15e0abab
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/15e0abab
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/15e0abab

Branch: refs/heads/master
Commit: 15e0ababb5b5bd08fdbbbeb0b0d971f93c1900de
Parents: e204d29
Author: Ahmed Abdul Hamid <ahabd...@ahabdulh-ld1.linkedin.biz>
Authored: Fri May 11 10:55:02 2018 -0700
Committer: Prateek Maheshwari <pmahe...@linkedin.com>
Committed: Fri May 11 10:55:02 2018 -0700

----------------------------------------------------------------------
 .../samza/storage/kv/TestKeyValueStores.scala   | 240 +++++++++----------
 1 file changed, 120 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/15e0abab/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
 
b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index babd15c..37fde3a 100644
--- 
a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ 
b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -20,9 +20,8 @@
 package org.apache.samza.storage.kv
 
 import java.io.File
-import java.util.Arrays
-import java.util.Random
 import java.util.concurrent.CountDownLatch
+import java.util.{Arrays, Random}
 
 import org.apache.samza.config.MapConfig
 import org.apache.samza.serializers.Serde
@@ -31,21 +30,21 @@ import org.junit.Assert._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import org.junit.After
-import org.junit.Before
-import org.junit.Test
+import org.junit.{After, Before, Test}
 import org.scalatest.Assertions.intercept
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 /**
- * Test suite to check different key value store operations
- * @param typeOfStore Defines type of key-value store (Eg: "rocksdb" / 
"inmemory")
- * @param storeConfig Defines whether we're using caching / serde / both / or 
none in front of the store
- */
+  * Test suite to check different key value store operations
+  *
+  * @param typeOfStore Defines type of key-value store (Eg: "rocksdb" / 
"inmemory")
+  * @param storeConfig Defines whether we're using caching / serde / both / or 
none in front of the store
+  */
 @RunWith(value = classOf[Parameterized])
 class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
+
   import TestKeyValueStores._
 
   val letters = "abcdefghijklmnopqrstuvwxyz".map(_.toString)
@@ -56,11 +55,11 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: 
String) {
 
   @Before
   def setup() {
-    val kvStore : KeyValueStore[Array[Byte], Array[Byte]] = typeOfStore match {
+    val kvStore: KeyValueStore[Array[Byte], Array[Byte]] = typeOfStore match {
       case "inmemory" =>
         new InMemoryKeyValueStore
       case "rocksdb" =>
-        new RocksDbKeyValueStore (
+        new RocksDbKeyValueStore(
           dir,
           new org.rocksdb.Options()
             .setCreateIfMissing(true)
@@ -74,6 +73,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: 
String) {
 
     val passThroughSerde = new Serde[Array[Byte]] {
       def toBytes(obj: Array[Byte]) = obj
+
       def fromBytes(bytes: Array[Byte]) = bytes
     }
 
@@ -158,27 +158,51 @@ class TestKeyValueStores(typeOfStore: String, 
storeConfig: String) {
     if (serde) {
       val a = b("a")
 
-      intercept[NullPointerException] { store.get(null) }
-      intercept[NullPointerException] { store.getAll(null) }
-      intercept[NullPointerException] { store.getAll(List(a, null).asJava) }
-      intercept[NullPointerException] { store.delete(null) }
-      intercept[NullPointerException] { store.deleteAll(null) }
-      intercept[NullPointerException] { store.deleteAll(List(a, null).asJava) }
-      intercept[NullPointerException] { store.put(null, a) }
-      intercept[NullPointerException] { store.put(a, null) }
-      intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new 
Entry[Array[Byte], Array[Byte]](a, null)).asJava) }
-      intercept[NullPointerException] { store.putAll(List(new 
Entry[Array[Byte], Array[Byte]](null, a)).asJava) }
-      intercept[NullPointerException] { store.range(a, null) }
-      intercept[NullPointerException] { store.range(null, a) }
+      intercept[NullPointerException] {
+        store.get(null)
+      }
+      intercept[NullPointerException] {
+        store.getAll(null)
+      }
+      intercept[NullPointerException] {
+        store.getAll(List(a, null).asJava)
+      }
+      intercept[NullPointerException] {
+        store.delete(null)
+      }
+      intercept[NullPointerException] {
+        store.deleteAll(null)
+      }
+      intercept[NullPointerException] {
+        store.deleteAll(List(a, null).asJava)
+      }
+      intercept[NullPointerException] {
+        store.put(null, a)
+      }
+      intercept[NullPointerException] {
+        store.put(a, null)
+      }
+      intercept[NullPointerException] {
+        store.putAll(List(new Entry(a, a), new Entry[Array[Byte], 
Array[Byte]](a, null)).asJava)
+      }
+      intercept[NullPointerException] {
+        store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a)).asJava)
+      }
+      intercept[NullPointerException] {
+        store.range(a, null)
+      }
+      intercept[NullPointerException] {
+        store.range(null, a)
+      }
     }
   }
 
   @Test
   def testPutAll() {
-    // Use CacheSize - 1 so we fully fill the cache, but don't write any data 
-    // out. Our check (below) uses == for cached entries, and using 
-    // numEntires >= CacheSize would result in the LRU cache dropping some 
-    // entries. The result would be that we get the correct byte array back 
+    // Use CacheSize - 1 so we fully fill the cache, but don't write any data
+    // out. Our check (below) uses == for cached entries, and using
+    // numEntires >= CacheSize would result in the LRU cache dropping some
+    // entries. The result would be that we get the correct byte array back
     // from the cache's underlying store (rocksdb), but that == would fail.
     val numEntries = CacheSize - 1
     val entries = (0 until numEntries).map(i => new Entry(b("k" + i), b("v" + 
i)))
@@ -270,42 +294,42 @@ class TestKeyValueStores(typeOfStore: String, 
storeConfig: String) {
   }
 
   /**
-   * This test specifically targets an issue in Scala 2.8.1's DoubleLinkedList
-   * implementation. The issue is that it doesn't work. More specifically,
-   * creating a DoubleLinkedList from an existing list does not update the
-   * "prev" field of the existing list's head to point to the new head. As a
-   * result, in Scala 2.8.1, every DoubleLinkedList node's prev field is null.
-   * Samza gets around this by manually updating the field itself. See SAMZA-80
-   * for details.
-   *
-   * This issue is exposed in Samza's KV cache implementation, which uses
-   * DoubleLinkedList, so all comments in this method are discussing the cached
-   * implementation, but the test is still useful as a sanity check for
-   * non-cached stores.
-   */
+    * This test specifically targets an issue in Scala 2.8.1's DoubleLinkedList
+    * implementation. The issue is that it doesn't work. More specifically,
+    * creating a DoubleLinkedList from an existing list does not update the
+    * "prev" field of the existing list's head to point to the new head. As a
+    * result, in Scala 2.8.1, every DoubleLinkedList node's prev field is null.
+    * Samza gets around this by manually updating the field itself. See 
SAMZA-80
+    * for details.
+    *
+    * This issue is exposed in Samza's KV cache implementation, which uses
+    * DoubleLinkedList, so all comments in this method are discussing the 
cached
+    * implementation, but the test is still useful as a sanity check for
+    * non-cached stores.
+    */
   @Test
   def testBrokenScalaDoubleLinkedList() {
     val something = b("")
     val keys = letters
-            .map(b(_))
-            .toArray
+      .map(b(_))
+      .toArray
 
     // Load the cache to capacity.
     letters
-            .slice(0, TestKeyValueStores.CacheSize)
-            .map(b(_))
-            .foreach(store.put(_, something))
+      .slice(0, TestKeyValueStores.CacheSize)
+      .map(b(_))
+      .foreach(store.put(_, something))
 
     // Now keep everything in the cache, but with an empty dirty list.
     store.flush
 
     // Dirty list is now empty, and every CacheEntry has dirty=null.
 
-    // Corrupt the dirty list by creating two dirty lists that toggle back and 
+    // Corrupt the dirty list by creating two dirty lists that toggle back and
     // forth depending on whether the last dirty write was to 1 or 0. The trick
     // here is that every element in the cache is treated as the "head" of the
     // DoulbeLinkedList (prev==null), even though it's not necessarily. Thus,
-    // You can end up with multiple nodes each having their own version of the 
+    // You can end up with multiple nodes each having their own version of the
     // dirty list with different elements in them.
     store.put(keys(1), something)
     store.put(keys(0), something)
@@ -313,37 +337,37 @@ class TestKeyValueStores(typeOfStore: String, 
storeConfig: String) {
     store.flush
     // The dirty list is now empty, but 0's dirty field actually has 0 and 1.
     store.put(keys(0), something)
-    // The dirty list now has 0 and 1, but 1's dirty field is null in the 
+    // The dirty list now has 0 and 1, but 1's dirty field is null in the
     // cache because it was just flushed.
 
-    // Get rid of 1 from the cache by reading every other element, and then 
+    // Get rid of 1 from the cache by reading every other element, and then
     // putting one new element.
     letters
-            .slice(2, TestKeyValueStores.CacheSize)
-            .map(b(_))
-            .foreach(store.get(_))
+      .slice(2, TestKeyValueStores.CacheSize)
+      .map(b(_))
+      .foreach(store.get(_))
     store.put(keys(10), something)
 
-    // Now try and trigger an NPE since the dirty list has an element (1) 
+    // Now try and trigger an NPE since the dirty list has an element (1)
     // that's no longer in the cache.
     store.flush
   }
 
   /**
-   * A little test that tries to simulate a few common patterns:
-   * read-modify-write, and do-some-stuff-then-delete (windowing).
-   */
+    * A little test that tries to simulate a few common patterns:
+    * read-modify-write, and do-some-stuff-then-delete (windowing).
+    */
   @Test
   def testRandomReadWriteRemove() {
     // Make test deterministic by seeding the random number generator.
     val rand = new Random(12345)
     val keys = letters
-            .map(b(_))
-            .toArray
+      .map(b(_))
+      .toArray
 
     // Map from letter to key byte array used for letter, and expected value.
-    // We have to go through some acrobatics here since Java's byte array uses 
-    // object identity for .equals. Two byte arrays can have identical byte 
+    // We have to go through some acrobatics here since Java's byte array uses
+    // object identity for .equals. Two byte arrays can have identical byte
     // elements, but not be equal.
     var expected = Map[String, (Array[Byte], String)]()
 
@@ -385,41 +409,6 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: 
String) {
   }
 
   @Test
-  def testParallelReadWriteSameKey(): Unit = {
-    // Make test deterministic by seeding the random number generator.
-    val key = b("key")
-    val val1 = "val1"
-    val val2 = "val2"
-
-    val runner1 = new Thread(new Runnable {
-      override def run(): Unit = {
-        store.put(key, b(val1))
-      }
-    })
-
-    val runner2 = new Thread(new Runnable {
-      override def run(): Unit = {
-        while(!val1.equals({store.get(key) match {
-          case null => ""
-          case _ => { new String(store.get(key), "UTF-8") }
-        }})) {}
-        store.put(key, b(val2))
-      }
-    })
-
-    runner2.start()
-    runner1.start()
-
-    runner2.join(1000)
-    runner1.join(1000)
-
-    assertEquals("val2", new String(store.get(key), "UTF-8"))
-
-    store.delete(key)
-    store.flush()
-  }
-
-  @Test
   def testParallelReadWriteDiffKeys(): Unit = {
     // Make test deterministic by seeding the random number generator.
     val key1 = b("key1")
@@ -435,10 +424,14 @@ class TestKeyValueStores(typeOfStore: String, 
storeConfig: String) {
 
     val runner2 = new Thread(new Runnable {
       override def run(): Unit = {
-        while(!val1.equals({store.get(key1) match {
-          case null => ""
-          case _ => { new String(store.get(key1), "UTF-8") }
-        }})) {}
+        while (!val1.equals({
+          store.get(key1) match {
+            case null => ""
+            case _ => {
+              new String(store.get(key1), "UTF-8")
+            }
+          }
+        })) {}
         store.delete(key1)
       }
     })
@@ -451,10 +444,14 @@ class TestKeyValueStores(typeOfStore: String, 
storeConfig: String) {
 
     val runner4 = new Thread(new Runnable {
       override def run(): Unit = {
-        while(!val2.equals({store.get(key2) match {
-          case null => ""
-          case _ => { new String(store.get(key2), "UTF-8") }
-        }})) {}
+        while (!val2.equals({
+          store.get(key2) match {
+            case null => ""
+            case _ => {
+              new String(store.get(key2), "UTF-8")
+            }
+          }
+        })) {}
         store.delete(key2)
       }
     })
@@ -521,7 +518,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: 
String) {
 
     val runner3 = new Thread(new Runnable {
       override def run(): Unit = {
-        val iter = store.all()  //snapshot
+        val iter = store.all() //snapshot
         runner1StartLatch.countDown()
         try {
           while (iter.hasNext) {
@@ -554,7 +551,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: 
String) {
     runner1.join()
     runner3.join()
 
-    if(throwable != null) throw throwable
+    if (throwable != null) throw throwable
 
     store.flush()
   }
@@ -567,18 +564,20 @@ class TestKeyValueStores(typeOfStore: String, 
storeConfig: String) {
       assertEquals(v, s(entry.getValue))
     }
     assertFalse(iter.hasNext)
-    intercept[NoSuchElementException] { iter.next() }
+    intercept[NoSuchElementException] {
+      iter.next()
+    }
   }
 
   /**
-   * Convert string to byte buffer
-   */
+    * Convert string to byte buffer
+    */
   def b(s: String) =
     s.getBytes
 
   /**
-   * Convert byte buffer to string
-   */
+    * Convert byte buffer to string
+    */
   def s(b: Array[Byte]) =
     new String(b)
 }
@@ -586,17 +585,18 @@ class TestKeyValueStores(typeOfStore: String, 
storeConfig: String) {
 object TestKeyValueStores {
   val CacheSize = 1024
   val BatchSize = 1024
+
   @Parameters
   def parameters: java.util.Collection[Array[String]] = Arrays.asList(
-      //Inmemory
-      Array("inmemory", "cache"),
-      Array("inmemory", "serde"),
-      Array("inmemory", "cache-and-serde"),
-      Array("inmemory", "none"),
-      //RocksDB
-      Array("rocksdb","cache"),
-      Array("rocksdb","serde"),
-      Array("rocksdb","cache-and-serde"),
-      Array("rocksdb","none")
+    //Inmemory
+    Array("inmemory", "cache"),
+    Array("inmemory", "serde"),
+    Array("inmemory", "cache-and-serde"),
+    Array("inmemory", "none"),
+    //RocksDB
+    Array("rocksdb", "cache"),
+    Array("rocksdb", "serde"),
+    Array("rocksdb", "cache-and-serde"),
+    Array("rocksdb", "none")
   )
 }

Reply via email to