Repository: storm
Updated Branches:
  refs/heads/master 084492782 -> 33e499479


Revert "STORM-1649 kryo serialization in windowing"

This reverts commit 576499baba14926a564a51e0018f3ead86809ab1.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fd02ac60
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fd02ac60
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fd02ac60

Branch: refs/heads/master
Commit: fd02ac603276b3b38a755eb2c967a51c0b630039
Parents: 0844927
Author: P. Taylor Goetz <[email protected]>
Authored: Thu Mar 31 16:33:28 2016 -0400
Committer: P. Taylor Goetz <[email protected]>
Committed: Thu Mar 31 16:33:28 2016 -0400

----------------------------------------------------------------------
 .../trident/windowing/HBaseWindowsStore.java    | 40 ++++-----
 .../windowing/HBaseWindowsStoreFactory.java     |  4 +-
 .../windowing/InMemoryWindowsStoreFactory.java  |  6 +-
 .../trident/windowing/WindowKryoSerializer.java | 87 --------------------
 .../windowing/WindowTridentProcessor.java       |  8 +-
 .../trident/windowing/WindowsStateUpdater.java  |  2 +-
 .../storm/trident/windowing/WindowsStore.java   |  1 -
 .../trident/windowing/WindowsStoreFactory.java  |  9 +-
 8 files changed, 35 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
index e319a55..ccce03a 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -18,6 +18,9 @@
  */
 package org.apache.storm.hbase.trident.windowing;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -26,11 +29,11 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.storm.trident.windowing.WindowKryoSerializer;
 import org.apache.storm.trident.windowing.WindowsStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
@@ -38,7 +41,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -51,12 +53,11 @@ public class HBaseWindowsStore implements WindowsStore {
     public static final String UTF_8 = "utf-8";
 
     private final ThreadLocal<HTable> threadLocalHtable;
-    private final ThreadLocal<WindowKryoSerializer> 
threadLocalWindowKryoSerializer;
     private final Queue<HTable> htables = new ConcurrentLinkedQueue<>();
     private final byte[] family;
     private final byte[] qualifier;
 
-    public HBaseWindowsStore(final Map stormConf, final Configuration config, 
final String tableName, byte[] family, byte[] qualifier) {
+    public HBaseWindowsStore(final Configuration config, final String 
tableName, byte[] family, byte[] qualifier) {
         this.family = family;
         this.qualifier = qualifier;
 
@@ -73,23 +74,12 @@ public class HBaseWindowsStore implements WindowsStore {
             }
         };
 
-        threadLocalWindowKryoSerializer = new 
ThreadLocal<WindowKryoSerializer>(){
-            @Override
-            protected WindowKryoSerializer initialValue() {
-                return new WindowKryoSerializer(stormConf);
-            }
-        };
-
     }
 
     private HTable htable() {
         return threadLocalHtable.get();
     }
 
-    private WindowKryoSerializer windowKryoSerializer() {
-        return threadLocalWindowKryoSerializer.get();
-    }
-
     private byte[] effectiveKey(String key) {
         try {
             return key.getBytes(UTF_8);
@@ -115,7 +105,11 @@ public class HBaseWindowsStore implements WindowsStore {
             return null;
         }
 
-        return windowKryoSerializer().deserialize(result.getValue(family, 
qualifier));
+        Kryo kryo = new Kryo();
+        Input input = new Input(result.getValue(family, qualifier));
+        Object resultObject = kryo.readClassAndObject(input);
+        return resultObject;
+
     }
 
     @Override
@@ -135,6 +129,7 @@ public class HBaseWindowsStore implements WindowsStore {
             throw new RuntimeException(e);
         }
 
+        Kryo kryo = new Kryo();
         List<Object> values = new ArrayList<>();
         for (int i=0; i<results.length; i++) {
             Result result = results[i];
@@ -142,7 +137,8 @@ public class HBaseWindowsStore implements WindowsStore {
                 LOG.error("Got empty result for key [{}]", keys.get(i));
                 throw new RuntimeException("Received empty result for key: 
"+keys.get(i));
             }
-            Object resultObject = 
windowKryoSerializer().deserialize(result.getValue(family, qualifier));
+            Input input = new Input(result.getValue(family, qualifier));
+            Object resultObject = kryo.readClassAndObject(input);
             values.add(resultObject);
         }
 
@@ -204,7 +200,10 @@ public class HBaseWindowsStore implements WindowsStore {
             throw new IllegalArgumentException("Invalid value of null with 
key: "+key);
         }
         Put put = new Put(effectiveKey(key));
-        put.addColumn(family, ByteBuffer.wrap(qualifier), 
System.currentTimeMillis(), 
windowKryoSerializer().serializeToByteBuffer(value));
+        Kryo kryo = new Kryo();
+        Output output = new Output(new ByteArrayOutputStream());
+        kryo.writeClassAndObject(output, value);
+        put.addColumn(family, ByteBuffer.wrap(qualifier), 
System.currentTimeMillis(), ByteBuffer.wrap(output.getBuffer(), 0, 
output.position()));
         try {
             htable().put(put);
         } catch (IOException e) {
@@ -217,7 +216,10 @@ public class HBaseWindowsStore implements WindowsStore {
         List<Put> list = new ArrayList<>();
         for (Entry entry : entries) {
             Put put = new Put(effectiveKey(entry.key));
-            put.addColumn(family, ByteBuffer.wrap(qualifier), 
System.currentTimeMillis(), 
windowKryoSerializer().serializeToByteBuffer(entry.value));
+            Output output = new Output(new ByteArrayOutputStream());
+            Kryo kryo = new Kryo();
+            kryo.writeClassAndObject(output, entry.value);
+            put.addColumn(family, ByteBuffer.wrap(qualifier), 
System.currentTimeMillis(), ByteBuffer.wrap(output.getBuffer(), 0, 
output.position()));
             list.add(put);
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
index a455924..a47d5fb 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
@@ -42,14 +42,14 @@ public class HBaseWindowsStoreFactory implements 
WindowsStoreFactory {
         this.qualifier = qualifier;
     }
 
-    public WindowsStore create(Map stormConf) {
+    public WindowsStore create() {
         Configuration configuration = HBaseConfiguration.create();
         for (Map.Entry<String, Object> entry : config.entrySet()) {
             if (entry.getValue() != null) {
                 configuration.set(entry.getKey(), entry.getValue().toString());
             }
         }
-        return new HBaseWindowsStore(stormConf, configuration, tableName, 
family, qualifier);
+        return new HBaseWindowsStore(configuration, tableName, family, 
qualifier);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
index f7e114d..cf65594 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
@@ -18,10 +18,12 @@
  */
 package org.apache.storm.trident.windowing;
 
+import org.apache.storm.trident.operation.Aggregator;
 import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
 
 import java.util.List;
-import java.util.Map;
 
 /**
  * InMemoryWindowsStoreFactory contains a single instance of {@link 
InMemoryWindowsStore} which will be used for
@@ -35,7 +37,7 @@ public class InMemoryWindowsStoreFactory implements 
WindowsStoreFactory {
     private InMemoryWindowsStore inMemoryWindowsStore;
 
     @Override
-    public WindowsStore create(Map stormConf) {
+    public WindowsStore create() {
         if(inMemoryWindowsStore == null) {
             inMemoryWindowsStore = new InMemoryWindowsStore();
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java
deleted file mode 100644
index b105180..0000000
--- 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.trident.windowing;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import org.apache.storm.serialization.SerializationFactory;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-/**
- * Kryo serializer/deserializer for values that are stored as part of 
windowing. This can be used in {@link WindowsStore}.
- * This class is not thread safe.
- *
- */
-public class WindowKryoSerializer {
-
-    private final Kryo kryo;
-    private final Output output;
-    private final Input input;
-
-    public WindowKryoSerializer(Map stormConf) {
-        kryo = SerializationFactory.getKryo(stormConf);
-        output = new Output(2000, 2_000_000_000);
-        input = new Input();
-    }
-
-    /**
-     * Serializes the given object into a byte array using Kryo serialization.
-     *
-     * @param obj Object to be serialized.
-     */
-    public byte[] serialize(Object obj) {
-        output.clear();
-        kryo.writeClassAndObject(output, obj);
-        return output.toBytes();
-    }
-
-    /**
-     * Serializes the given object into a {@link ByteBuffer} backed by the 
byte array returned by Kryo serialization.
-     *
-     * @param obj Object to be serialized.
-     */
-    public ByteBuffer serializeToByteBuffer(Object obj) {
-        output.clear();
-        kryo.writeClassAndObject(output, obj);
-        return ByteBuffer.wrap(output.getBuffer(), 0, output.position());
-    }
-
-    /**
-     * Returns an Object which is created using Kryo deserialization of given 
byte array instance.
-     *
-     * @param buff byte array to be deserialized into an Object
-     */
-    public Object deserialize(byte[] buff) {
-        input.setBuffer(buff);
-        return kryo.readClassAndObject(input);
-    }
-
-    /**
-     * Returns an Object which is created using Kryo deserialization of given 
{@code byteBuffer} instance.
-     *
-     * @param byteBuffer byte buffer to be deserialized into an Object
-     */
-    public Object deserialize(ByteBuffer byteBuffer) {
-        input.setBuffer(byteBuffer.array(), byteBuffer.arrayOffset(), 
byteBuffer.position());
-        return kryo.readClassAndObject(input);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
index 9b12057..5125e41 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
@@ -65,6 +65,7 @@ public class WindowTridentProcessor implements 
TridentProcessor {
     private WindowsStoreFactory windowStoreFactory;
     private WindowsStore windowStore;
 
+    private Map conf;
     private TopologyContext topologyContext;
     private FreshCollector collector;
     private TridentTupleView.ProjectionFactory projection;
@@ -84,20 +85,21 @@ public class WindowTridentProcessor implements 
TridentProcessor {
     }
 
     @Override
-    public void prepare(Map stormConf, TopologyContext context, TridentContext 
tridentContext) {
+    public void prepare(Map conf, TopologyContext context, TridentContext 
tridentContext) {
+        this.conf = conf;
         this.topologyContext = context;
         List<TridentTuple.Factory> parents = 
tridentContext.getParentTupleFactories();
         if (parents.size() != 1) {
             throw new RuntimeException("Aggregation related operation can only 
have one parent");
         }
 
-        Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);
+        Long maxTuplesCacheSize = getWindowTuplesCacheSize(conf);
 
         this.tridentContext = tridentContext;
         collector = new FreshCollector(tridentContext);
         projection = new TridentTupleView.ProjectionFactory(parents.get(0), 
inputFields);
 
-        windowStore = windowStoreFactory.create(stormConf);
+        windowStore = windowStoreFactory.create();
         windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + 
topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
         windowTriggerInprocessId = 
getWindowTriggerInprocessIdPrefix(windowTaskId);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
index 8042e93..6664b41 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
@@ -71,7 +71,7 @@ public class WindowsStateUpdater implements 
StateUpdater<WindowsState> {
 
     @Override
     public void prepare(Map conf, TridentOperationContext context) {
-        windowsStore = windowStoreFactory.create(conf);
+        windowsStore = windowStoreFactory.create();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
index e09ac5e..8904b7b 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
@@ -26,7 +26,6 @@ import java.util.List;
 
 /**
  * Store for storing window related entities like windowed tuples, triggers 
etc.
- * {@link WindowKryoSerializer} can be used for kryo 
serialization/deserialization of keys and values.
  *
  */
 public interface WindowsStore extends Serializable {

http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
index edd0cb2..409d672 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
@@ -18,11 +18,7 @@
  */
 package org.apache.storm.trident.windowing;
 
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.planner.processor.TridentContext;
-
 import java.io.Serializable;
-import java.util.Map;
 
 /**
  * Factory to create instances of {@code WindowsStore}.
@@ -33,8 +29,7 @@ public interface WindowsStoreFactory extends Serializable {
     /**
      * Creates a window store
      *
-     * @param stormConf storm topology configuration passed in {@link 
org.apache.storm.trident.planner.TridentProcessor#prepare(Map, TopologyContext, 
TridentContext)}
-     *
+     * @return
      */
-    public WindowsStore create(Map stormConf);
+    public WindowsStore create();
 }

Reply via email to