Repository: storm
Updated Branches:
  refs/heads/master e9785d8f1 -> d593918a5


STORM-1649 kryo serialization in windowing


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b850847d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b850847d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b850847d

Branch: refs/heads/master
Commit: b850847d946ba5ad9809308065352a94bd6287b3
Parents: 33e4994
Author: Satish Duggana <[email protected]>
Authored: Tue Mar 29 10:02:28 2016 +0530
Committer: Satish Duggana <[email protected]>
Committed: Fri Apr 1 09:28:22 2016 +0530

----------------------------------------------------------------------
 .../trident/windowing/HBaseWindowsStore.java    | 40 +++++----
 .../windowing/HBaseWindowsStoreFactory.java     |  4 +-
 .../windowing/InMemoryWindowsStoreFactory.java  |  6 +-
 .../trident/windowing/WindowKryoSerializer.java | 87 ++++++++++++++++++++
 .../windowing/WindowTridentProcessor.java       |  8 +-
 .../trident/windowing/WindowsStateUpdater.java  |  2 +-
 .../storm/trident/windowing/WindowsStore.java   |  1 +
 .../trident/windowing/WindowsStoreFactory.java  |  9 +-
 8 files changed, 122 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
index ccce03a..e319a55 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -18,9 +18,6 @@
  */
 package org.apache.storm.hbase.trident.windowing;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -29,11 +26,11 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowKryoSerializer;
 import org.apache.storm.trident.windowing.WindowsStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
@@ -41,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -53,11 +51,12 @@ public class HBaseWindowsStore implements WindowsStore {
     public static final String UTF_8 = "utf-8";
 
     private final ThreadLocal<HTable> threadLocalHtable;
+    private final ThreadLocal<WindowKryoSerializer> 
threadLocalWindowKryoSerializer;
     private final Queue<HTable> htables = new ConcurrentLinkedQueue<>();
     private final byte[] family;
     private final byte[] qualifier;
 
-    public HBaseWindowsStore(final Configuration config, final String 
tableName, byte[] family, byte[] qualifier) {
+    public HBaseWindowsStore(final Map stormConf, final Configuration config, 
final String tableName, byte[] family, byte[] qualifier) {
         this.family = family;
         this.qualifier = qualifier;
 
@@ -74,12 +73,23 @@ public class HBaseWindowsStore implements WindowsStore {
             }
         };
 
+        threadLocalWindowKryoSerializer = new 
ThreadLocal<WindowKryoSerializer>(){
+            @Override
+            protected WindowKryoSerializer initialValue() {
+                return new WindowKryoSerializer(stormConf);
+            }
+        };
+
     }
 
     private HTable htable() {
         return threadLocalHtable.get();
     }
 
+    private WindowKryoSerializer windowKryoSerializer() {
+        return threadLocalWindowKryoSerializer.get();
+    }
+
     private byte[] effectiveKey(String key) {
         try {
             return key.getBytes(UTF_8);
@@ -105,11 +115,7 @@ public class HBaseWindowsStore implements WindowsStore {
             return null;
         }
 
-        Kryo kryo = new Kryo();
-        Input input = new Input(result.getValue(family, qualifier));
-        Object resultObject = kryo.readClassAndObject(input);
-        return resultObject;
-
+        return windowKryoSerializer().deserialize(result.getValue(family, 
qualifier));
     }
 
     @Override
@@ -129,7 +135,6 @@ public class HBaseWindowsStore implements WindowsStore {
             throw new RuntimeException(e);
         }
 
-        Kryo kryo = new Kryo();
         List<Object> values = new ArrayList<>();
         for (int i=0; i<results.length; i++) {
             Result result = results[i];
@@ -137,8 +142,7 @@ public class HBaseWindowsStore implements WindowsStore {
                 LOG.error("Got empty result for key [{}]", keys.get(i));
                 throw new RuntimeException("Received empty result for key: 
"+keys.get(i));
             }
-            Input input = new Input(result.getValue(family, qualifier));
-            Object resultObject = kryo.readClassAndObject(input);
+            Object resultObject = 
windowKryoSerializer().deserialize(result.getValue(family, qualifier));
             values.add(resultObject);
         }
 
@@ -200,10 +204,7 @@ public class HBaseWindowsStore implements WindowsStore {
             throw new IllegalArgumentException("Invalid value of null with 
key: "+key);
         }
         Put put = new Put(effectiveKey(key));
-        Kryo kryo = new Kryo();
-        Output output = new Output(new ByteArrayOutputStream());
-        kryo.writeClassAndObject(output, value);
-        put.addColumn(family, ByteBuffer.wrap(qualifier), 
System.currentTimeMillis(), ByteBuffer.wrap(output.getBuffer(), 0, 
output.position()));
+        put.addColumn(family, ByteBuffer.wrap(qualifier), 
System.currentTimeMillis(), 
windowKryoSerializer().serializeToByteBuffer(value));
         try {
             htable().put(put);
         } catch (IOException e) {
@@ -216,10 +217,7 @@ public class HBaseWindowsStore implements WindowsStore {
         List<Put> list = new ArrayList<>();
         for (Entry entry : entries) {
             Put put = new Put(effectiveKey(entry.key));
-            Output output = new Output(new ByteArrayOutputStream());
-            Kryo kryo = new Kryo();
-            kryo.writeClassAndObject(output, entry.value);
-            put.addColumn(family, ByteBuffer.wrap(qualifier), 
System.currentTimeMillis(), ByteBuffer.wrap(output.getBuffer(), 0, 
output.position()));
+            put.addColumn(family, ByteBuffer.wrap(qualifier), 
System.currentTimeMillis(), 
windowKryoSerializer().serializeToByteBuffer(entry.value));
             list.add(put);
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
index a47d5fb..a455924 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
@@ -42,14 +42,14 @@ public class HBaseWindowsStoreFactory implements 
WindowsStoreFactory {
         this.qualifier = qualifier;
     }
 
-    public WindowsStore create() {
+    public WindowsStore create(Map stormConf) {
         Configuration configuration = HBaseConfiguration.create();
         for (Map.Entry<String, Object> entry : config.entrySet()) {
             if (entry.getValue() != null) {
                 configuration.set(entry.getKey(), entry.getValue().toString());
             }
         }
-        return new HBaseWindowsStore(configuration, tableName, family, 
qualifier);
+        return new HBaseWindowsStore(stormConf, configuration, tableName, 
family, qualifier);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
index cf65594..f7e114d 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
@@ -18,12 +18,10 @@
  */
 package org.apache.storm.trident.windowing;
 
-import org.apache.storm.trident.operation.Aggregator;
 import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.windowing.config.WindowConfig;
-import org.apache.storm.tuple.Fields;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * InMemoryWindowsStoreFactory contains a single instance of {@link 
InMemoryWindowsStore} which will be used for
@@ -37,7 +35,7 @@ public class InMemoryWindowsStoreFactory implements 
WindowsStoreFactory {
     private InMemoryWindowsStore inMemoryWindowsStore;
 
     @Override
-    public WindowsStore create() {
+    public WindowsStore create(Map stormConf) {
         if(inMemoryWindowsStore == null) {
             inMemoryWindowsStore = new InMemoryWindowsStore();
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java
new file mode 100644
index 0000000..b105180
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.storm.serialization.SerializationFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+/**
+ * Kryo serializer/deserializer for values that are stored as part of windowing.
+ * This can be used in {@link WindowsStore}.
+ *
+ * <p>This class is not thread safe: one {@link Output} and one {@link Input} are reused by
+ * every call, so an instance must only be used by a single thread at a time.
+ */
+public class WindowKryoSerializer {
+
+    private final Kryo kryo;
+    private final Output output;
+    private final Input input;
+
+    /**
+     * Creates a serializer backed by the Kryo instance returned from
+     * {@link SerializationFactory#getKryo(Map)}.
+     *
+     * @param stormConf storm configuration passed to {@link SerializationFactory#getKryo(Map)}
+     */
+    public WindowKryoSerializer(Map stormConf) {
+        kryo = SerializationFactory.getKryo(stormConf);
+        // Output(bufferSize, maxBufferSize): start at 2000 bytes, cap at 2_000_000_000 bytes.
+        output = new Output(2000, 2_000_000_000);
+        input = new Input();
+    }
+
+    /**
+     * Serializes the given object into a byte array using Kryo serialization.
+     *
+     * @param obj Object to be serialized.
+     * @return the bytes written by Kryo for {@code obj}
+     */
+    public byte[] serialize(Object obj) {
+        output.clear();
+        kryo.writeClassAndObject(output, obj);
+        return output.toBytes();
+    }
+
+    /**
+     * Serializes the given object into a {@link ByteBuffer} backed by the byte array used by
+     * Kryo serialization.
+     *
+     * <p>The returned buffer wraps this serializer's reused output buffer rather than a copy,
+     * so its contents are only valid until the next {@code serialize*} call on this instance.
+     * Callers that need to keep the bytes must copy them first (or use
+     * {@link #serialize(Object)}).
+     *
+     * @param obj Object to be serialized.
+     * @return a buffer positioned at 0 whose limit is the number of bytes written
+     */
+    public ByteBuffer serializeToByteBuffer(Object obj) {
+        output.clear();
+        kryo.writeClassAndObject(output, obj);
+        return ByteBuffer.wrap(output.getBuffer(), 0, output.position());
+    }
+
+    /**
+     * Returns an Object which is created using Kryo deserialization of given byte array
+     * instance.
+     *
+     * @param buff byte array to be deserialized into an Object
+     * @return the deserialized object
+     */
+    public Object deserialize(byte[] buff) {
+        input.setBuffer(buff);
+        return kryo.readClassAndObject(input);
+    }
+
+    /**
+     * Returns an Object which is created using Kryo deserialization of the readable content
+     * of the given {@code byteBuffer}, i.e. the bytes between its current position and its
+     * limit. The position and limit of {@code byteBuffer} are left unchanged.
+     *
+     * <p>This makes the method the inverse of {@link #serializeToByteBuffer(Object)}, whose
+     * result has position 0 and limit equal to the serialized length.
+     *
+     * @param byteBuffer byte buffer to be deserialized into an Object
+     * @return the deserialized object
+     */
+    public Object deserialize(ByteBuffer byteBuffer) {
+        if (byteBuffer.hasArray()) {
+            // Readable bytes start at arrayOffset() + position() and span remaining() bytes.
+            input.setBuffer(byteBuffer.array(),
+                    byteBuffer.arrayOffset() + byteBuffer.position(),
+                    byteBuffer.remaining());
+        } else {
+            // Direct/read-only buffers expose no backing array; copy via a duplicate so the
+            // caller's position is not disturbed.
+            byte[] bytes = new byte[byteBuffer.remaining()];
+            byteBuffer.duplicate().get(bytes);
+            input.setBuffer(bytes);
+        }
+        return kryo.readClassAndObject(input);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
index 5125e41..9b12057 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
@@ -65,7 +65,6 @@ public class WindowTridentProcessor implements 
TridentProcessor {
     private WindowsStoreFactory windowStoreFactory;
     private WindowsStore windowStore;
 
-    private Map conf;
     private TopologyContext topologyContext;
     private FreshCollector collector;
     private TridentTupleView.ProjectionFactory projection;
@@ -85,21 +84,20 @@ public class WindowTridentProcessor implements 
TridentProcessor {
     }
 
     @Override
-    public void prepare(Map conf, TopologyContext context, TridentContext 
tridentContext) {
-        this.conf = conf;
+    public void prepare(Map stormConf, TopologyContext context, TridentContext 
tridentContext) {
         this.topologyContext = context;
         List<TridentTuple.Factory> parents = 
tridentContext.getParentTupleFactories();
         if (parents.size() != 1) {
             throw new RuntimeException("Aggregation related operation can only 
have one parent");
         }
 
-        Long maxTuplesCacheSize = getWindowTuplesCacheSize(conf);
+        Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);
 
         this.tridentContext = tridentContext;
         collector = new FreshCollector(tridentContext);
         projection = new TridentTupleView.ProjectionFactory(parents.get(0), 
inputFields);
 
-        windowStore = windowStoreFactory.create();
+        windowStore = windowStoreFactory.create(stormConf);
         windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + 
topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
         windowTriggerInprocessId = 
getWindowTriggerInprocessIdPrefix(windowTaskId);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
index 6664b41..8042e93 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
@@ -71,7 +71,7 @@ public class WindowsStateUpdater implements 
StateUpdater<WindowsState> {
 
     @Override
     public void prepare(Map conf, TridentOperationContext context) {
-        windowsStore = windowStoreFactory.create();
+        windowsStore = windowStoreFactory.create(conf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
index 8904b7b..e09ac5e 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 /**
  * Store for storing window related entities like windowed tuples, triggers 
etc.
+ * {@link WindowKryoSerializer} can be used for kryo 
serialization/deserialization of keys and values.
  *
  */
 public interface WindowsStore extends Serializable {

http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
index 409d672..edd0cb2 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
@@ -18,7 +18,11 @@
  */
 package org.apache.storm.trident.windowing;
 
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.planner.processor.TridentContext;
+
 import java.io.Serializable;
+import java.util.Map;
 
 /**
  * Factory to create instances of {@code WindowsStore}.
@@ -29,7 +33,8 @@ public interface WindowsStoreFactory extends Serializable {
     /**
      * Creates a window store
      *
-     * @return
+     * @param stormConf storm topology configuration passed in {@link 
org.apache.storm.trident.planner.TridentProcessor#prepare(Map, TopologyContext, 
TridentContext)}
+     *
      */
-    public WindowsStore create();
+    public WindowsStore create(Map stormConf);
 }

Reply via email to