Repository: beam Updated Branches: refs/heads/master 80d2548f2 -> 836e8e4aa
BEAM-1053 ApexGroupByKeyOperator serialization issues Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/74e31c35 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/74e31c35 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/74e31c35 Branch: refs/heads/master Commit: 74e31c350986d093be1a0b53d001b3376def8b69 Parents: 80d2548 Author: Thomas Weise <t...@apache.org> Authored: Sat Apr 8 13:01:01 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Mon Apr 10 10:55:31 2017 -0700 ---------------------------------------------------------------------- .../operators/ApexGroupByKeyOperator.java | 26 ++++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/74e31c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 4551c9c..230082e 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -25,12 +25,12 @@ import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Operator; import com.datatorrent.api.StreamCodec; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.netlet.util.Slice; import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.google.common.base.Throwables; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -96,7 +96,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { private final SerializablePipelineOptions serializedOptions; @Bind(JavaSerializer.class) private final StateInternalsFactory<K> stateInternalsFactory; - private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); + private Map<Slice, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); private transient ProcessContext context; private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn; @@ -177,18 +177,18 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { * We keep these timers in a Set, so that they are deduplicated, as the same * timer can be registered multiple times. */ - private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess( + private Multimap<Slice, TimerInternals.TimerData> getTimersReadyToProcess( long currentWatermark) { // we keep the timers to return in a different list and launch them later // because we cannot prevent a trigger from registering another trigger, // which would lead to concurrent modification exception. - Multimap<ByteBuffer, TimerInternals.TimerData> toFire = HashMultimap.create(); + Multimap<Slice, TimerInternals.TimerData> toFire = HashMultimap.create(); - Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it = + Iterator<Map.Entry<Slice, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator(); while (it.hasNext()) { - Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>> keyWithTimers = it.next(); + Map.Entry<Slice, Set<TimerInternals.TimerData>> keyWithTimers = it.next(); Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator(); while (timerIt.hasNext()) { @@ -226,9 +226,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { } private void registerActiveTimer(K key, TimerInternals.TimerData timer) { - final ByteBuffer keyBytes; + final Slice keyBytes; try { - keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key)); + keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key)); } catch (CoderException e) { throw new RuntimeException(e); } @@ -241,9 +241,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { } private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) { - final ByteBuffer keyBytes; + final Slice keyBytes; try { - keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key)); + keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key)); } catch (CoderException e) { throw new RuntimeException(e); } @@ -260,11 +260,11 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception { this.inputWatermark = new Instant(mark.getTimestamp()); - Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess( + Multimap<Slice, TimerInternals.TimerData> timers = getTimersReadyToProcess( mark.getTimestamp()); if (!timers.isEmpty()) { - for (ByteBuffer keyBytes : timers.keySet()) { - K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array()); + for (Slice keyBytes : timers.keySet()) { + K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.buffer); KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(keyBytes)); context.setElement(kwi, getStateInternalsForKey(kwi.key())); fn.processElement(context);