http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java new file mode 100644 index 0000000..aa7d325 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java @@ -0,0 +1,92 @@ +package org.apache.beam.runners.jstorm.serialization; + +import backtype.storm.Config; +import org.apache.beam.runners.jstorm.util.RunnerUtils; +import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; +import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; +import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; +import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; +import com.google.common.collect.*; + +public class ImmutableListSerializer extends Serializer<ImmutableList<Object>> { + + private static final boolean DOES_NOT_ACCEPT_NULL = false; + private static final boolean IMMUTABLE = true; + + public ImmutableListSerializer() { + super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); + } + + @Override + public void write(Kryo kryo, Output output, ImmutableList<Object> object) { + output.writeInt(object.size(), true); + for (Object elm : object) { + kryo.writeClassAndObject(output, elm); + } + } + + @Override + public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) { + final int size = input.readInt(true); + final Object[] list = new Object[size]; + for (int i = 0; i < size; ++i) { + list[i] = kryo.readClassAndObject(input); + } + return ImmutableList.copyOf(list); + } + + /** + * Creates a new {@link ImmutableListSerializer} and registers its serializer + * for the several ImmutableList related 
classes. + */ + public static void registerSerializers(Config config) { + + // ImmutableList (abstract class) + // +- RegularImmutableList + // | RegularImmutableList + // +- SingletonImmutableList + // | Optimized for List with only 1 element. + // +- SubList + // | Representation for part of ImmutableList + // +- ReverseImmutableList + // | For iterating in reverse order + // +- StringAsImmutableList + // | Used by Lists#charactersOf + // +- Values (ImmutableTable values) + // Used by return value of #values() when there are multiple cells + + config.registerSerialization(ImmutableList.class, ImmutableListSerializer.class); + config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(ImmutableList.class), ImmutableListSerializer.class); + + // Note: + // Only registering above is good enough for serializing/deserializing. + // but if using Kryo#copy, following is required. + + config.registerSerialization(ImmutableList.of().getClass(), ImmutableListSerializer.class); + config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().getClass()), ImmutableListSerializer.class); + config.registerSerialization(ImmutableList.of(1).getClass(), ImmutableListSerializer.class); + config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1).getClass()), ImmutableListSerializer.class); + config.registerSerialization(ImmutableList.of(1,2,3).subList(1, 2).getClass(), ImmutableListSerializer.class); + config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1,2,3).subList(1, 2).getClass()), ImmutableListSerializer.class); + config.registerSerialization(ImmutableList.of().reverse().getClass(), ImmutableListSerializer.class); + config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().reverse().getClass()), ImmutableListSerializer.class); + + config.registerSerialization(Lists.charactersOf("KryoRocks").getClass(), ImmutableListSerializer.class); + 
config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(Lists.charactersOf("KryoRocks").getClass()), ImmutableListSerializer.class); + + Table<Integer,Integer,Integer> baseTable = HashBasedTable.create(); + baseTable.put(1, 2, 3); + baseTable.put(4, 5, 6); + Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable); + config.registerSerialization(table.values().getClass(), ImmutableListSerializer.class); + config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(table.values().getClass()), ImmutableListSerializer.class); + + } +}
// Source: http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
package org.apache.beam.runners.jstorm.serialization;

import backtype.storm.Config;
import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

/**
 * Kryo serializer for Guava {@link ImmutableMap} instances.
 *
 * <p>The map is written as a plain {@link HashMap} copy and rebuilt with
 * {@link ImmutableMap#copyOf(Map)} on read.
 */
public class ImmutableMapSerializer extends Serializer<ImmutableMap<Object, ? extends Object>> {

  // Must be false: write() copies the map with Maps.newHashMap(...), which cannot take null.
  // (The value was previously true, which contradicted the constant's name and the sibling
  // serializers, and let Kryo hand a null map to write().)
  private static final boolean DOES_NOT_ACCEPT_NULL = false;
  private static final boolean IMMUTABLE = true;

  public ImmutableMapSerializer() {
    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
  }

  /** Writes a mutable {@link HashMap} copy of {@code immutableMap}. */
  @Override
  public void write(Kryo kryo, Output output, ImmutableMap<Object, ? extends Object> immutableMap) {
    kryo.writeObject(output, Maps.newHashMap(immutableMap));
  }

  /** Reads the {@link HashMap} written by {@link #write} and wraps it in an immutable copy. */
  @Override
  public ImmutableMap<Object, Object> read(
      Kryo kryo, Input input, Class<ImmutableMap<Object, ? extends Object>> type) {
    // readObject(..., HashMap.class) yields a raw HashMap; write() always stores Object entries.
    @SuppressWarnings("unchecked")
    Map<Object, Object> map = kryo.readObject(input, HashMap.class);
    return ImmutableMap.copyOf(map);
  }

  /**
   * Registers {@link ImmutableMapSerializer} for the several ImmutableMap related classes.
   */
  public static void registerSerializers(Config config) {

    config.registerSerialization(ImmutableMap.class, ImmutableMapSerializer.class);
    config.registerSerialization(ImmutableMap.of().getClass(), ImmutableMapSerializer.class);

    Object o1 = new Object();
    Object o2 = new Object();

    // One-entry and two-entry maps, to pick up the concrete classes Guava uses for them.
    config.registerSerialization(ImmutableMap.of(o1, o1).getClass(), ImmutableMapSerializer.class);
    config.registerSerialization(
        ImmutableMap.of(o1, o1, o2, o2).getClass(), ImmutableMapSerializer.class);

    // An immutable copy of an EnumMap, to pick up the concrete class used for enum keys.
    Map<DummyEnum, Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class);
    for (DummyEnum e : DummyEnum.values()) {
      enumMap.put(e, o1);
    }

    config.registerSerialization(
        ImmutableMap.copyOf(enumMap).getClass(), ImmutableMapSerializer.class);
  }

  private enum DummyEnum {
    VALUE1,
    VALUE2
  }
}

// Source: http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
package org.apache.beam.runners.jstorm.serialization;

import backtype.storm.Config;
import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

/**
 * Kryo serializer for Guava {@link ImmutableSet} instances.
 *
 * <p>Wire format: the element count as a variable-length int, followed by every element written
 * together with its class.
 */
public class ImmutableSetSerializer extends Serializer<ImmutableSet<Object>> {

  private static final boolean DOES_NOT_ACCEPT_NULL = false;
  private static final boolean IMMUTABLE = true;

  public ImmutableSetSerializer() {
    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
  }

  /** Writes the set size and then each element (class + value) in iteration order. */
  @Override
  public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
    output.writeInt(object.size(), true);
    for (Object elm : object) {
      kryo.writeClassAndObject(output, elm);
    }
  }

  /** Reads back the element count and the elements, and rebuilds an {@link ImmutableSet}. */
  @Override
  public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
    final int size = input.readInt(true);
    ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
    for (int i = 0; i < size; ++i) {
      builder.add(kryo.readClassAndObject(input));
    }
    return builder.build();
  }

  /**
   * Registers {@link ImmutableSetSerializer} for the several ImmutableSet related classes.
   */
  public static void registerSerializers(Config config) {

    // ImmutableSet (abstract class)
    //  +- EmptyImmutableSet
    //  |   EmptyImmutableSet
    //  +- SingletonImmutableSet
    //  |   Optimized for Set with only 1 element.
    //  +- RegularImmutableSet
    //  |   RegularImmutableSet
    //  +- EnumImmutableSet
    //  |   EnumImmutableSet

    config.registerSerialization(ImmutableSet.class, ImmutableSetSerializer.class);

    // Note:
    //  Only registering above is good enough for serializing/deserializing.
    //  but if using Kryo#copy, following is required.

    config.registerSerialization(ImmutableSet.of().getClass(), ImmutableSetSerializer.class);
    config.registerSerialization(ImmutableSet.of(1).getClass(), ImmutableSetSerializer.class);
    config.registerSerialization(ImmutableSet.of(1, 2, 3).getClass(), ImmutableSetSerializer.class);

    config.registerSerialization(
        Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(),
        ImmutableSetSerializer.class);
  }

  private enum SomeEnum {
    A, B, C
  }
}

// Source: http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
package org.apache.beam.runners.jstorm.serialization;

import com.alibaba.jstorm.cache.KvStoreIterable;
import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
import com.google.common.collect.Lists;

import java.util.Iterator;
import java.util.List;

/**
 * Kryo serializer for {@link KvStoreIterable}.
 *
 * <p>Wire format: the element count as a variable-length int, followed by every element written
 * together with its class. Deserialization yields an in-memory {@link KvStoreIterable} backed by
 * a list of the decoded elements.
 */
public class KvStoreIterableSerializer extends Serializer<KvStoreIterable<Object>> {

  public KvStoreIterableSerializer() {

  }

  /**
   * Materializes {@code object} once and writes that snapshot.
   *
   * <p>Both the count and the elements come from the same snapshot, so the written size always
   * matches the number of elements that follow and the source is iterated only once.
   */
  @Override
  public void write(Kryo kryo, Output output, KvStoreIterable<Object> object) {
    List<Object> values = Lists.newArrayList(object);
    output.writeInt(values.size(), true);
    for (Object elm : values) {
      kryo.writeClassAndObject(output, elm);
    }
  }

  /** Reads the elements into a list and exposes them through a {@link KvStoreIterable} view. */
  @Override
  public KvStoreIterable<Object> read(Kryo kryo, Input input, Class<KvStoreIterable<Object>> type) {
    final int size = input.readInt(true);
    final List<Object> values = Lists.newArrayList();
    for (int i = 0; i < size; ++i) {
      values.add(kryo.readClassAndObject(input));
    }

    return new KvStoreIterable<Object>() {
      @Override
      public Iterator<Object> iterator() {
        return values.iterator();
      }

      @Override
      public String toString() {
        return values.toString();
      }
    };
  }
}

// Source: http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
package org.apache.beam.runners.jstorm.serialization;


import backtype.storm.Config;
import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
import org.apache.beam.sdk.repackaged.com.google.common.collect.*;

/**
 * Kryo serializer for the {@code ImmutableList} that is repackaged inside the Beam SDK
 * ({@code org.apache.beam.sdk.repackaged.com.google.common.collect}).
 *
 * <p>Wire format: the element count as a variable-length int, followed by every element written
 * together with its class.
 */
public class SdkRepackImmuListSerializer extends Serializer<ImmutableList<Object>> {

  private static final boolean DOES_NOT_ACCEPT_NULL = false;
  private static final boolean IMMUTABLE = true;

  public SdkRepackImmuListSerializer() {
    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
  }

  /** Writes the list size and then each element (class + value) in list order. */
  @Override
  public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
    output.writeInt(object.size(), true);
    for (Object elm : object) {
      kryo.writeClassAndObject(output, elm);
    }
  }

  /** Reads back the element count and the elements, and rebuilds an {@code ImmutableList}. */
  @Override
  public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
    final int size = input.readInt(true);
    final Object[] list = new Object[size];
    for (int i = 0; i < size; ++i) {
      list[i] = kryo.readClassAndObject(input);
    }
    return ImmutableList.copyOf(list);
  }

  /**
   * Registers {@link SdkRepackImmuListSerializer} for the several (repackaged) ImmutableList
   * related classes.
   */
  public static void registerSerializers(Config config) {

    // ImmutableList (abstract class)
    //  +- RegularImmutableList
    //  |   RegularImmutableList
    //  +- SingletonImmutableList
    //  |   Optimized for List with only 1 element.
    //  +- SubList
    //  |   Representation for part of ImmutableList
    //  +- ReverseImmutableList
    //  |   For iterating in reverse order
    //  +- StringAsImmutableList
    //  |   Used by Lists#charactersOf
    //  +- Values (ImmutableTable values)
    //      Used by return value of #values() when there are multiple cells

    config.registerSerialization(ImmutableList.class, SdkRepackImmuListSerializer.class);

    // Note:
    //  Only registering above is good enough for serializing/deserializing.
    //  but if using Kryo#copy, following is required.

    config.registerSerialization(ImmutableList.of().getClass(), SdkRepackImmuListSerializer.class);
    config.registerSerialization(ImmutableList.of(1).getClass(), SdkRepackImmuListSerializer.class);
    config.registerSerialization(
        ImmutableList.of(1, 2, 3).subList(1, 2).getClass(), SdkRepackImmuListSerializer.class);
    config.registerSerialization(
        ImmutableList.of().reverse().getClass(), SdkRepackImmuListSerializer.class);

    config.registerSerialization(
        Lists.charactersOf("KryoRocks").getClass(), SdkRepackImmuListSerializer.class);

    // Two cells are needed so that values() yields the multi-cell "Values" implementation.
    Table<Integer, Integer, Integer> baseTable = HashBasedTable.create();
    baseTable.put(1, 2, 3);
    baseTable.put(4, 5, 6);
    Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
    config.registerSerialization(table.values().getClass(), SdkRepackImmuListSerializer.class);

  }
}

// Source: http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
package org.apache.beam.runners.jstorm.serialization;

import backtype.storm.Config;
import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
import org.apache.beam.sdk.repackaged.com.google.common.collect.*;

/**
 * Kryo serializer for the {@code ImmutableSet} that is repackaged inside the Beam SDK
 * ({@code org.apache.beam.sdk.repackaged.com.google.common.collect}).
 *
 * <p>Wire format: the element count as a variable-length int, followed by every element written
 * together with its class.
 */
public class SdkRepackImmuSetSerializer extends Serializer<ImmutableSet<Object>> {

  private static final boolean DOES_NOT_ACCEPT_NULL = false;
  private static final boolean IMMUTABLE = true;

  public SdkRepackImmuSetSerializer() {
    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
  }

  /** Writes the set size and then each element (class + value) in iteration order. */
  @Override
  public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
    output.writeInt(object.size(), true);
    for (Object elm : object) {
      kryo.writeClassAndObject(output, elm);
    }
  }

  /** Reads back the element count and the elements, and rebuilds an {@code ImmutableSet}. */
  @Override
  public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
    final int size = input.readInt(true);
    ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
    for (int i = 0; i < size; ++i) {
      builder.add(kryo.readClassAndObject(input));
    }
    return builder.build();
  }

  /**
   * Registers {@link SdkRepackImmuSetSerializer} for the several (repackaged) ImmutableSet
   * related classes.
   */
  public static void registerSerializers(Config config) {

    // ImmutableSet (abstract class)
    //  +- EmptyImmutableSet
    //  |   EmptyImmutableSet
    //  +- SingletonImmutableSet
    //  |   Optimized for Set with only 1 element.
    //  +- RegularImmutableSet
    //  |   RegularImmutableSet
    //  +- EnumImmutableSet
    //  |   EnumImmutableSet

    config.registerSerialization(ImmutableSet.class, SdkRepackImmuSetSerializer.class);

    // Note:
    //  Only registering above is good enough for serializing/deserializing.
    //  but if using Kryo#copy, following is required.

    config.registerSerialization(ImmutableSet.of().getClass(), SdkRepackImmuSetSerializer.class);
    config.registerSerialization(ImmutableSet.of(1).getClass(), SdkRepackImmuSetSerializer.class);
    config.registerSerialization(
        ImmutableSet.of(1, 2, 3).getClass(), SdkRepackImmuSetSerializer.class);

    config.registerSerialization(
        Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(),
        SdkRepackImmuSetSerializer.class);
  }

  private enum SomeEnum {
    A, B, C
  }
}

// Source: http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
package org.apache.beam.runners.jstorm.serialization;

import backtype.storm.Config;
import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;

import java.lang.reflect.Field;
import java.util.*;

/**
 * Kryo serializer for the wrappers returned by the {@code Collections.unmodifiable*} factories.
 *
 * <p>Wire format: the ordinal of the matching {@link UnmodifiableCollection} constant as a
 * variable-length int, followed by the wrapped source collection (class + value). The wrapped
 * collection is obtained reflectively from the wrapper's private field.
 */
public class UnmodifiableCollectionsSerializer extends Serializer<Object> {

  private static final Field SOURCE_COLLECTION_FIELD;
  private static final Field SOURCE_MAP_FIELD;

  static {
    try {
      SOURCE_COLLECTION_FIELD = Class.forName("java.util.Collections$UnmodifiableCollection")
          .getDeclaredField("c");
      SOURCE_COLLECTION_FIELD.setAccessible(true);


      SOURCE_MAP_FIELD = Class.forName("java.util.Collections$UnmodifiableMap")
          .getDeclaredField("m");
      SOURCE_MAP_FIELD.setAccessible(true);
    } catch (final Exception e) {
      throw new RuntimeException("Could not access source collection"
          + " field in java.util.Collections$UnmodifiableCollection.", e);
    }
  }

  /** Reads the wrapper kind and the source collection, then re-wraps the source collection. */
  @Override
  public Object read(final Kryo kryo, final Input input, final Class<Object> clazz) {
    final int ordinal = input.readInt(true);
    final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal];
    final Object sourceCollection = kryo.readClassAndObject(input);
    return unmodifiableCollection.create(sourceCollection);
  }

  /**
   * Writes the wrapper kind followed by the wrapped source collection.
   *
   * @throws IllegalArgumentException if the class of {@code object} is not a supported wrapper
   */
  @Override
  public void write(final Kryo kryo, final Output output, final Object object) {
    try {
      final UnmodifiableCollection unmodifiableCollection =
          UnmodifiableCollection.valueOfType(object.getClass());
      // the ordinal could be replaced by something else (e.g. an explicitly managed "id")
      output.writeInt(unmodifiableCollection.ordinal(), true);
      kryo.writeClassAndObject(output, unmodifiableCollection.sourceCollectionField.get(object));
    } catch (final IllegalAccessException e) {
      // Only the checked reflection failure is wrapped; runtime exceptions propagate untouched
      // because the ObjectBuffer.write... handles SerializationException specifically
      // (resizing the buffer).
      throw new RuntimeException(e);
    }
  }

  /** Copies the wrapped source collection with Kryo and re-wraps the copy. */
  @Override
  public Object copy(Kryo kryo, Object original) {
    try {
      final UnmodifiableCollection unmodifiableCollection =
          UnmodifiableCollection.valueOfType(original.getClass());
      Object sourceCollectionCopy =
          kryo.copy(unmodifiableCollection.sourceCollectionField.get(original));
      return unmodifiableCollection.create(sourceCollectionCopy);
    } catch (final IllegalAccessException e) {
      // Runtime exceptions are deliberately not caught or wrapped.
      throw new RuntimeException(e);
    }
  }

  // NOTE: the declaration order is part of the wire format (write() stores ordinal()),
  // so constants must not be reordered or inserted in the middle.
  private enum UnmodifiableCollection {
    COLLECTION(
        Collections.unmodifiableCollection(Arrays.asList("")).getClass(),
        SOURCE_COLLECTION_FIELD) {
      @Override
      public Object create(final Object sourceCollection) {
        return Collections.unmodifiableCollection((Collection<?>) sourceCollection);
      }
    },
    RANDOM_ACCESS_LIST(
        Collections.unmodifiableList(new ArrayList<Void>()).getClass(),
        SOURCE_COLLECTION_FIELD) {
      @Override
      public Object create(final Object sourceCollection) {
        return Collections.unmodifiableList((List<?>) sourceCollection);
      }
    },
    LIST(
        Collections.unmodifiableList(new LinkedList<Void>()).getClass(),
        SOURCE_COLLECTION_FIELD) {
      @Override
      public Object create(final Object sourceCollection) {
        return Collections.unmodifiableList((List<?>) sourceCollection);
      }
    },
    SET(
        Collections.unmodifiableSet(new HashSet<Void>()).getClass(),
        SOURCE_COLLECTION_FIELD) {
      @Override
      public Object create(final Object sourceCollection) {
        return Collections.unmodifiableSet((Set<?>) sourceCollection);
      }
    },
    SORTED_SET(
        Collections.unmodifiableSortedSet(new TreeSet<Void>()).getClass(),
        SOURCE_COLLECTION_FIELD) {
      @Override
      public Object create(final Object sourceCollection) {
        return Collections.unmodifiableSortedSet((SortedSet<?>) sourceCollection);
      }
    },
    MAP(
        Collections.unmodifiableMap(new HashMap<Void, Void>()).getClass(),
        SOURCE_MAP_FIELD) {
      @Override
      public Object create(final Object sourceCollection) {
        return Collections.unmodifiableMap((Map<?, ?>) sourceCollection);
      }
    },
    SORTED_MAP(
        Collections.unmodifiableSortedMap(new TreeMap<Void, Void>()).getClass(),
        SOURCE_MAP_FIELD) {
      @Override
      public Object create(final Object sourceCollection) {
        return Collections.unmodifiableSortedMap((SortedMap<?, ?>) sourceCollection);
      }
    };

    private final Class<?> type;
    private final Field sourceCollectionField;

    UnmodifiableCollection(final Class<?> type, final Field sourceCollectionField) {
      this.type = type;
      this.sourceCollectionField = sourceCollectionField;
    }

    /**
     * Wraps {@code sourceCollection} in the unmodifiable view this constant stands for.
     *
     * @param sourceCollection the collection or map to wrap
     */
    public abstract Object create(Object sourceCollection);

    /**
     * Finds the constant whose wrapper class is exactly {@code type}.
     *
     * @throws IllegalArgumentException if no constant matches
     */
    static UnmodifiableCollection valueOfType(final Class<?> type) {
      for (final UnmodifiableCollection item : values()) {
        if (item.type.equals(type)) {
          return item;
        }
      }
      throw new IllegalArgumentException("The type " + type + " is not supported.");
    }

  }

  /**
   * Registers {@link UnmodifiableCollectionsSerializer} for the several unmodifiable Collections
   * that can be created via {@link Collections}, including {@link Map}s.
   *
   * @see Collections#unmodifiableCollection(Collection)
   * @see Collections#unmodifiableList(List)
   * @see Collections#unmodifiableSet(Set)
   * @see Collections#unmodifiableSortedSet(SortedSet)
   * @see Collections#unmodifiableMap(Map)
   * @see Collections#unmodifiableSortedMap(SortedMap)
   */
  public static void registerSerializers(Config config) {
    for (final UnmodifiableCollection item : UnmodifiableCollection.values()) {
      config.registerSerialization(item.type, UnmodifiableCollectionsSerializer.class);
    }
  }
}

// Source: http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.jstorm.translation;

import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.values.PValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;

import java.util.List;

/**
 * Pipeline translator of Storm.
 *
 * <p>Replaces the {@code View.*} and {@code Combine.GloballyAsSingletonView} transforms with the
 * runner's {@link ViewTranslator} counterparts, then walks the pipeline topologically and hands
 * every translatable transform to its registered {@link TransformTranslator}.
 */
public class StormPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
  private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class);
  private final TranslationContext context;
  // Current nesting level of composite transforms; only used to indent log output.
  private int depth = 0;

  public StormPipelineTranslator(TranslationContext context) {
    this.context = context;
  }

  /** Applies the view overrides to {@code pipeline} and then traverses it with this visitor. */
  public void translate(Pipeline pipeline) {
    List<PTransformOverride> transformOverrides =
        ImmutableList.<PTransformOverride>builder()
            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class),
                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class)))
            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class),
                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class)))
            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class),
                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class)))
            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class),
                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class)))
            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class),
                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class)))
            .add(PTransformOverride.of(
                PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
                new ReflectiveOneToOneOverrideFactory(
                    ViewTranslator.CombineGloballyAsSingletonView.class)))
            .build();
    pipeline.replaceAll(transformOverrides);
    pipeline.traverseTopologically(this);
  }

  /**
   * Translates the composite as a whole when a translator is registered for it and accepts it;
   * otherwise asks the traversal to descend into its sub transforms.
   */
  @Override
  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
    LOG.info("{}enterCompositeTransform- {}", genSpaces(this.depth), node);
    this.depth++;

    // check if current composite transforms need to be translated.
    // If not, all sub transforms will be translated in visitPrimitiveTransform.
    PTransform<?, ?> transform = node.getTransform();
    if (transform != null) {
      TransformTranslator translator = TranslatorRegistry.getTranslator(transform);

      if (translator != null && applyCanTranslate(transform, node, translator)) {
        applyStreamingTransform(transform, node, translator);
        LOG.info("{}translated-{}", genSpaces(this.depth), node);
        return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
      }
    }
    return CompositeBehavior.ENTER_TRANSFORM;
  }

  @Override
  public void leaveCompositeTransform(TransformHierarchy.Node node) {
    this.depth--;
    LOG.info("{}leaveCompositeTransform- {}", genSpaces(this.depth), node);
  }

  /**
   * Translates a primitive transform.
   *
   * @throws UnsupportedOperationException if no registered translator accepts the transform
   */
  @Override
  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
    LOG.info("{}visitPrimitiveTransform- {}", genSpaces(this.depth), node);

    if (!node.isRootNode()) {
      PTransform<?, ?> transform = node.getTransform();
      TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
      if (translator == null || !applyCanTranslate(transform, node, translator)) {
        LOG.info(node.getTransform().getClass().toString());
        throw new UnsupportedOperationException(
            "The transform " + transform + " is currently not supported.");
      }
      applyStreamingTransform(transform, node, translator);
    }
  }

  @Override
  public void visitValue(PValue value, TransformHierarchy.Node node) {
    LOG.info("{}visiting value {}", genSpaces(this.depth), value);
  }

  /**
   * Marks {@code node} as the current transform in the user graph context, runs the translator
   * on it and records the output tagged values.
   */
  private <T extends PTransform<?, ?>> void applyStreamingTransform(
      PTransform<?, ?> transform,
      TransformHierarchy.Node node,
      TransformTranslator<?> translator) {
    @SuppressWarnings("unchecked")
    T typedTransform = (T) transform;
    @SuppressWarnings("unchecked")
    TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;

    context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
    typedTranslator.translateNode(typedTransform, context);

    // Maintain PValue to TupleTag map for side inputs translation.
    context.getUserGraphContext().recordOutputTaggedPValue();
  }

  /**
   * Asks the translator whether it can handle {@code transform}. As a side effect {@code node}
   * becomes the current transform of the user graph context.
   */
  private <T extends PTransform<?, ?>> boolean applyCanTranslate(
      PTransform<?, ?> transform,
      TransformHierarchy.Node node,
      TransformTranslator<?> translator) {
    @SuppressWarnings("unchecked")
    T typedTransform = (T) transform;
    @SuppressWarnings("unchecked")
    TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;

    context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());

    return typedTranslator.canTranslate(typedTransform, context);
  }

  /**
   * Utility formatting method.
   *
   * @param n indentation depth
   * @return String consisting of {@code n} repetitions of {@code "|   "}; empty when
   *     {@code n <= 0}
   */
  protected static String genSpaces(int n) {
    StringBuilder builder = new StringBuilder();
    for (int i = 0; i < n; i++) {
      builder.append("|   ");
    }
    return builder.toString();
  }

  /**
   * Override factory that builds the replacement transform reflectively: the replacement class
   * is instantiated through {@link InstanceBuilder} with the original transform as its single
   * argument.
   */
  private static class ReflectiveOneToOneOverrideFactory<
          InputT extends PValue,
          OutputT extends PValue,
          TransformT extends PTransform<InputT, OutputT>>
      extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
    private final Class<PTransform<InputT, OutputT>> replacement;

    private ReflectiveOneToOneOverrideFactory(
        Class<PTransform<InputT, OutputT>> replacement) {
      this.replacement = replacement;
    }

    @Override
    @SuppressWarnings("unchecked") // class-token and input casts cannot be checked after erasure
    public PTransformReplacement<InputT, OutputT> getReplacementTransform(
        AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) {
      PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform();
      PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement)
          .withArg(
              (Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(),
              originalPTransform)
          .build();
      InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values());
      return PTransformReplacement.of(inputT, replacedPTransform);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java new file mode 100644 index 0000000..bf4515f --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation; + +import avro.shaded.com.google.common.collect.Lists; +import org.apache.beam.runners.jstorm.translation.translator.Stream; +import org.apache.beam.runners.jstorm.util.RunnerUtils; +import com.google.common.base.Strings; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.beam.runners.jstorm.StormPipelineOptions; +import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout; +import org.apache.beam.runners.jstorm.translation.runtime.Executor; +import org.apache.beam.runners.jstorm.translation.util.CommonInstance; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.PValueBase; +import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt; + +import java.util.*; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +/** + * Maintains the state necessary during Pipeline translation to build a Storm topology. 
+ */ +public class TranslationContext { + private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class); + + private final UserGraphContext userGraphContext; + private final ExecutionGraphContext executionGraphContext; + + public TranslationContext(StormPipelineOptions options) { + this.userGraphContext = new UserGraphContext(options); + this.executionGraphContext = new ExecutionGraphContext(); + } + + public ExecutionGraphContext getExecutionGraphContext() { + return executionGraphContext; + } + + public UserGraphContext getUserGraphContext() { + return userGraphContext; + } + + private void addStormStreamDef(TaggedPValue input, String destComponentName, Stream.Grouping grouping) { + Stream.Producer producer = executionGraphContext.getProducer(input.getValue()); + if (!producer.getComponentId().equals(destComponentName)) { + Stream.Consumer consumer = Stream.Consumer.of(destComponentName, grouping); + executionGraphContext.registerStreamConsumer(consumer, producer); + + ExecutorsBolt executorsBolt = executionGraphContext.getBolt(producer.getComponentId()); + if (executorsBolt != null) { + executorsBolt.addExternalOutputTag(input.getTag()); + } + } + } + + private String getUpstreamExecutorsBolt() { + for (PValue value : userGraphContext.getInputs().values()) { + String componentId = executionGraphContext.getProducerComponentId(value); + if (componentId != null && executionGraphContext.getBolt(componentId) != null) { + return componentId; + } + } + // When upstream component is spout, "null" will be return. + return null; + } + + /** + * check if the current transform is applied to source collection. 
+ * @return + */ + private boolean connectedToSource() { + for (PValue value : userGraphContext.getInputs().values()) { + if (executionGraphContext.producedBySpout(value)) { + return true; + } + } + return false; + } + + /** + * @param upstreamExecutorsBolt + * @return true if there is multiple input streams, or upstream executor output the same stream + * to different executors + */ + private boolean isMultipleInputOrOutput(ExecutorsBolt upstreamExecutorsBolt, Map<TupleTag<?>, PValue> inputs) { + if (inputs.size() > 1) { + return true; + } else { + final Sets.SetView<TupleTag> intersection = Sets.intersection(upstreamExecutorsBolt.getExecutors().keySet(), inputs.keySet()); + if (!intersection.isEmpty()) { + // there is already a different executor consume the same input + return true; + } else { + return false; + } + } + } + + public void addTransformExecutor(Executor executor) { + addTransformExecutor(executor, Collections.EMPTY_LIST); + } + + public void addTransformExecutor(Executor executor, List<PValue> sideInputs) { + addTransformExecutor(executor, userGraphContext.getInputs(), userGraphContext.getOutputs(), sideInputs); + } + + public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs) { + addTransformExecutor(executor, inputs, outputs, Collections.EMPTY_LIST); + } + + public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs, List<PValue> sideInputs) { + String name = null; + + ExecutorsBolt bolt = null; + + boolean isGBK = false; + /** + * Check if the transform executor needs to be chained into an existing ExecutorsBolt. + * For following cases, a new bolt is created for the specified executor, otherwise the executor + * will be added into the bolt contains corresponding upstream executor. 
+ * a) it is a GroupByKey executor + * b) it is connected to source directly + * c) None existing upstream bolt was found + * d) For the purpose of performance to reduce the side effects between multiple streams which + * is output to same executor, a new bolt will be created. + */ + if (RunnerUtils.isGroupByKeyExecutor(executor)) { + bolt = new ExecutorsBolt(); + name = executionGraphContext.registerBolt(bolt); + isGBK = true; + } else if (connectedToSource()) { + bolt = new ExecutorsBolt(); + name = executionGraphContext.registerBolt(bolt); + } else { + name = getUpstreamExecutorsBolt(); + if (name == null) { + bolt = new ExecutorsBolt(); + name = executionGraphContext.registerBolt(bolt); + } else { + bolt = executionGraphContext.getBolt(name); + if (isMultipleInputOrOutput(bolt, inputs)) { + bolt = new ExecutorsBolt(); + name = executionGraphContext.registerBolt(bolt); + } + } + } + + // update the output tags of current transform into ExecutorsBolt + for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) { + TupleTag tag = entry.getKey(); + PValue value = entry.getValue(); + + // use tag of PValueBase + if (value instanceof PValueBase) { + tag = ((PValueBase) value).expand().keySet().iterator().next(); + } + executionGraphContext.registerStreamProducer( + TaggedPValue.of(tag, value), + Stream.Producer.of(name, tag.getId(), value.getName())); + //bolt.addOutputTags(tag); + } + + // add the transform executor into the chain of ExecutorsBolt + for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) { + TupleTag tag = entry.getKey(); + PValue value = entry.getValue(); + bolt.addExecutor(tag, executor); + + // filter all connections inside bolt + //if (!bolt.getOutputTags().contains(tag)) { + Stream.Grouping grouping; + if (isGBK) { + grouping = Stream.Grouping.byFields(Arrays.asList(CommonInstance.KEY)); + } else { + grouping = Stream.Grouping.of(Stream.Grouping.Type.LOCAL_OR_SHUFFLE); + } + addStormStreamDef(TaggedPValue.of(tag, value), name, 
grouping); + //} + } + + for (PValue sideInput : sideInputs) { + TupleTag tag = userGraphContext.findTupleTag(sideInput); + bolt.addExecutor(tag, executor); + checkState(!bolt.getOutputTags().contains(tag)); + addStormStreamDef(TaggedPValue.of(tag, sideInput), name, Stream.Grouping.of(Stream.Grouping.Type.ALL)); + } + + bolt.registerExecutor(executor); + + // set parallelismNumber + String pTransformfullName = userGraphContext.currentTransform.getFullName(); + String compositeName = pTransformfullName.split("/")[0]; + Map parallelismNumMap = userGraphContext.getOptions().getParallelismNumMap(); + if (parallelismNumMap.containsKey(compositeName)) { + int configNum = (Integer) parallelismNumMap.get(compositeName); + int currNum = bolt.getParallelismNum(); + bolt.setParallelismNum(Math.max(configNum, currNum)); + } + } + + // TODO: add getSideInputs() and getSideOutputs(). + public static class UserGraphContext { + private final StormPipelineOptions options; + private final Map<PValue, TupleTag> pValueToTupleTag; + private AppliedPTransform<?, ?, ?> currentTransform = null; + + private boolean isWindowed = false; + + public UserGraphContext(StormPipelineOptions options) { + this.options = checkNotNull(options, "options"); + this.pValueToTupleTag = Maps.newHashMap(); + } + + public StormPipelineOptions getOptions() { + return this.options; + } + + public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) { + this.currentTransform = transform; + } + + public String getStepName() { + return currentTransform.getFullName(); + } + + public <T extends PValue> T getInput() { + return (T) currentTransform.getInputs().values().iterator().next(); + } + + public Map<TupleTag<?>, PValue> getInputs() { + return currentTransform.getInputs(); + } + + public TupleTag<?> getInputTag() { + return currentTransform.getInputs().keySet().iterator().next(); + } + + public List<TupleTag<?>> getInputTags() { + return Lists.newArrayList(currentTransform.getInputs().keySet()); + } + 
+ public <T extends PValue> T getOutput() { + return (T) currentTransform.getOutputs().values().iterator().next(); + } + + public Map<TupleTag<?>, PValue> getOutputs() { + return currentTransform.getOutputs(); + } + + public TupleTag<?> getOutputTag() { + return currentTransform.getOutputs().keySet().iterator().next(); + } + + public List<TupleTag<?>> getOutputTags() { + return Lists.newArrayList(currentTransform.getOutputs().keySet()); + } + + public void recordOutputTaggedPValue() { + for (Map.Entry<TupleTag<?>, PValue> entry : getOutputs().entrySet()) { + pValueToTupleTag.put(entry.getValue(), entry.getKey()); + } + } + + public <T> TupleTag<T> findTupleTag(PValue pValue) { + return pValueToTupleTag.get(checkNotNull(pValue, "pValue")); + } + + public void setWindowed() { + this.isWindowed = true; + } + + public boolean isWindowed() { + return this.isWindowed; + } + + @Override + public String toString() { + return Joiner.on('\n').join(FluentIterable.from(pValueToTupleTag.entrySet()) + .transform(new Function<Map.Entry<PValue,TupleTag>, String>() { + @Override + public String apply(Map.Entry<PValue, TupleTag> entry) { + return String.format("%s == %s", entry.getValue().getId(), entry.getKey().getName()); + }})); + } + } + + public static class ExecutionGraphContext { + + private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>(); + private final Map<String, ExecutorsBolt> boltMap = new HashMap<>(); + + // One-to-one mapping between Stream.Producer and TaggedPValue (or PValue). 
+ private final Map<PValue, Stream.Producer> pValueToProducer = new HashMap<>(); + private final Map<Stream.Producer, TaggedPValue> producerToTaggedPValue = new HashMap<>(); + + private final List<Stream> streams = new ArrayList<>(); + + private int id = 1; + + public void registerSpout(AdaptorBasicSpout spout, TaggedPValue output) { + checkNotNull(spout, "spout"); + checkNotNull(output, "output"); + String name = "spout" + genId(); + this.spoutMap.put(name, spout); + registerStreamProducer( + output, + Stream.Producer.of(name, output.getTag().getId(), output.getValue().getName())); + } + + public AdaptorBasicSpout getSpout(String id) { + if (Strings.isNullOrEmpty(id)) { + return null; + } + return this.spoutMap.get(id); + } + + public Map<String, AdaptorBasicSpout> getSpouts() { + return this.spoutMap; + } + + public String registerBolt(ExecutorsBolt bolt) { + checkNotNull(bolt, "bolt"); + String name = "bolt" + genId(); + this.boltMap.put(name, bolt); + return name; + } + + public ExecutorsBolt getBolt(String id) { + if (Strings.isNullOrEmpty(id)) { + return null; + } + return this.boltMap.get(id); + } + + public void registerStreamProducer(TaggedPValue taggedPValue, Stream.Producer producer) { + checkNotNull(taggedPValue, "taggedPValue"); + checkNotNull(producer, "producer"); + pValueToProducer.put(taggedPValue.getValue(), producer); + producerToTaggedPValue.put(producer, taggedPValue); + } + + public Stream.Producer getProducer(PValue pValue) { + return pValueToProducer.get(checkNotNull(pValue, "pValue")); + } + + public String getProducerComponentId(PValue pValue) { + Stream.Producer producer = getProducer(pValue); + return producer == null ? 
null : producer.getComponentId(); + } + + public boolean producedBySpout(PValue pValue) { + String componentId = getProducerComponentId(pValue); + return getSpout(componentId) != null; + } + + public void registerStreamConsumer(Stream.Consumer consumer, Stream.Producer producer) { + streams.add(Stream.of( + checkNotNull(producer, "producer"), + checkNotNull(consumer, "consumer"))); + } + + public Map<PValue, Stream.Producer> getPValueToProducers() { + return pValueToProducer; + } + + public Iterable<Stream> getStreams() { + return streams; + } + + @Override + public String toString() { + List<String> ret = new ArrayList<>(); + ret.add("SPOUT"); + for (Map.Entry<String, AdaptorBasicSpout> entry : spoutMap.entrySet()) { + ret.add(entry.getKey() + ": " + entry.getValue().toString()); + } + ret.add("BOLT"); + for (Map.Entry<String, ExecutorsBolt> entry : boltMap.entrySet()) { + ret.add(entry.getKey() + ": " + entry.getValue().toString()); + } + ret.add("STREAM"); + for (Stream stream : streams) { + ret.add(String.format( + "%s@@%s ---> %s@@%s", + stream.getProducer().getStreamId(), + stream.getProducer().getComponentId(), + stream.getConsumer().getGrouping(), + stream.getConsumer().getComponentId())); + } + return Joiner.on("\n").join(ret); + } + + private synchronized int genId() { + return id++; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java new file mode 100644 index 0000000..0f856cf --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import com.alibaba.jstorm.beam.translation.translator.*; +import org.apache.beam.runners.jstorm.translation.translator.BoundedSourceTranslator; +import org.apache.beam.runners.jstorm.translation.translator.FlattenTranslator; +import org.apache.beam.runners.jstorm.translation.translator.GroupByKeyTranslator; +import org.apache.beam.runners.jstorm.translation.translator.ParDoBoundMultiTranslator; +import org.apache.beam.runners.jstorm.translation.translator.ParDoBoundTranslator; +import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator; +import org.apache.beam.runners.jstorm.translation.translator.UnboundedSourceTranslator; +import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator; +import org.apache.beam.runners.jstorm.translation.translator.WindowAssignTranslator; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
java.util.HashMap; +import java.util.Map; + +/** + * Lookup table mapping PTransform types to associated TransformTranslator implementations. + */ +public class TranslatorRegistry { + private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class); + + private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS = new HashMap<>(); + + static { + TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator()); + TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator()); + // TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator()); + // TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); + + TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator()); + TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator()); + + //TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>()); + TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>()); + + TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator()); + + TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator()); + + TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator()); + + /** + * Currently, empty translation is required for combine and reshuffle. Because, the transforms will be + * mapped to GroupByKey and Pardo finally. So we only need to translator the finally transforms. + * If any improvement is required, the composite transforms will be translated in the future. 
+ */ + // TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator()); + // TRANSLATORS.put(Globally.class, new CombineGloballyTranslator()); + // TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator()); + } + + public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) { + TransformTranslator<?> translator = TRANSLATORS.get(transform.getClass()); + if (translator == null) { + LOG.warn("Unsupported operator={}", transform.getClass().getName()); + } + return translator; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java new file mode 100644 index 0000000..b07b426 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.beam.runners.jstorm.translation.util.CommonInstance; + +import backtype.storm.topology.IComponent; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; + +/* + * Enable user to add output stream definitions by API, rather than hard-code. + */ +public abstract class AbstractComponent implements IComponent { + private Map<String, Fields> streamToFields = new HashMap<>(); + private Map<String, Boolean> keyStreams = new HashMap<>(); + private int parallelismNum = 0; + + public void addOutputField(String streamId) { + addOutputField(streamId, new Fields(CommonInstance.VALUE)); + } + + public void addOutputField(String streamId, Fields fields) { + streamToFields.put(streamId, fields); + keyStreams.put(streamId, false); + } + + public void addKVOutputField(String streamId) { + streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE)); + keyStreams.put(streamId, true); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) { + declarer.declareStream(entry.getKey(), entry.getValue()); + } + } + + public boolean keyedEmit(String streamId) { + Boolean isKeyedStream = keyStreams.get(streamId); + return isKeyedStream == null ? 
false : isKeyedStream; + } + + public int getParallelismNum() { + return parallelismNum; + } + + public void setParallelismNum(int num) { + parallelismNum = num; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java new file mode 100644 index 0000000..91881f2 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import backtype.storm.topology.IRichBatchBolt;
+
+/**
+ * Base type for bolts: an {@link AbstractComponent} that is also an {@link IRichBatchBolt}.
+ * It declares no members of its own.
+ */
+public abstract class AdaptorBasicBolt extends AbstractComponent implements IRichBatchBolt {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
new file mode 100644
index 0000000..5a0c6ec
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import backtype.storm.topology.IRichSpout;
+
+/**
+ * Base type for spouts: an {@link AbstractComponent} that is also an {@link IRichSpout}.
+ * It declares no members of its own.
+ */
+public abstract class AdaptorBasicSpout extends AbstractComponent implements IRichSpout {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
new file mode 100644
index 0000000..2bf3303
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import java.io.Serializable; +import java.util.*; + +import avro.shaded.com.google.common.collect.Iterables; +import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals; +import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals; + +import com.alibaba.jstorm.cache.IKvStoreManager; +import com.alibaba.jstorm.metric.MetricClient; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.SideInputHandler; +import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.WatermarkHoldState; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.beam.runners.jstorm.StormPipelineOptions; +import 
org.apache.beam.runners.jstorm.translation.util.DefaultStepContext; +import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +public class DoFnExecutor<InputT, OutputT> implements Executor { + private static final long serialVersionUID = 5297603063991078668L; + + private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class); + + public class DoFnExecutorOutputManager implements OutputManager, Serializable { + private static final long serialVersionUID = -661113364735206170L; + + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { + executorsBolt.processExecutorElem(tag, output); + } + } + + protected transient DoFnRunner<InputT, OutputT> runner = null; + protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null; + + protected final String stepName; + + protected int internalDoFnExecutorId; + + protected final String description; + + protected final TupleTag<OutputT> mainTupleTag; + protected final List<TupleTag<?>> sideOutputTags; + + protected SerializedPipelineOptions serializedOptions; + protected transient StormPipelineOptions pipelineOptions; + + protected DoFn<InputT, OutputT> doFn; + protected final Coder<WindowedValue<InputT>> inputCoder; + protected DoFnInvoker<InputT, OutputT> doFnInvoker; + protected OutputManager outputManager; + protected WindowingStrategy<?, ?> windowingStrategy; + protected final TupleTag<InputT> mainInputTag; + protected Collection<PCollectionView<?>> sideInputs; + protected SideInputHandler sideInputHandler; + protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView; + + // Initialize during runtime + protected ExecutorContext executorContext; + protected ExecutorsBolt executorsBolt; + protected TimerInternals timerInternals; + protected transient StateInternals pushbackStateInternals; + protected 
transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag; + protected transient StateTag<WatermarkHoldState> watermarkHoldTag; + protected transient IKvStoreManager kvStoreManager; + protected DefaultStepContext stepContext; + protected transient MetricClient metricClient; + + public DoFnExecutor( + String stepName, + String description, + StormPipelineOptions pipelineOptions, + DoFn<InputT, OutputT> doFn, + Coder<WindowedValue<InputT>> inputCoder, + WindowingStrategy<?, ?> windowingStrategy, + TupleTag<InputT> mainInputTag, + Collection<PCollectionView<?>> sideInputs, + Map<TupleTag, PCollectionView<?>> sideInputTagToView, + TupleTag<OutputT> mainTupleTag, + List<TupleTag<?>> sideOutputTags) { + this.stepName = checkNotNull(stepName, "stepName"); + this.description = checkNotNull(description, "description"); + this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + this.doFn = doFn; + this.inputCoder = inputCoder; + this.outputManager = new DoFnExecutorOutputManager(); + this.windowingStrategy = windowingStrategy; + this.mainInputTag = mainInputTag; + this.sideInputs = sideInputs; + this.mainTupleTag = mainTupleTag; + this.sideOutputTags = sideOutputTags; + this.sideInputTagToView = sideInputTagToView; + } + + protected DoFnRunner<InputT, OutputT> getDoFnRunner() { + return new DoFnRunnerWithMetrics<>( + stepName, + DoFnRunners.simpleRunner( + this.pipelineOptions, + this.doFn, + this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler, + this.outputManager, + this.mainTupleTag, + this.sideOutputTags, + this.stepContext, + this.windowingStrategy), + MetricsReporter.create(metricClient)); + } + + protected void initService(ExecutorContext context) { + // TODO: what should be set for key in here? 
+ timerInternals = new JStormTimerInternals(null /* key */, this, context.getExecutorsBolt().timerService()); + kvStoreManager = context.getKvStoreManager(); + stepContext = new DefaultStepContext(timerInternals, + new JStormStateInternals(null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); + metricClient = new MetricClient(executorContext.getTopologyContext()); + } + + @Override + public void init(ExecutorContext context) { + this.executorContext = context; + this.executorsBolt = context.getExecutorsBolt(); + this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class); + + initService(context); + + // Side inputs setup + if (sideInputs != null && sideInputs.isEmpty() == false) { + pushedBackTag = StateTags.bag("pushed-back-values", inputCoder); + watermarkHoldTag = + StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST); + pushbackStateInternals = new JStormStateInternals(null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId); + sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals); + runner = getDoFnRunner(); + pushbackRunner = SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler); + } else { + runner = getDoFnRunner(); + } + + // Process user's setup + doFnInvoker = DoFnInvokers.invokerFor(doFn); + doFnInvoker.invokeSetup(); + } + + @Override + public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { + LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}", + tag, mainInputTag, sideInputs, elem.getValue())); + if (mainInputTag.equals(tag)) { + processMainInput(elem); + } else { + processSideInput(tag, elem); + } + } + + protected <T> void processMainInput(WindowedValue<T> elem) { + if (sideInputs.isEmpty()) { + runner.processElement((WindowedValue<InputT>) elem); + } else { + Iterable<WindowedValue<InputT>> justPushedBack = + 
pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem); + BagState<WindowedValue<InputT>> pushedBack = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + + Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE; + for (WindowedValue<InputT> pushedBackValue : justPushedBack) { + if (pushedBackValue.getTimestamp().isBefore(min)) { + min = pushedBackValue.getTimestamp(); + } + min = earlier(min, pushedBackValue.getTimestamp()); + pushedBack.add(pushedBackValue); + } + pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min); + } + } + + protected void processSideInput(TupleTag tag, WindowedValue elem) { + LOG.debug(String.format("side inputs: %s, %s.", tag, elem)); + + PCollectionView<?> sideInputView = sideInputTagToView.get(tag); + sideInputHandler.addSideInputValue(sideInputView, elem); + + BagState<WindowedValue<InputT>> pushedBack = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + + List<WindowedValue<InputT>> newPushedBack = new ArrayList<>(); + + Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read(); + if (pushedBackInputs != null) { + for (WindowedValue<InputT> input : pushedBackInputs) { + + Iterable<WindowedValue<InputT>> justPushedBack = + pushbackRunner.processElementInReadyWindows(input); + Iterables.addAll(newPushedBack, justPushedBack); + } + } + pushedBack.clear(); + + Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE; + for (WindowedValue<InputT> pushedBackValue : newPushedBack) { + min = earlier(min, pushedBackValue.getTimestamp()); + pushedBack.add(pushedBackValue); + } + + WatermarkHoldState watermarkHold = + pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag); + // TODO: clear-then-add is not thread-safe. 
+ watermarkHold.clear(); + watermarkHold.add(min); + } + + /** + * Process all pushed back elements when receiving watermark with max timestamp + */ + public void processAllPushBackElements() { + if (sideInputs != null && sideInputs.isEmpty() == false) { + BagState<WindowedValue<InputT>> pushedBackElements = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + if (pushedBackElements != null) { + for (WindowedValue<InputT> elem : pushedBackElements.read()) { + LOG.info("Process pushback elem={}", elem); + runner.processElement(elem); + } + pushedBackElements.clear(); + } + + WatermarkHoldState watermarkHold = + pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag); + watermarkHold.clear(); + watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE); + } + } + + public void onTimer(Object key, TimerInternals.TimerData timerData) { + StateNamespace namespace = timerData.getNamespace(); + checkArgument(namespace instanceof StateNamespaces.WindowNamespace); + BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow(); + if (pushbackRunner != null) { + pushbackRunner.onTimer(timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain()); + } else { + runner.onTimer(timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain()); + } + } + + @Override + public void cleanup() { + doFnInvoker.invokeTeardown(); + } + + @Override + public String toString() { + return description; + } + + private Instant earlier(Instant left, Instant right) { + return left.isBefore(right) ? 
left : right; + } + + public void startBundle() { + if (pushbackRunner != null) { + pushbackRunner.startBundle(); + } else { + runner.startBundle(); + } + } + + public void finishBundle() { + if (pushbackRunner != null) { + pushbackRunner.finishBundle(); + } else { + runner.finishBundle(); + } + } + + public void setInternalDoFnExecutorId(int id) { + this.internalDoFnExecutorId = id; + } + + public int getInternalDoFnExecutorId() { + return internalDoFnExecutorId; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java new file mode 100644 index 0000000..98dbcc5 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.joda.time.Instant; + +/** + * DoFnRunner decorator which registers {@link MetricsContainer}. + */ +public class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { + + private final String stepName; + private final DoFnRunner<InputT, OutputT> delegate; + private final MetricsReporter metricsReporter; + + DoFnRunnerWithMetrics( + String stepName, + DoFnRunner<InputT, OutputT> delegate, + MetricsReporter metricsReporter) { + this.stepName = checkNotNull(stepName, "stepName"); + this.delegate = checkNotNull(delegate, "delegate"); + this.metricsReporter = checkNotNull(metricsReporter, "metricsReporter"); + } + + @Override + public void startBundle() { + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( + metricsReporter.getMetricsContainer(stepName))) { + delegate.startBundle(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void processElement(WindowedValue<InputT> elem) { + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( + metricsReporter.getMetricsContainer(stepName))) { + delegate.processElement(elem); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) { + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( + metricsReporter.getMetricsContainer(stepName))) { + delegate.onTimer(timerId, window, 
timestamp, timeDomain); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void finishBundle() { + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( + metricsReporter.getMetricsContainer(stepName))) { + delegate.finishBundle(); + } catch (IOException e) { + throw new RuntimeException(e); + } + metricsReporter.updateMetrics(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java new file mode 100644 index 0000000..d7214db --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import java.io.Serializable; + +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +public interface Executor extends Serializable { + /** + * Initialization during runtime + */ + void init(ExecutorContext context); + + <T> void process(TupleTag<T> tag, WindowedValue<T> elem); + + void cleanup(); +} \ No newline at end of file
