Repository: samza
Updated Branches:
  refs/heads/master d32e8bb3a -> c27a25328


SAMZA-1370; Memory leak in CachedStore when using ByteBufferSerde as key Serde

ByteBufferSerde uses a relative bulk get to serialize the provided ByteBuffer, 
which advances the buffer's position. ByteBuffer's `equals` and `hashCode` 
depend only on its remaining elements, i.e. on its position. As a result, when 
ByteBuffers are used as keys in the CachedStore, flushing the cache contents to 
the underlying store changes the keys' hashCodes. Because a key's hashCode no 
longer matches the one used when it was inserted into the map, the 
LinkedHashMap cannot find these entries to evict or remove them, which leads 
to a memory leak.

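Changing ByteBufferSerde to duplicate the provided ByteBuffer before copying 
should fix this issue. A duplicate shares the buffer's content but has its own 
independent position, limit, and mark. Reading from the duplicate therefore 
leaves the caller's buffer (and its hashCode) unchanged. This approach is 
preferred over absolute gets because ByteBuffer has no absolute bulk get API 
(`get(int, byte[])` was only added in Java 13).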

Author: Prateek Maheshwari <pmahe...@linkedin.com>

Reviewers: Jagadish <jagad...@apache.org>

Closes #251 from prateekm/bytebufferserde


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c27a2532
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c27a2532
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c27a2532

Branch: refs/heads/master
Commit: c27a253283bbc09e82c4e9d669bf54d1460c5299
Parents: d32e8bb
Author: Prateek Maheshwari <pmahe...@linkedin.com>
Authored: Thu Jul 27 15:10:43 2017 -0700
Committer: Jagadish <jagad...@apache.org>
Committed: Thu Jul 27 15:10:43 2017 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |  8 ++++++-
 .../samza/serializers/ByteBufferSerde.scala     |  5 ++--
 .../samza/serializers/TestByteBufferSerde.scala | 24 +++++++++++++++-----
 3 files changed, 27 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c27a2532/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html 
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 3263856..bdf477a 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1081,6 +1081,8 @@
                         <dl>
                             
<dt><code>org.apache.samza.serializers.ByteSerdeFactory</code></dt>
                             <dd>A no-op serde which passes through the 
undecoded byte array.</dd>
+                            
<dt><code>org.apache.samza.serializers.ByteBufferSerdeFactory</code></dt>
+                            <dd>Encodes <code>java.nio.ByteBuffer</code> 
objects.</dd>
                             
<dt><code>org.apache.samza.serializers.IntegerSerdeFactory</code></dt>
                             <dd>Encodes <code>java.lang.Integer</code> objects 
as binary (4 bytes fixed-length big-endian encoding).</dd>
                             
<dt><code>org.apache.samza.serializers.StringSerdeFactory</code></dt>
@@ -1090,7 +1092,11 @@
                             
<dt><code>org.apache.samza.serializers.LongSerdeFactory</code></dt>
                             <dd>Encodes <code>java.lang.Long</code> as binary 
(8 bytes fixed-length big-endian encoding).</dd>
                             
<dt><code>org.apache.samza.serializers.DoubleSerdeFactory</code></dt>
-                            <dd>Encodes <code>java.lang.Double</code> as 
binray (8 bytes double-precision float point).</dd>
+                            <dd>Encodes <code>java.lang.Double</code> as 
binary (8 bytes double-precision float point).</dd>
+                            
<dt><code>org.apache.samza.serializers.UUIDSerdeFactory</code></dt>
+                            <dd>Encodes <code>java.util.UUID</code> 
objects.</dd>
+                            
<dt><code>org.apache.samza.serializers.SerializableSerdeFactory</code></dt>
+                            <dd>Encodes <code>java.io.Serializable</code> 
objects.</dd>
                             
<dt><code>org.apache.samza.serializers.MetricsSnapshotSerdeFactory</code></dt>
                             <dd>Encodes 
<code>org.apache.samza.metrics.reporter.MetricsSnapshot</code> objects (which 
are
                                 used for <a 
href="../container/metrics.html">reporting metrics</a>) as JSON.</dd>

http://git-wip-us.apache.org/repos/asf/samza/blob/c27a2532/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala 
b/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala
index 05c3e38..adb8781 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala
@@ -23,8 +23,7 @@ import org.apache.samza.config.Config
 import java.nio.ByteBuffer
 
 /**
- * A serializer for bytes that is effectively a no-op but can be useful for
- * binary messages.
+ * A serializer for ByteBuffers.
  */
 class ByteBufferSerdeFactory extends SerdeFactory[ByteBuffer] {
   def getSerde(name: String, config: Config): Serde[ByteBuffer] = new 
ByteBufferSerde
@@ -34,7 +33,7 @@ class ByteBufferSerde extends Serde[ByteBuffer] {
   def toBytes(byteBuffer: ByteBuffer) = {
     if (byteBuffer != null) {
       val bytes = new Array[Byte](byteBuffer.remaining())
-      byteBuffer.get(bytes)
+      byteBuffer.duplicate().get(bytes)
       bytes
     } else {
       null

http://git-wip-us.apache.org/repos/asf/samza/blob/c27a2532/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
 
b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
index 9401d70..eddfb0a 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
@@ -26,16 +26,28 @@ import java.nio.ByteBuffer
 
 class TestByteBufferSerde {
   @Test
-  def test {
+  def testSerde {
     val serde = new ByteBufferSerde
     assertNull(serde.toBytes(null))
     assertNull(serde.fromBytes(null))
 
     val bytes = "A lazy way of creating a byte array".getBytes()
-    val testBytes = ByteBuffer.wrap(bytes)
-    testBytes.mark()
-    assertArrayEquals(serde.toBytes(testBytes), bytes)
-    testBytes.reset()
-    assertEquals(serde.fromBytes(bytes), testBytes)
+    val byteBuffer = ByteBuffer.wrap(bytes)
+    byteBuffer.mark()
+    assertArrayEquals(serde.toBytes(byteBuffer), bytes)
+    byteBuffer.reset()
+    assertEquals(serde.fromBytes(bytes), byteBuffer)
+  }
+
+  @Test
+  def testSerializationPreservesInput {
+    val serde = new ByteBufferSerde
+    val bytes = "A lazy way of creating a byte array".getBytes()
+    val byteBuffer = ByteBuffer.wrap(bytes)
+    byteBuffer.get() // advance position by 1
+    serde.toBytes(byteBuffer)
+
+    assertEquals(byteBuffer.capacity(), byteBuffer.limit())
+    assertEquals(1, byteBuffer.position())
   }
 }
\ No newline at end of file

Reply via email to