Repository: storm
Updated Branches:
  refs/heads/master 084492782 -> 33e499479
Revert "STORM-1649 kryo serialization in windowing" This reverts commit 576499baba14926a564a51e0018f3ead86809ab1. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fd02ac60 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fd02ac60 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fd02ac60 Branch: refs/heads/master Commit: fd02ac603276b3b38a755eb2c967a51c0b630039 Parents: 0844927 Author: P. Taylor Goetz <[email protected]> Authored: Thu Mar 31 16:33:28 2016 -0400 Committer: P. Taylor Goetz <[email protected]> Committed: Thu Mar 31 16:33:28 2016 -0400 ---------------------------------------------------------------------- .../trident/windowing/HBaseWindowsStore.java | 40 ++++----- .../windowing/HBaseWindowsStoreFactory.java | 4 +- .../windowing/InMemoryWindowsStoreFactory.java | 6 +- .../trident/windowing/WindowKryoSerializer.java | 87 -------------------- .../windowing/WindowTridentProcessor.java | 8 +- .../trident/windowing/WindowsStateUpdater.java | 2 +- .../storm/trident/windowing/WindowsStore.java | 1 - .../trident/windowing/WindowsStoreFactory.java | 9 +- 8 files changed, 35 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java index e319a55..ccce03a 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java @@ -18,6 +18,9 @@ */ package 
org.apache.storm.hbase.trident.windowing; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -26,11 +29,11 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; -import org.apache.storm.trident.windowing.WindowKryoSerializer; import org.apache.storm.trident.windowing.WindowsStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; @@ -38,7 +41,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -51,12 +53,11 @@ public class HBaseWindowsStore implements WindowsStore { public static final String UTF_8 = "utf-8"; private final ThreadLocal<HTable> threadLocalHtable; - private final ThreadLocal<WindowKryoSerializer> threadLocalWindowKryoSerializer; private final Queue<HTable> htables = new ConcurrentLinkedQueue<>(); private final byte[] family; private final byte[] qualifier; - public HBaseWindowsStore(final Map stormConf, final Configuration config, final String tableName, byte[] family, byte[] qualifier) { + public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) { this.family = family; this.qualifier = qualifier; @@ -73,23 +74,12 @@ public class HBaseWindowsStore implements WindowsStore { } }; - threadLocalWindowKryoSerializer = new ThreadLocal<WindowKryoSerializer>(){ - @Override - protected WindowKryoSerializer initialValue() { - return new 
WindowKryoSerializer(stormConf); - } - }; - } private HTable htable() { return threadLocalHtable.get(); } - private WindowKryoSerializer windowKryoSerializer() { - return threadLocalWindowKryoSerializer.get(); - } - private byte[] effectiveKey(String key) { try { return key.getBytes(UTF_8); @@ -115,7 +105,11 @@ public class HBaseWindowsStore implements WindowsStore { return null; } - return windowKryoSerializer().deserialize(result.getValue(family, qualifier)); + Kryo kryo = new Kryo(); + Input input = new Input(result.getValue(family, qualifier)); + Object resultObject = kryo.readClassAndObject(input); + return resultObject; + } @Override @@ -135,6 +129,7 @@ public class HBaseWindowsStore implements WindowsStore { throw new RuntimeException(e); } + Kryo kryo = new Kryo(); List<Object> values = new ArrayList<>(); for (int i=0; i<results.length; i++) { Result result = results[i]; @@ -142,7 +137,8 @@ public class HBaseWindowsStore implements WindowsStore { LOG.error("Got empty result for key [{}]", keys.get(i)); throw new RuntimeException("Received empty result for key: "+keys.get(i)); } - Object resultObject = windowKryoSerializer().deserialize(result.getValue(family, qualifier)); + Input input = new Input(result.getValue(family, qualifier)); + Object resultObject = kryo.readClassAndObject(input); values.add(resultObject); } @@ -204,7 +200,10 @@ public class HBaseWindowsStore implements WindowsStore { throw new IllegalArgumentException("Invalid value of null with key: "+key); } Put put = new Put(effectiveKey(key)); - put.addColumn(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), windowKryoSerializer().serializeToByteBuffer(value)); + Kryo kryo = new Kryo(); + Output output = new Output(new ByteArrayOutputStream()); + kryo.writeClassAndObject(output, value); + put.addColumn(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), ByteBuffer.wrap(output.getBuffer(), 0, output.position())); try { htable().put(put); } catch (IOException e) { @@ 
-217,7 +216,10 @@ public class HBaseWindowsStore implements WindowsStore { List<Put> list = new ArrayList<>(); for (Entry entry : entries) { Put put = new Put(effectiveKey(entry.key)); - put.addColumn(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), windowKryoSerializer().serializeToByteBuffer(entry.value)); + Output output = new Output(new ByteArrayOutputStream()); + Kryo kryo = new Kryo(); + kryo.writeClassAndObject(output, entry.value); + put.addColumn(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), ByteBuffer.wrap(output.getBuffer(), 0, output.position())); list.add(put); } http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java index a455924..a47d5fb 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java @@ -42,14 +42,14 @@ public class HBaseWindowsStoreFactory implements WindowsStoreFactory { this.qualifier = qualifier; } - public WindowsStore create(Map stormConf) { + public WindowsStore create() { Configuration configuration = HBaseConfiguration.create(); for (Map.Entry<String, Object> entry : config.entrySet()) { if (entry.getValue() != null) { configuration.set(entry.getKey(), entry.getValue().toString()); } } - return new HBaseWindowsStore(stormConf, configuration, tableName, family, qualifier); + return new HBaseWindowsStore(configuration, tableName, family, qualifier); } } 
http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java index f7e114d..cf65594 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java @@ -18,10 +18,12 @@ */ package org.apache.storm.trident.windowing; +import org.apache.storm.trident.operation.Aggregator; import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.tuple.Fields; import java.util.List; -import java.util.Map; /** * InMemoryWindowsStoreFactory contains a single instance of {@link InMemoryWindowsStore} which will be used for @@ -35,7 +37,7 @@ public class InMemoryWindowsStoreFactory implements WindowsStoreFactory { private InMemoryWindowsStore inMemoryWindowsStore; @Override - public WindowsStore create(Map stormConf) { + public WindowsStore create() { if(inMemoryWindowsStore == null) { inMemoryWindowsStore = new InMemoryWindowsStore(); } http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java deleted file mode 100644 index b105180..0000000 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) 
under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.storm.trident.windowing; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import org.apache.storm.serialization.SerializationFactory; - -import java.nio.ByteBuffer; -import java.util.Map; - -/** - * Kryo serializer/deserializer for values that are stored as part of windowing. This can be used in {@link WindowsStore}. - * This class is not thread safe. - * - */ -public class WindowKryoSerializer { - - private final Kryo kryo; - private final Output output; - private final Input input; - - public WindowKryoSerializer(Map stormConf) { - kryo = SerializationFactory.getKryo(stormConf); - output = new Output(2000, 2_000_000_000); - input = new Input(); - } - - /** - * Serializes the given object into a byte array using Kryo serialization. - * - * @param obj Object to be serialized. - */ - public byte[] serialize(Object obj) { - output.clear(); - kryo.writeClassAndObject(output, obj); - return output.toBytes(); - } - - /** - * Serializes the given object into a {@link ByteBuffer} backed by the byte array returned by Kryo serialization. - * - * @param obj Object to be serialized. 
- */ - public ByteBuffer serializeToByteBuffer(Object obj) { - output.clear(); - kryo.writeClassAndObject(output, obj); - return ByteBuffer.wrap(output.getBuffer(), 0, output.position()); - } - - /** - * Returns an Object which is created using Kryo deserialization of given byte array instance. - * - * @param buff byte array to be deserialized into an Object - */ - public Object deserialize(byte[] buff) { - input.setBuffer(buff); - return kryo.readClassAndObject(input); - } - - /** - * Returns an Object which is created using Kryo deserialization of given {@code byteBuffer} instance. - * - * @param byteBuffer byte buffer to be deserialized into an Object - */ - public Object deserialize(ByteBuffer byteBuffer) { - input.setBuffer(byteBuffer.array(), byteBuffer.arrayOffset(), byteBuffer.position()); - return kryo.readClassAndObject(input); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java index 9b12057..5125e41 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java @@ -65,6 +65,7 @@ public class WindowTridentProcessor implements TridentProcessor { private WindowsStoreFactory windowStoreFactory; private WindowsStore windowStore; + private Map conf; private TopologyContext topologyContext; private FreshCollector collector; private TridentTupleView.ProjectionFactory projection; @@ -84,20 +85,21 @@ public class WindowTridentProcessor implements TridentProcessor { } @Override - public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) { + public void prepare(Map conf, 
TopologyContext context, TridentContext tridentContext) { + this.conf = conf; this.topologyContext = context; List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories(); if (parents.size() != 1) { throw new RuntimeException("Aggregation related operation can only have one parent"); } - Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf); + Long maxTuplesCacheSize = getWindowTuplesCacheSize(conf); this.tridentContext = tridentContext; collector = new FreshCollector(tridentContext); projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields); - windowStore = windowStoreFactory.create(stormConf); + windowStore = windowStoreFactory.create(); windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR; windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId); http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java index 8042e93..6664b41 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java @@ -71,7 +71,7 @@ public class WindowsStateUpdater implements StateUpdater<WindowsState> { @Override public void prepare(Map conf, TridentOperationContext context) { - windowsStore = windowStoreFactory.create(conf); + windowsStore = windowStoreFactory.create(); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java ---------------------------------------------------------------------- diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java index e09ac5e..8904b7b 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java @@ -26,7 +26,6 @@ import java.util.List; /** * Store for storing window related entities like windowed tuples, triggers etc. - * {@link WindowKryoSerializer} can be used for kryo serialization/deserialization of keys and values. * */ public interface WindowsStore extends Serializable { http://git-wip-us.apache.org/repos/asf/storm/blob/fd02ac60/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java index edd0cb2..409d672 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java @@ -18,11 +18,7 @@ */ package org.apache.storm.trident.windowing; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.trident.planner.processor.TridentContext; - import java.io.Serializable; -import java.util.Map; /** * Factory to create instances of {@code WindowsStore}. @@ -33,8 +29,7 @@ public interface WindowsStoreFactory extends Serializable { /** * Creates a window store * - * @param stormConf storm topology configuration passed in {@link org.apache.storm.trident.planner.TridentProcessor#prepare(Map, TopologyContext, TridentContext)} - * + * @return */ - public WindowsStore create(Map stormConf); + public WindowsStore create(); }
