jstorm-runner: 1. Add a Kryo serializer for the lists returned by Collections.singletonList (Collections.SingletonList). 2. Fix a concurrency problem with the elementIndex of JStormBagState.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/201ef722 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/201ef722 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/201ef722 Branch: refs/heads/jstorm-runner Commit: 201ef722ec36b0ffa8197722fdf898fb9978803c Parents: 1bf3224 Author: basti.lj <[email protected]> Authored: Wed Jul 19 20:15:56 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 12:02:59 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/jstorm/JStormRunner.java | 2 + .../serialization/CollectionsSerializer.java | 43 +++++++++++++++++++ .../jstorm/translation/ExecutorsBolt.java | 2 +- .../translation/JStormStateInternals.java | 44 +++++++++++++------- 4 files changed, 75 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/201ef722/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java index 286a975..56db1c6 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java @@ -31,6 +31,7 @@ import com.alibaba.jstorm.cluster.StormConfig; import com.alibaba.jstorm.transactional.TransactionTopologyBuilder; import java.util.HashMap; import java.util.Map; +import org.apache.beam.runners.jstorm.serialization.CollectionsSerializer; import org.apache.beam.runners.jstorm.serialization.ImmutableListSerializer; import org.apache.beam.runners.jstorm.serialization.ImmutableMapSerializer; import 
org.apache.beam.runners.jstorm.serialization.ImmutableSetSerializer; @@ -105,6 +106,7 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> { SdkRepackImmuSetSerializer.registerSerializers(config); ImmutableMapSerializer.registerSerializers(config); SdkRepackImmutableMapSerializer.registerSerializers(config); + CollectionsSerializer.registerSerializers(config); config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class); return config; http://git-wip-us.apache.org/repos/asf/beam/blob/201ef722/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java new file mode 100644 index 0000000..0548196 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java @@ -0,0 +1,43 @@ +package org.apache.beam.runners.jstorm.serialization; + +import backtype.storm.Config; +import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; +import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; +import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; +import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; + +import java.util.Collections; +import java.util.List; + + +/** + * Specific serializer of {@link Kryo} for Collections. + */ +public class CollectionsSerializer { + + /** + * Specific {@link Kryo} serializer for {@link java.util.Collections.SingletonList}. 
+ */ + public static class CollectionsSingletonListSerializer extends Serializer<List<?>> { + public CollectionsSingletonListSerializer() { + setImmutable(true); + } + + @Override + public List<?> read(final Kryo kryo, final Input input, final Class<List<?>> type) { + final Object obj = kryo.readClassAndObject(input); + return Collections.singletonList(obj); + } + + @Override + public void write(final Kryo kryo, final Output output, final List<?> list) { + kryo.writeClassAndObject(output, list.get(0)); + } + + } + + public static void registerSerializers(Config config) { + config.registerSerialization(Collections.singletonList("").getClass(), + CollectionsSingletonListSerializer.class); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/201ef722/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java index ce6ea2c..33393f2 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java @@ -240,7 +240,7 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { } public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) { - LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue()); + LOG.debug("ProcessExecutorElem: value={} from tag={}", elem.getValue(), inputTag); if (elem != null) { Executor executor = inputTagToExecutor.get(inputTag); if (executor != null) { http://git-wip-us.apache.org/repos/asf/beam/blob/201ef722/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java 
---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java index 3b6b4d5..68a17e5 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java @@ -261,7 +261,6 @@ class JStormStateInternals<K> implements StateInternals { private final StateNamespace namespace; private final IKvStore<ComposedKey, T> kvState; private final IKvStore<ComposedKey, Object> stateInfoKvState; - private int elemIndex; JStormBagState( @Nullable K key, @@ -272,17 +271,19 @@ class JStormStateInternals<K> implements StateInternals { this.namespace = checkNotNull(namespace, "namespace"); this.kvState = checkNotNull(kvState, "kvState"); this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState"); + } - Integer index = (Integer) stateInfoKvState.get(getComposedKey()); - this.elemIndex = index != null ? ++index : 0; + private int getElementIndex() throws IOException { + Integer elementIndex = (Integer) stateInfoKvState.get(getComposedKey()); + return elementIndex != null ? 
elementIndex : 0; } @Override public void add(T input) { try { + int elemIndex = getElementIndex(); kvState.put(getComposedKey(elemIndex), input); - stateInfoKvState.put(getComposedKey(), elemIndex); - elemIndex++; + stateInfoKvState.put(getComposedKey(), ++elemIndex); } catch (IOException e) { throw new RuntimeException(e.getCause()); } @@ -293,7 +294,12 @@ class JStormStateInternals<K> implements StateInternals { return new ReadableState<Boolean>() { @Override public Boolean read() { - return elemIndex <= 0; + try { + return getElementIndex() <= 0; + } catch (IOException e) { + LOG.error("Failed to read", e); + return false; + } } @Override @@ -306,7 +312,7 @@ class JStormStateInternals<K> implements StateInternals { @Override public Iterable<T> read() { - return new BagStateIterable(elemIndex); + return new BagStateIterable(); } @Override @@ -318,11 +324,11 @@ class JStormStateInternals<K> implements StateInternals { @Override public void clear() { try { + int elemIndex = getElementIndex(); for (int i = 0; i < elemIndex; i++) { kvState.remove(getComposedKey(i)); } stateInfoKvState.remove(getComposedKey()); - elemIndex = 0; } catch (IOException e) { throw new RuntimeException(e.getCause()); } @@ -336,6 +342,18 @@ class JStormStateInternals<K> implements StateInternals { return ComposedKey.of(key, namespace, elemIndex); } + @Override + public String toString() { + int elemIndex = -1; + try { + elemIndex = getElementIndex(); + } catch (IOException e) { + + } + return String.format("JStormBagState: key=%s, namespace=%s, elementIndex=%d", + key, namespace, elemIndex); + } + /** * Implementation of Bag state Iterable. 
*/ @@ -346,13 +364,11 @@ class JStormStateInternals<K> implements StateInternals { private int cursor = 0; BagStateIterator() { - Integer s = null; try { - s = (Integer) stateInfoKvState.get(getComposedKey()); + this.size = getElementIndex(); } catch (IOException e) { - LOG.error("Failed to get elemIndex for key={}", getComposedKey()); + throw new RuntimeException(e.getCause()); } - this.size = s != null ? ++s : 0; } @Override @@ -382,10 +398,8 @@ class JStormStateInternals<K> implements StateInternals { } } - private final int size; + BagStateIterable() { - BagStateIterable(int size) { - this.size = size; } @Override
