SAMZA-505; warn when using an array as a key in cached store

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ff40b12e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ff40b12e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ff40b12e

Branch: refs/heads/samza-sql
Commit: ff40b12e36c0cc3fd8c65b75358be56df4967727
Parents: fcb5cea
Author: Chris Riccomini <[email protected]>
Authored: Wed Mar 11 09:04:38 2015 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Wed Mar 11 09:04:38 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |  1 +
 .../apache/samza/container/SamzaContainer.scala |  1 +
 .../samza/serializers/ByteBufferSerde.scala     | 49 ++++++++++++++++++++
 .../apache/samza/serializers/ByteSerde.scala    |  3 +-
 .../samza/serializers/TestByteBufferSerde.scala | 41 ++++++++++++++++
 .../apache/samza/storage/kv/CachedStore.scala   | 15 ++++++
 .../samza/storage/kv/TestCachedStore.scala      | 35 ++++++++++++++
 7 files changed, 144 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ff40b12e/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 0a268ac..08583e0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -317,6 +317,7 @@ project(":samza-kv_$scalaVersion") {
     compile project(":samza-core_$scalaVersion")
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     testCompile "junit:junit:$junitVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
   }
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/ff40b12e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 275eb1a..87fcf58 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -121,6 +121,7 @@ object SamzaContainer extends Logging {
     }
     val serde = serdeName match {
       case "byte" => getSerde(classOf[ByteSerdeFactory].getCanonicalName)
+      case "bytebuffer" => getSerde(classOf[ByteBufferSerdeFactory].getCanonicalName)
       case "integer" => getSerde(classOf[IntegerSerdeFactory].getCanonicalName)
       case "json" => getSerde(classOf[JsonSerdeFactory].getCanonicalName)
       case "long" => getSerde(classOf[LongSerdeFactory].getCanonicalName)

http://git-wip-us.apache.org/repos/asf/samza/blob/ff40b12e/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala
new file mode 100644
index 0000000..05c3e38
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.apache.samza.config.Config
+import java.nio.ByteBuffer
+
+/**
+ * A serializer for bytes that is effectively a no-op but can be useful for
+ * binary messages.
+ */
+class ByteBufferSerdeFactory extends SerdeFactory[ByteBuffer] {
+  def getSerde(name: String, config: Config): Serde[ByteBuffer] = new ByteBufferSerde
+}
+
+class ByteBufferSerde extends Serde[ByteBuffer] {
+  def toBytes(byteBuffer: ByteBuffer) = {
+    if (byteBuffer != null) {
+      val bytes = new Array[Byte](byteBuffer.remaining())
+      byteBuffer.get(bytes)
+      bytes
+    } else {
+      null
+    }
+  }
+
+  def fromBytes(bytes: Array[Byte]) = if (bytes != null) {
+    ByteBuffer.wrap(bytes)
+  } else {
+    null
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/ff40b12e/samza-core/src/main/scala/org/apache/samza/serializers/ByteSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/ByteSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/ByteSerde.scala
index e7ce09f..968da26 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/ByteSerde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/ByteSerde.scala
@@ -22,7 +22,8 @@ package org.apache.samza.serializers
 import org.apache.samza.config.Config
 
 /**
- * A serializer for bytes that is effectively a no-op but can be useful for binary messages.
+ * A serializer for bytes that is effectively a no-op but can be useful for 
+ * binary messages.
  */
 class ByteSerdeFactory extends SerdeFactory[Array[Byte]] {
   def getSerde(name: String, config: Config): Serde[Array[Byte]] = new ByteSerde

http://git-wip-us.apache.org/repos/asf/samza/blob/ff40b12e/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
new file mode 100644
index 0000000..9401d70
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.util.Arrays
+import org.junit.Assert._
+import org.junit.Test
+import java.nio.ByteBuffer
+
+class TestByteBufferSerde {
+  @Test
+  def test {
+    val serde = new ByteBufferSerde
+    assertNull(serde.toBytes(null))
+    assertNull(serde.fromBytes(null))
+
+    val bytes = "A lazy way of creating a byte array".getBytes()
+    val testBytes = ByteBuffer.wrap(bytes)
+    testBytes.mark()
+    assertArrayEquals(serde.toBytes(testBytes), bytes)
+    testBytes.reset()
+    assertEquals(serde.fromBytes(bytes), testBytes)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/ff40b12e/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 84cf6db..61bb3f6 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -72,6 +72,9 @@ class CachedStore[K, V](
     }
   }
 
+  /** tracks whether an array has been used as a key. since this is dangerous with LinkedHashMap, we want to warn on it. **/
+  private var containsArrayKeys = false
+
   // Use counters here, rather than directly accessing variables using .size
   // since metrics can be accessed in other threads, and cache.size is not
   // thread safe since we're using a LinkedHashMap, and dirty.size is slow
@@ -109,6 +112,8 @@ class CachedStore[K, V](
   def put(key: K, value: V) {
     metrics.puts.inc
 
+    checkKeyIsArray(key)
+
     // Add the key to the front of the dirty list (and remove any prior
     // occurrences to dedupe).
     val found = cache.get(key)
@@ -194,6 +199,16 @@ class CachedStore[K, V](
 
     store.close
   }
+
+  private def checkKeyIsArray(key: K) {
+    if (!containsArrayKeys && key.isInstanceOf[Array[_]]) {
+      // Warn the first time that we see an array key.
+      warn("Using arrays as keys results in unpredictable behavior since cache is implemented with a map. Consider using ByteBuffer, or a different key type.")
+      containsArrayKeys = true
+    }
+  }
+
+  def hasArrayKeys = containsArrayKeys
 }
 
 private case class CacheEntry[K, V](var value: V, var dirty: mutable.DoubleLinkedList[K])

http://git-wip-us.apache.org/repos/asf/samza/blob/ff40b12e/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala
new file mode 100644
index 0000000..d03ec92
--- /dev/null
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import org.junit.Test
+import org.junit.Assert._
+import org.mockito.Mockito._
+
+class TestCachedStore {
+  @Test
+  def testArrayCheck {
+    val kv = mock(classOf[KeyValueStore[Array[Byte], Array[Byte]]])
+    val store = new CachedStore[Array[Byte], Array[Byte]](kv, 100, 100)
+    assertFalse(store.hasArrayKeys)
+    store.put("test1-key".getBytes("UTF-8"), "test1-value".getBytes("UTF-8"))
+    assertTrue(store.hasArrayKeys)
+  }
+}
\ No newline at end of file

Reply via email to