http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java index 315110d..a056937 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java @@ -50,11 +50,11 @@ import org.joda.time.Instant; public class TriggerStateMachineContextFactory<W extends BoundedWindow> { private final WindowFn<?, W> windowFn; - private StateInternals<?> stateInternals; + private StateInternals stateInternals; private final Coder<W> windowCoder; - public TriggerStateMachineContextFactory(WindowFn<?, W> windowFn, - StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) { + public TriggerStateMachineContextFactory( + WindowFn<?, W> windowFn, StateInternals stateInternals, ActiveWindowSet<W> activeWindows) { // Future triggers may be able to exploit the active window to state address window mapping. this.windowFn = windowFn; this.stateInternals = stateInternals; @@ -263,7 +263,7 @@ public class TriggerStateMachineContextFactory<W extends BoundedWindow> { } @Override - public <StateT extends State> StateT access(StateTag<? super Object, StateT> address) { + public <StateT extends State> StateT access(StateTag<StateT> address) { return stateInternals.state(windowNamespace, address); } } @@ -280,13 +280,13 @@ public class TriggerStateMachineContextFactory<W extends BoundedWindow> { @Override public <StateT extends State> StateT access( - StateTag<? super Object, StateT> address) { + StateTag<StateT> address) { return stateInternals.state(windowNamespace, address); } @Override public <StateT extends State> Map<W, StateT> accessInEachMergingWindow( - StateTag<? super Object, StateT> address) { + StateTag<StateT> address) { ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder(); for (W mergingWindow : activeToBeMerged) { StateT stateForWindow = stateInternals.state(namespaceFor(mergingWindow), address);
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java index e26241a..fc2f696 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java @@ -57,7 +57,7 @@ import org.joda.time.Instant; */ public class TriggerStateMachineRunner<W extends BoundedWindow> { @VisibleForTesting - public static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG = + public static final StateTag<ValueState<BitSet>> FINISHED_BITS_TAG = StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of())); private final ExecutableTriggerStateMachine rootTrigger; http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java index bc33366..054a2e2 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -570,7 +570,7 @@ public class GroupAlsoByWindowsProperties { } private static final class CachingStateInternalsFactory<K> implements StateInternalsFactory<K> { - private final LoadingCache<K, StateInternals<K>> stateInternalsCache; + private final LoadingCache<K, StateInternals> stateInternalsCache; private CachingStateInternalsFactory() { this.stateInternalsCache = CacheBuilder.newBuilder().build(new StateInternalsLoader<K>()); @@ -578,7 +578,7 @@ public class GroupAlsoByWindowsProperties { @Override @SuppressWarnings("unchecked") - public StateInternals<K> stateInternalsForKey(K key) { + public StateInternals stateInternalsForKey(K key) { try { return stateInternalsCache.get(key); } catch (Exception exc) { @@ -587,9 +587,9 @@ public class GroupAlsoByWindowsProperties { } } - private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals<K>> { + private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals> { @Override - public StateInternals<K> load(K key) throws Exception { + public StateInternals load(K key) throws Exception { return InMemoryStateInternals.forKey(key); } } @@ -686,7 +686,7 @@ public class GroupAlsoByWindowsProperties { } @Override - public StateInternals<?> stateInternals() { + public StateInternals stateInternals() { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java index 6248401..16f7f26 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java @@ -60,22 +60,22 @@ public class InMemoryStateInternalsTest { private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = + private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<Object, CombiningState<Integer, int[], Integer>> + private static final StateTag<CombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); - private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = + private static final StateTag<BagState<String>> STRING_BAG_ADDR = StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<Object, SetState<String>> STRING_SET_ADDR = + private static final StateTag<SetState<String>> STRING_SET_ADDR = StateTags.set("stringSet", StringUtf8Coder.of()); - private static final StateTag<Object, MapState<String, Integer>> STRING_MAP_ADDR = + private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR = StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of()); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR = + private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR = StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR = + private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR = StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR = + private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR = StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey"); http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java index 95d6977..7a83a18 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java @@ -45,7 +45,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class MergingActiveWindowSetTest { private Sessions windowFn; - private StateInternals<String> state; + private StateInternals state; private MergingActiveWindowSet<IntervalWindow> set; private ActiveWindowSet.MergeCallback<IntervalWindow> callback; http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index eba0f67..573855f 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -314,14 +314,14 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { public final void assertHasOnlyGlobalAndFinishedSetsFor(W... expectedWindows) { assertHasOnlyGlobalAndAllowedTags( ImmutableSet.copyOf(expectedWindows), - ImmutableSet.<StateTag<? super String, ?>>of(TriggerStateMachineRunner.FINISHED_BITS_TAG)); + ImmutableSet.<StateTag<?>>of(TriggerStateMachineRunner.FINISHED_BITS_TAG)); } @SafeVarargs public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W... expectedWindows) { assertHasOnlyGlobalAndAllowedTags( ImmutableSet.copyOf(expectedWindows), - ImmutableSet.<StateTag<? super String, ?>>of( + ImmutableSet.<StateTag<?>>of( TriggerStateMachineRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG, WatermarkHold.watermarkHoldTagForTimestampCombiner( @@ -331,14 +331,14 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { public final void assertHasOnlyGlobalState() { assertHasOnlyGlobalAndAllowedTags( - Collections.<W>emptySet(), Collections.<StateTag<? super String, ?>>emptySet()); + Collections.<W>emptySet(), Collections.<StateTag<?>>emptySet()); } @SafeVarargs public final void assertHasOnlyGlobalAndPaneInfoFor(W... expectedWindows) { assertHasOnlyGlobalAndAllowedTags( ImmutableSet.copyOf(expectedWindows), - ImmutableSet.<StateTag<? super String, ?>>of( + ImmutableSet.<StateTag<?>>of( PaneInfoTracker.PANE_INFO_TAG, WatermarkHold.watermarkHoldTagForTimestampCombiner( objectStrategy.getTimestampCombiner()), @@ -350,30 +350,30 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { * {@code expectedWindows} and that each of these windows has only tags from {@code allowedTags}. */ private void assertHasOnlyGlobalAndAllowedTags( - Set<W> expectedWindows, Set<StateTag<? super String, ?>> allowedTags) { + Set<W> expectedWindows, Set<StateTag<?>> allowedTags) { Set<StateNamespace> expectedWindowsSet = new HashSet<>(); for (W expectedWindow : expectedWindows) { expectedWindowsSet.add(windowNamespace(expectedWindow)); } - Map<StateNamespace, Set<StateTag<? super String, ?>>> actualWindows = new HashMap<>(); + Map<StateNamespace, Set<StateTag<?>>> actualWindows = new HashMap<>(); for (StateNamespace namespace : stateInternals.getNamespacesInUse()) { if (namespace instanceof StateNamespaces.GlobalNamespace) { continue; } else if (namespace instanceof StateNamespaces.WindowNamespace) { - Set<StateTag<? super String, ?>> tagsInUse = stateInternals.getTagsInUse(namespace); + Set<StateTag<?>> tagsInUse = stateInternals.getTagsInUse(namespace); if (tagsInUse.isEmpty()) { continue; } actualWindows.put(namespace, tagsInUse); - Set<StateTag<? super String, ?>> unexpected = Sets.difference(tagsInUse, allowedTags); + Set<StateTag<?>> unexpected = Sets.difference(tagsInUse, allowedTags); if (unexpected.isEmpty()) { continue; } else { fail(namespace + " has unexpected states: " + tagsInUse); } } else if (namespace instanceof StateNamespaces.WindowAndTriggerNamespace) { - Set<StateTag<? super String, ?>> tagsInUse = stateInternals.getTagsInUse(namespace); + Set<StateTag<?>> tagsInUse = stateInternals.getTagsInUse(namespace); assertTrue(namespace + " contains " + tagsInUse, tagsInUse.isEmpty()); } else { fail("Unrecognized namespace " + namespace); http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index 1a44453..a67db6d 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -231,7 +231,7 @@ public class SplittableParDoTest { processFn.setStateInternalsFactory( new StateInternalsFactory<String>() { @Override - public StateInternals<String> stateInternalsForKey(String key) { + public StateInternals stateInternalsForKey(String key) { return stateInternals; } }); http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java index 9a8b75c..fc08dcc 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java @@ -41,10 +41,10 @@ import org.junit.runners.JUnit4; public class StateTagTest { @Test public void testValueEquality() { - StateTag<?, ?> fooVarInt1 = StateTags.value("foo", VarIntCoder.of()); - StateTag<?, ?> fooVarInt2 = StateTags.value("foo", VarIntCoder.of()); - StateTag<?, ?> fooBigEndian = StateTags.value("foo", BigEndianIntegerCoder.of()); - StateTag<?, ?> barVarInt = StateTags.value("bar", VarIntCoder.of()); + StateTag<?> fooVarInt1 = StateTags.value("foo", VarIntCoder.of()); + StateTag<?> fooVarInt2 = StateTags.value("foo", VarIntCoder.of()); + StateTag<?> fooBigEndian = StateTags.value("foo", BigEndianIntegerCoder.of()); + StateTag<?> barVarInt = StateTags.value("bar", VarIntCoder.of()); assertEquals(fooVarInt1, fooVarInt2); assertNotEquals(fooVarInt1, fooBigEndian); @@ -53,10 +53,10 @@ public class StateTagTest { @Test public void testBagEquality() { - StateTag<?, ?> fooVarInt1 = StateTags.bag("foo", VarIntCoder.of()); - StateTag<?, ?> fooVarInt2 = StateTags.bag("foo", VarIntCoder.of()); - StateTag<?, ?> fooBigEndian = StateTags.bag("foo", BigEndianIntegerCoder.of()); - StateTag<?, ?> barVarInt = StateTags.bag("bar", VarIntCoder.of()); + StateTag<?> fooVarInt1 = StateTags.bag("foo", VarIntCoder.of()); + StateTag<?> fooVarInt2 = StateTags.bag("foo", VarIntCoder.of()); + StateTag<?> fooBigEndian = StateTags.bag("foo", BigEndianIntegerCoder.of()); + StateTag<?> barVarInt = StateTags.bag("bar", VarIntCoder.of()); assertEquals(fooVarInt1, fooVarInt2); assertNotEquals(fooVarInt1, fooBigEndian); @@ -65,10 +65,10 @@ public class StateTagTest { @Test public void testSetEquality() { - StateTag<?, ?> fooVarInt1 = StateTags.set("foo", VarIntCoder.of()); - StateTag<?, ?> fooVarInt2 = StateTags.set("foo", VarIntCoder.of()); - StateTag<?, ?> fooBigEndian = StateTags.set("foo", BigEndianIntegerCoder.of()); - StateTag<?, ?> barVarInt = StateTags.set("bar", VarIntCoder.of()); + StateTag<?> fooVarInt1 = StateTags.set("foo", VarIntCoder.of()); + StateTag<?> fooVarInt2 = StateTags.set("foo", VarIntCoder.of()); + StateTag<?> fooBigEndian = StateTags.set("foo", BigEndianIntegerCoder.of()); + StateTag<?> barVarInt = StateTags.set("bar", VarIntCoder.of()); assertEquals(fooVarInt1, fooVarInt2); assertNotEquals(fooVarInt1, fooBigEndian); @@ -77,15 +77,15 @@ public class StateTagTest { @Test public void testMapEquality() { - StateTag<?, ?> fooStringVarInt1 = + StateTag<?> fooStringVarInt1 = StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of()); - StateTag<?, ?> fooStringVarInt2 = + StateTag<?> fooStringVarInt2 = StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of()); - StateTag<?, ?> fooStringBigEndian = + StateTag<?> fooStringBigEndian = StateTags.map("foo", StringUtf8Coder.of(), BigEndianIntegerCoder.of()); - StateTag<?, ?> fooVarIntBigEndian = + StateTag<?> fooVarIntBigEndian = StateTags.map("foo", VarIntCoder.of(), BigEndianIntegerCoder.of()); - StateTag<?, ?> barStringVarInt = + StateTag<?> barStringVarInt = StateTags.map("bar", StringUtf8Coder.of(), VarIntCoder.of()); assertEquals(fooStringVarInt1, fooStringVarInt2); @@ -97,11 +97,11 @@ public class StateTagTest { @Test public void testWatermarkBagEquality() { - StateTag<?, ?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); - StateTag<?, ?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); - StateTag<?, ?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST); + StateTag<?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); + StateTag<?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); + StateTag<?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST); - StateTag<?, ?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST); + StateTag<?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST); // Same id, same fn. assertEquals(foo1, foo2); @@ -119,12 +119,12 @@ public class StateTagTest { Coder<Integer> input2 = BigEndianIntegerCoder.of(); Combine.BinaryCombineIntegerFn minFn = Min.ofIntegers(); - StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn); - StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn); - StateTag<?, ?> fooCoder1Min = StateTags.combiningValueFromInputInternal("foo", input1, minFn); + StateTag<?> fooCoder1Max1 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn); + StateTag<?> fooCoder1Max2 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn); + StateTag<?> fooCoder1Min = StateTags.combiningValueFromInputInternal("foo", input1, minFn); - StateTag<?, ?> fooCoder2Max = StateTags.combiningValueFromInputInternal("foo", input2, maxFn); - StateTag<?, ?> barCoder1Max = StateTags.combiningValueFromInputInternal("bar", input1, maxFn); + StateTag<?> fooCoder2Max = StateTags.combiningValueFromInputInternal("foo", input2, maxFn); + StateTag<?> barCoder1Max = StateTags.combiningValueFromInputInternal("bar", input1, maxFn); // Same name, coder and combineFn assertEquals(fooCoder1Max1, fooCoder1Max2); @@ -162,16 +162,16 @@ public class StateTagTest { Coder<int[]> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of()); Coder<int[]> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of()); - StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueWithContext( + StateTag<?> fooCoder1Max1 = StateTags.combiningValueWithContext( "foo", accum1, CombineFnUtil.toFnWithContext(maxFn)); - StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueWithContext( + StateTag<?> fooCoder1Max2 = StateTags.combiningValueWithContext( "foo", accum1, CombineFnUtil.toFnWithContext(maxFn)); - StateTag<?, ?> fooCoder1Min = StateTags.combiningValueWithContext( + StateTag<?> fooCoder1Min = StateTags.combiningValueWithContext( "foo", accum1, CombineFnUtil.toFnWithContext(minFn)); - StateTag<?, ?> fooCoder2Max = StateTags.combiningValueWithContext( + StateTag<?> fooCoder2Max = StateTags.combiningValueWithContext( "foo", accum2, CombineFnUtil.toFnWithContext(maxFn)); - StateTag<?, ?> barCoder1Max = StateTags.combiningValueWithContext( + StateTag<?> barCoder1Max = StateTags.combiningValueWithContext( "bar", accum1, CombineFnUtil.toFnWithContext(maxFn)); // Same name, coder and combineFn http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java index aeaa63b..f80643a 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java @@ -131,7 +131,7 @@ public class StatefulDoFnRunnerTest { timerInternals.advanceInputWatermark(new Instant(1L)); MyDoFn fn = new MyDoFn(); - StateTag<Object, ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState); + StateTag<ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState); DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner( fn, @@ -227,7 +227,7 @@ public class StatefulDoFnRunnerTest { public final String stateId = "foo"; @StateId(stateId) - public final StateSpec<Object, ValueState<Integer>> intState = + public final StateSpec<ValueState<Integer>> intState = StateSpecs.value(VarIntCoder.of()); @ProcessElement http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java index 92d87b5..ef3a053 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java @@ -60,24 +60,25 @@ import org.joda.time.Instant; * of {@link InMemoryState}. Whenever state that exists in the underlying {@link StateTable} is * accessed, an independent copy will be created within this table. */ -public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> { - private final K key; - private final CopyOnAccessInMemoryStateTable<K> table; +public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals { + private final CopyOnAccessInMemoryStateTable table; + + private K key; /** * Creates a new {@link CopyOnAccessInMemoryStateInternals} with the underlying (possibly null) * StateInternals. */ - public static <K> CopyOnAccessInMemoryStateInternals<K> withUnderlying( - K key, @Nullable CopyOnAccessInMemoryStateInternals<K> underlying) { - return new CopyOnAccessInMemoryStateInternals<K>(key, underlying); + public static <K> CopyOnAccessInMemoryStateInternals withUnderlying( + K key, @Nullable CopyOnAccessInMemoryStateInternals underlying) { + return new CopyOnAccessInMemoryStateInternals<>(key, underlying); } private CopyOnAccessInMemoryStateInternals( - K key, CopyOnAccessInMemoryStateInternals<K> underlying) { + K key, CopyOnAccessInMemoryStateInternals underlying) { this.key = key; table = - new CopyOnAccessInMemoryStateTable<K>(key, underlying == null ? null : underlying.table); + new CopyOnAccessInMemoryStateTable(underlying == null ? null : underlying.table); } /** @@ -94,7 +95,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> * * @return this table */ - public CopyOnAccessInMemoryStateInternals<K> commit() { + public CopyOnAccessInMemoryStateInternals commit() { table.commit(); return this; } @@ -116,18 +117,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> } @Override - public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) { + public <T extends State> T state(StateNamespace namespace, StateTag<T> address) { return state(namespace, address, StateContexts.nullContext()); } @Override public <T extends State> T state( - StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) { + StateNamespace namespace, StateTag<T> address, StateContext<?> c) { return table.get(namespace, address, c); } @Override - public K getKey() { + public Object getKey() { return key; } @@ -140,9 +141,8 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> * {@link StateTable#get(StateNamespace, StateTag, StateContext)}, first attempts to obtain a * copy of existing {@link State} from an underlying {@link StateTable}. */ - private static class CopyOnAccessInMemoryStateTable<K> extends StateTable<K> { - private final K key; - private Optional<StateTable<K>> underlying; + private static class CopyOnAccessInMemoryStateTable extends StateTable { + private Optional<StateTable> underlying; /** * The StateBinderFactory currently in use by this {@link CopyOnAccessInMemoryStateTable}. @@ -162,17 +162,16 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> * when a {@link StateTag} is bound.</li> * </ul> */ - private StateBinderFactory<K> binderFactory; + private StateBinderFactory binderFactory; /** * The earliest watermark hold in this table. */ private Optional<Instant> earliestWatermarkHold; - public CopyOnAccessInMemoryStateTable(K key, StateTable<K> underlying) { - this.key = key; + public CopyOnAccessInMemoryStateTable(StateTable underlying) { this.underlying = Optional.fromNullable(underlying); - binderFactory = new CopyOnBindBinderFactory<>(key, this.underlying); + binderFactory = new CopyOnBindBinderFactory(this.underlying); earliestWatermarkHold = Optional.absent(); } @@ -191,7 +190,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> private void commit() { Instant earliestHold = getEarliestWatermarkHold(); if (underlying.isPresent()) { - ReadThroughBinderFactory<K> readThroughBinder = + ReadThroughBinderFactory readThroughBinder = new ReadThroughBinderFactory<>(underlying.get()); binderFactory = readThroughBinder; Instant earliestUnderlyingHold = readThroughBinder.readThroughAndGetEarliestHold(this); @@ -201,7 +200,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> } earliestWatermarkHold = Optional.of(earliestHold); clearEmpty(); - binderFactory = new InMemoryStateBinderFactory<>(key); + binderFactory = new InMemoryStateBinderFactory(); underlying = Optional.absent(); } @@ -246,37 +245,35 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> } @Override - protected StateBinder<K> binderForNamespace(final StateNamespace namespace, StateContext<?> c) { + protected StateBinder binderForNamespace(final StateNamespace namespace, StateContext<?> c) { return binderFactory.forNamespace(namespace, c); } - private interface StateBinderFactory<K> { - StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c); + private interface StateBinderFactory { + StateBinder forNamespace(StateNamespace namespace, StateContext<?> c); } /** * {@link StateBinderFactory} that creates a copy of any existing state when the state is bound. */ - private static class CopyOnBindBinderFactory<K> implements StateBinderFactory<K> { - private final K key; - private final Optional<StateTable<K>> underlying; + private static class CopyOnBindBinderFactory implements StateBinderFactory { + private final Optional<StateTable> underlying; - public CopyOnBindBinderFactory(K key, Optional<StateTable<K>> underlying) { - this.key = key; + public CopyOnBindBinderFactory(Optional<StateTable> underlying) { this.underlying = underlying; } - private boolean containedInUnderlying(StateNamespace namespace, StateTag<? super K, ?> tag) { + private boolean containedInUnderlying(StateNamespace namespace, StateTag<?> tag) { return underlying.isPresent() && underlying.get().isNamespaceInUse(namespace) && underlying.get().getTagsInUse(namespace).containsKey(tag); } @Override - public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) { - return new StateBinder<K>() { + public StateBinder forNamespace(final StateNamespace namespace, final StateContext<?> c) { + return new StateBinder() { @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, + public WatermarkHoldState bindWatermark( + StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) { if (containedInUnderlying(namespace, address)) { @SuppressWarnings("unchecked") @@ -291,7 +288,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> @Override public <T> ValueState<T> bindValue( - StateTag<? super K, ValueState<T>> address, Coder<T> coder) { + StateTag<ValueState<T>> address, Coder<T> coder) { if (containedInUnderlying(namespace, address)) { @SuppressWarnings("unchecked") InMemoryState<? extends ValueState<T>> existingState = @@ -306,7 +303,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { if (containedInUnderlying(namespace, address)) { @@ -322,7 +319,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> @Override public <T> BagState<T> bindBag( - StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) { + StateTag<BagState<T>> address, Coder<T> elemCoder) { if (containedInUnderlying(namespace, address)) { @SuppressWarnings("unchecked") InMemoryState<? extends BagState<T>> existingState = @@ -336,7 +333,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> @Override public <T> SetState<T> bindSet( - StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) { + StateTag<SetState<T>> address, Coder<T> elemCoder) { if (containedInUnderlying(namespace, address)) { @SuppressWarnings("unchecked") InMemoryState<? extends SetState<T>> existingState = @@ -350,7 +347,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> @Override public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - StateTag<? super K, MapState<KeyT, ValueT>> address, + StateTag<MapState<KeyT, ValueT>> address, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) { if (containedInUnderlying(namespace, address)) { @@ -367,7 +364,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); @@ -381,17 +378,17 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> * to {@link CopyOnAccessInMemoryStateTable#commit()} to read all values from * the underlying table. */ - private static class ReadThroughBinderFactory<K> implements StateBinderFactory<K> { - private final StateTable<K> underlying; + private static class ReadThroughBinderFactory<K> implements StateBinderFactory { + private final StateTable underlying; - public ReadThroughBinderFactory(StateTable<K> underlying) { + public ReadThroughBinderFactory(StateTable underlying) { this.underlying = underlying; } - public Instant readThroughAndGetEarliestHold(StateTable<K> readTo) { + public Instant readThroughAndGetEarliestHold(StateTable readTo) { Instant earliestHold = BoundedWindow.TIMESTAMP_MAX_VALUE; for (StateNamespace namespace : underlying.getNamespacesInUse()) { - for (Map.Entry<StateTag<? super K, ?>, ? extends State> existingState : + for (Map.Entry<StateTag<?>, ? extends State> existingState : underlying.getTagsInUse(namespace).entrySet()) { if (!((InMemoryState<?>) existingState.getValue()).isCleared()) { // Only read through non-cleared values to ensure that completed windows are @@ -412,44 +409,44 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> } @Override - public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) { - return new StateBinder<K>() { + public StateBinder forNamespace(final StateNamespace namespace, final StateContext<?> c) { + return new StateBinder() { @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, + public WatermarkHoldState bindWatermark( + StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) { return underlying.get(namespace, address, c); } @Override public <T> ValueState<T> bindValue( - StateTag<? super K, ValueState<T>> address, Coder<T> coder) { + StateTag<ValueState<T>> address, Coder<T> coder) { return underlying.get(namespace, address, c); } @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { return underlying.get(namespace, address, c); } @Override public <T> BagState<T> bindBag( - StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) { + StateTag<BagState<T>> address, Coder<T> elemCoder) { return underlying.get(namespace, address, c); } @Override public <T> SetState<T> bindSet( - StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) { + StateTag<SetState<T>> address, Coder<T> elemCoder) { return underlying.get(namespace, address, c); } @Override public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - StateTag<? super K, MapState<KeyT, ValueT>> address, + StateTag<MapState<KeyT, ValueT>> address, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) { return underlying.get(namespace, address, c); } @@ -457,7 +454,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { return bindCombiningValue( @@ -467,16 +464,13 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> } } - private static class InMemoryStateBinderFactory<K> implements StateBinderFactory<K> { - private final K key; + private static class InMemoryStateBinderFactory implements StateBinderFactory { - public InMemoryStateBinderFactory(K key) { - this.key = key; - } + public InMemoryStateBinderFactory() {} @Override - public StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c) { - return new InMemoryStateBinder<>(key, c); + public StateBinder forNamespace(StateNamespace namespace, StateContext<?> c) { + return new InMemoryStateBinder(c); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 1108f0d..107f39a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -34,11 +34,14 @@ class DirectExecutionContext extends BaseExecutionContext<DirectStepContext> { private final Clock clock; private final StructuralKey<?> key; - private final CopyOnAccessInMemoryStateInternals<Object> existingState; + private final CopyOnAccessInMemoryStateInternals existingState; private final TransformWatermarks watermarks; - public DirectExecutionContext(Clock clock, StructuralKey<?> key, - CopyOnAccessInMemoryStateInternals<Object> existingState, TransformWatermarks watermarks) { + public DirectExecutionContext( + Clock clock, + StructuralKey<?> key, + CopyOnAccessInMemoryStateInternals existingState, + TransformWatermarks watermarks) { this.clock = clock; this.key = key; this.existingState = existingState; @@ -55,7 +58,7 @@ class DirectExecutionContext */ public class DirectStepContext extends BaseExecutionContext.StepContext { - private CopyOnAccessInMemoryStateInternals<Object> stateInternals; + private CopyOnAccessInMemoryStateInternals<?> stateInternals; private DirectTimerInternals timerInternals; public DirectStepContext( @@ -64,7 +67,7 @@ class DirectExecutionContext } @Override - public CopyOnAccessInMemoryStateInternals<Object> stateInternals() { + public CopyOnAccessInMemoryStateInternals<?> stateInternals() { if (stateInternals == null) { stateInternals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, existingState); } @@ -84,7 +87,7 @@ class DirectExecutionContext * Commits the state of this step, and returns the committed state. If the step has not * accessed any state, return null. */ - public CopyOnAccessInMemoryStateInternals<?> commitState() { + public CopyOnAccessInMemoryStateInternals commitState() { if (stateInternals != null) { return stateInternals.commit(); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index 54ce027..f6d9a36 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -90,7 +90,7 @@ class EvaluationContext { private final WatermarkCallbackExecutor callbackExecutor; /** The stateInternals of the world, by applied PTransform and key. */ - private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals<?>> + private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals> applicationStateInternals; private final SideInputContainer sideInputContainer; @@ -179,9 +179,9 @@ class EvaluationContext { result.getAggregatorChanges().commit(); } // Update state internals - CopyOnAccessInMemoryStateInternals<?> theirState = result.getState(); + CopyOnAccessInMemoryStateInternals theirState = result.getState(); if (theirState != null) { - CopyOnAccessInMemoryStateInternals<?> committedState = theirState.commit(); + CopyOnAccessInMemoryStateInternals committedState = theirState.commit(); StepAndKey stepAndKey = StepAndKey.of( result.getTransform(), completedBundle == null ? null : completedBundle.getKey()); @@ -331,7 +331,7 @@ class EvaluationContext { return new DirectExecutionContext( clock, key, - (CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey), + (CopyOnAccessInMemoryStateInternals) applicationStateInternals.get(stepAndKey), watermarkManager.getWatermarks(application)); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index f1e29c6..9f567a4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -164,8 +164,8 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { (PCollection<KV<K, Iterable<V>>>) Iterables.getOnlyElement(application.getOutputs().values())); outputBundles.add(bundle); - CopyOnAccessInMemoryStateInternals<K> stateInternals = - (CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals(); + CopyOnAccessInMemoryStateInternals stateInternals = + (CopyOnAccessInMemoryStateInternals) stepContext.stateInternals(); DirectTimerInternals timerInternals = stepContext.timerInternals(); ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = new ReduceFnRunner<>( @@ -191,7 +191,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { @Override public TransformResult<KeyedWorkItem<K, V>> finishBundle() throws Exception { // State is initialized within the constructor. It can never be null. - CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState(); + CopyOnAccessInMemoryStateInternals state = stepContext.commitState(); return StepTransformResult.<KeyedWorkItem<K, V>>withHold( application, state.getEarliestWatermarkHold()) .withState(state) http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index cab11db..053da31 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -211,7 +211,7 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { throw UserCodeException.wrap(e); } StepTransformResult.Builder<InputT> resultBuilder; - CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState(); + CopyOnAccessInMemoryStateInternals state = stepContext.commitState(); if (state != null) { resultBuilder = StepTransformResult.<InputT>withHold(transform, state.getEarliestWatermarkHold()) http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 7efdb52..e0adc40 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -127,7 +127,7 @@ class SplittableProcessElementsEvaluatorFactory< new StateInternalsFactory<String>() { @SuppressWarnings({"unchecked", "rawtypes"}) @Override - public StateInternals<String> stateInternalsForKey(String key) { + public StateInternals stateInternalsForKey(String key) { return (StateInternals) stepContext.stateInternals(); } }); http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index 8793ae8..93ab077 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -175,10 +175,11 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo @Override public void run() { for (StateDeclaration stateDecl : signature.stateDeclarations().values()) { - StateTag<Object, ?> tag; + StateTag<?> tag; try { tag = - StateTags.tagForSpec(stateDecl.id(), (StateSpec) stateDecl.field().get(doFn)); + StateTags.tagForSpec( + stateDecl.id(), (StateSpec) stateDecl.field().get(doFn)); } catch (IllegalAccessException e) { throw new RuntimeException( String.format( http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java index 01b2a72..fe3ae97 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java @@ -70,7 +70,7 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder; private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElementsBuilder; private MetricUpdates metricUpdates; - private CopyOnAccessInMemoryStateInternals<?> state; + private CopyOnAccessInMemoryStateInternals state; private TimerUpdate timerUpdate; private AggregatorContainer.Mutator aggregatorChanges; private final Set<OutputType> producedOutputs; @@ -109,7 +109,7 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp return this; } - public Builder<InputT> withState(CopyOnAccessInMemoryStateInternals<?> state) { + public Builder<InputT> withState(CopyOnAccessInMemoryStateInternals state) { this.state = state; return this; } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java index 8bb5f93..bde44ca 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java @@ -85,7 +85,7 @@ public interface TransformResult<InputT> { * <p>If this evaluation did not access state, this may return null. */ @Nullable - CopyOnAccessInMemoryStateInternals<?> getState(); + CopyOnAccessInMemoryStateInternals getState(); /** * Returns a TimerUpdateBuilder that was produced as a result of this evaluation. If the http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java index 4d04745..3e29a69 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java @@ -72,7 +72,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); BagState<String> stringBag = internals.state(namespace, bagTag); assertThat(stringBag.read(), emptyIterable()); @@ -92,7 +92,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); BagState<String> stringBag = internals.state(namespace, bagTag); assertThat(stringBag.read(), emptyIterable()); @@ -114,18 +114,18 @@ public class CopyOnAccessInMemoryStateInternalsTest { */ @Test public void testGetWithPresentInUnderlying() { - CopyOnAccessInMemoryStateInternals<String> underlying = + CopyOnAccessInMemoryStateInternals<String>underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, ValueState<String>> valueTag = StateTags.value("foo", StringUtf8Coder.of()); + StateTag<ValueState<String>> valueTag = StateTags.value("foo", StringUtf8Coder.of()); ValueState<String> underlyingValue = underlying.state(namespace, valueTag); assertThat(underlyingValue.read(), nullValue(String.class)); underlyingValue.write("bar"); assertThat(underlyingValue.read(), equalTo("bar")); - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); ValueState<String> copyOnAccessState = internals.state(namespace, valueTag); assertThat(copyOnAccessState.read(), equalTo("bar")); @@ -140,18 +140,18 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testBagStateWithUnderlying() { - CopyOnAccessInMemoryStateInternals<String> underlying = + CopyOnAccessInMemoryStateInternals<String>underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, BagState<Integer>> valueTag = StateTags.bag("foo", VarIntCoder.of()); + StateTag<BagState<Integer>> valueTag = StateTags.bag("foo", VarIntCoder.of()); BagState<Integer> underlyingValue = underlying.state(namespace, valueTag); assertThat(underlyingValue.read(), emptyIterable()); underlyingValue.add(1); assertThat(underlyingValue.read(), containsInAnyOrder(1)); - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); BagState<Integer> copyOnAccessState = internals.state(namespace, valueTag); assertThat(copyOnAccessState.read(), containsInAnyOrder(1)); @@ -166,18 +166,18 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testSetStateWithUnderlying() { - CopyOnAccessInMemoryStateInternals<String> underlying = + CopyOnAccessInMemoryStateInternals<String>underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, SetState<Integer>> valueTag = StateTags.set("foo", VarIntCoder.of()); + StateTag<SetState<Integer>> valueTag = StateTags.set("foo", VarIntCoder.of()); SetState<Integer> underlyingValue = underlying.state(namespace, valueTag); assertThat(underlyingValue.read(), emptyIterable()); underlyingValue.add(1); assertThat(underlyingValue.read(), containsInAnyOrder(1)); - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); SetState<Integer> copyOnAccessState = internals.state(namespace, valueTag); assertThat(copyOnAccessState.read(), containsInAnyOrder(1)); @@ -192,11 +192,11 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testMapStateWithUnderlying() { - CopyOnAccessInMemoryStateInternals<String> underlying = + CopyOnAccessInMemoryStateInternals<String>underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, MapState<String, Integer>> valueTag = + StateTag<MapState<String, Integer>> valueTag = StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of()); MapState<String, Integer> underlyingValue = underlying.state(namespace, valueTag); assertThat(underlyingValue.entries().read(), emptyIterable()); @@ -204,7 +204,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { underlyingValue.put("hello", 1); assertThat(underlyingValue.get("hello").read(), equalTo(1)); - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); MapState<String, Integer> copyOnAccessState = internals.state(namespace, valueTag); assertThat(copyOnAccessState.get("hello").read(), equalTo(1)); @@ -221,13 +221,13 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testAccumulatorCombiningStateWithUnderlying() throws CannotProvideCoderException { - CopyOnAccessInMemoryStateInternals<String> underlying = + CopyOnAccessInMemoryStateInternals<String>underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); CombineFn<Long, long[], Long> sumLongFn = Sum.ofLongs(); StateNamespace namespace = new StateNamespaceForTest("foo"); CoderRegistry reg = pipeline.getCoderRegistry(); - StateTag<Object, CombiningState<Long, long[], Long>> stateTag = + StateTag<CombiningState<Long, long[], Long>> stateTag = StateTags.combiningValue("summer", sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn); GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag); @@ -236,7 +236,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { underlyingValue.add(1L); assertThat(underlyingValue.read(), equalTo(1L)); - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag); assertThat(copyOnAccessState.read(), equalTo(1L)); @@ -251,13 +251,13 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testWatermarkHoldStateWithUnderlying() { - CopyOnAccessInMemoryStateInternals<String> underlying = + CopyOnAccessInMemoryStateInternals<String>underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST; StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, WatermarkHoldState> stateTag = + StateTag<WatermarkHoldState> stateTag = StateTags.watermarkStateInternal("wmstate", timestampCombiner); WatermarkHoldState underlyingValue = underlying.state(namespace, stateTag); assertThat(underlyingValue.read(), nullValue()); @@ -265,7 +265,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { underlyingValue.add(new Instant(250L)); assertThat(underlyingValue.read(), equalTo(new Instant(250L))); - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); WatermarkHoldState copyOnAccessState = internals.state(namespace, stateTag); assertThat(copyOnAccessState.read(), equalTo(new Instant(250L))); @@ -284,10 +284,10 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithoutUnderlying() { - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); BagState<String> stringBag = internals.state(namespace, bagTag); assertThat(stringBag.read(), emptyIterable()); @@ -304,13 +304,13 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithUnderlying() { - CopyOnAccessInMemoryStateInternals<String> underlying = + CopyOnAccessInMemoryStateInternals<String>underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); BagState<String> stringBag = underlying.state(namespace, bagTag); assertThat(stringBag.read(), emptyIterable()); @@ -331,15 +331,15 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithClearedInUnderlying() { - CopyOnAccessInMemoryStateInternals<String> underlying = + CopyOnAccessInMemoryStateInternals<String>underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); - CopyOnAccessInMemoryStateInternals<String> secondUnderlying = + CopyOnAccessInMemoryStateInternals<String>secondUnderlying = spy(CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying)); - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, secondUnderlying); StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); BagState<String> stringBag = underlying.state(namespace, bagTag); assertThat(stringBag.read(), emptyIterable()); @@ -361,13 +361,13 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithOverwrittenUnderlying() { - CopyOnAccessInMemoryStateInternals<String> underlying = + CopyOnAccessInMemoryStateInternals<String>underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); BagState<String> stringBag = underlying.state(namespace, bagTag); assertThat(stringBag.read(), emptyIterable()); @@ -392,15 +392,15 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithAddedUnderlying() { - CopyOnAccessInMemoryStateInternals<String> underlying = + CopyOnAccessInMemoryStateInternals<String>underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); internals.commit(); StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); BagState<String> stringBag = underlying.state(namespace, bagTag); assertThat(stringBag.read(), emptyIterable()); @@ -416,7 +416,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithEmptyTableIsEmpty() { - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); internals.commit(); @@ -426,11 +426,11 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithOnlyClearedValuesIsEmpty() { - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); BagState<String> stringBag = internals.state(namespace, bagTag); assertThat(stringBag.read(), emptyIterable()); @@ -444,13 +444,13 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testCommitWithEmptyNewAndFullUnderlyingIsNotEmpty() { - CopyOnAccessInMemoryStateInternals<String> underlying = + CopyOnAccessInMemoryStateInternals<String>underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); BagState<String> stringBag = underlying.state(namespace, bagTag); assertThat(stringBag.read(), emptyIterable()); @@ -475,16 +475,16 @@ public class CopyOnAccessInMemoryStateInternalsTest { return new Instant(689743L); } }; - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null); - StateTag<Object, WatermarkHoldState> firstHoldAddress = + StateTag<WatermarkHoldState> firstHoldAddress = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); WatermarkHoldState firstHold = internals.state(StateNamespaces.window(null, first), firstHoldAddress); firstHold.add(new Instant(22L)); - StateTag<Object, WatermarkHoldState> secondHoldAddress = + StateTag<WatermarkHoldState> secondHoldAddress = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); WatermarkHoldState secondHold = internals.state(StateNamespaces.window(null, second), secondHoldAddress); @@ -508,18 +508,18 @@ public class CopyOnAccessInMemoryStateInternalsTest { return new Instant(689743L); } }; - CopyOnAccessInMemoryStateInternals<String> underlying = + CopyOnAccessInMemoryStateInternals<String>underlying = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null); - StateTag<Object, WatermarkHoldState> firstHoldAddress = + StateTag<WatermarkHoldState> firstHoldAddress = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); WatermarkHoldState firstHold = underlying.state(StateNamespaces.window(null, first), firstHoldAddress); firstHold.add(new Instant(22L)); - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit()); - StateTag<Object, WatermarkHoldState> secondHoldAddress = + StateTag<WatermarkHoldState> secondHoldAddress = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); WatermarkHoldState secondHold = internals.state(StateNamespaces.window(null, second), secondHoldAddress); @@ -545,18 +545,18 @@ public class CopyOnAccessInMemoryStateInternalsTest { return new Instant(689743L); } }; - CopyOnAccessInMemoryStateInternals<String> underlying = + CopyOnAccessInMemoryStateInternals<String>underlying = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null); - StateTag<Object, WatermarkHoldState> firstHoldAddress = + StateTag<WatermarkHoldState> firstHoldAddress = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); WatermarkHoldState firstHold = underlying.state(StateNamespaces.window(null, first), firstHoldAddress); firstHold.add(new Instant(224L)); - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit()); - StateTag<Object, WatermarkHoldState> secondHoldAddress = + StateTag<WatermarkHoldState> secondHoldAddress = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); WatermarkHoldState secondHold = internals.state(StateNamespaces.window(null, second), secondHoldAddress); @@ -568,7 +568,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { @Test public void testGetEarliestHoldBeforeCommit() { - CopyOnAccessInMemoryStateInternals<String> internals = + CopyOnAccessInMemoryStateInternals<String>internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); internals http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 51ae12a..6f9adc4 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -524,6 +524,7 @@ public class DirectRunnerTest implements Serializable { } private static class LongNoDecodeCoder extends CustomCoder<Long> { + @Override public void encode( Long value, OutputStream outStream, Context context) throws IOException { http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 0c3a8ed..bfbcd79 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -161,7 +161,7 @@ public class EvaluationContextTest { context.getExecutionContext(createdProducer, StructuralKey.of("foo", StringUtf8Coder.of())); - StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); + StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); DirectStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1"); stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1); @@ -194,7 +194,7 @@ public class EvaluationContextTest { context.getExecutionContext(createdProducer, StructuralKey.of("foo", StringUtf8Coder.of())); - StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); + StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); fooContext .getOrCreateStepContext("s1", "s1") @@ -221,7 +221,7 @@ public class EvaluationContextTest { DirectExecutionContext fooContext = context.getExecutionContext(createdProducer, myKey); - StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); + StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); fooContext .getOrCreateStepContext("s1", "s1") @@ -246,9 +246,9 @@ public class EvaluationContextTest { DirectExecutionContext fooContext = context.getExecutionContext(downstreamProducer, myKey); - StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); + StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); - CopyOnAccessInMemoryStateInternals<Object> state = + CopyOnAccessInMemoryStateInternals<?> state = fooContext.getOrCreateStepContext("s1", "s1").stateInternals(); BagState<Integer> bag = state.state(StateNamespaces.global(), intBag); bag.add(1); @@ -268,7 +268,7 @@ public class EvaluationContextTest { DirectExecutionContext afterResultContext = context.getExecutionContext(downstreamProducer, myKey); - CopyOnAccessInMemoryStateInternals<Object> afterResultState = + CopyOnAccessInMemoryStateInternals<?> afterResultState = afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals(); assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4)); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index ecb8130..fc63406 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -92,7 +92,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { @Mock private transient UncommittedBundle<Integer> mockUncommittedBundle; private static final String KEY = "any-key"; - private transient StateInternals<Object> stateInternals = + private transient StateInternals stateInternals = CopyOnAccessInMemoryStateInternals.<Object>withUnderlying(KEY, null); private static final BundleFactory BUNDLE_FACTORY = ImmutableListBundleFactory.create(); @@ -104,7 +104,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { @Before public void setup() { MockitoAnnotations.initMocks(this); - when((StateInternals<Object>) mockStepContext.stateInternals()).thenReturn(stateInternals); + when((StateInternals) mockStepContext.stateInternals()).thenReturn(stateInternals); when(mockEvaluationContext.createSideInputReader(anyList())) .thenReturn( SideInputContainer.create( @@ -133,7 +133,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { ParDo.of( new DoFn<KV<String, Integer>, Integer>() { @StateId(stateId) - private final StateSpec<Object, ValueState<String>> spec = + private final StateSpec<ValueState<String>> spec = StateSpecs.value(StringUtf8Coder.of()); @ProcessElement @@ -165,7 +165,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { StateNamespaces.window(IntervalWindow.getCoder(), firstWindow); StateNamespace secondWindowNamespace = StateNamespaces.window(IntervalWindow.getCoder(), secondWindow); - StateTag<Object, ValueState<String>> tag = + StateTag<ValueState<String>> tag = StateTags.tagForSpec(stateId, StateSpecs.value(StringUtf8Coder.of())); // Set up non-empty state. We don't mock + verify calls to clear() but instead @@ -247,7 +247,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { .of( new DoFn<KV<String, Integer>, Integer>() { @StateId(stateId) - private final StateSpec<Object, ValueState<String>> spec = + private final StateSpec<ValueState<String>> spec = StateSpecs.value(StringUtf8Coder.of()); @ProcessElement http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java index 847a00a..8640801 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java @@ -61,7 +61,7 @@ public class FlinkNoOpStepContext implements StepContext { } @Override - public StateInternals<?> stateInternals() { + public StateInternals stateInternals() { return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java index 879fad7..a79f856 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java @@ -123,7 +123,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT> Collections.<TupleTag<?>>emptyList(), new FlinkNoOpStepContext() { @Override - public StateInternals<?> stateInternals() { + public StateInternals stateInternals() { return stateInternals; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 01830de..d8fd79a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -133,7 +133,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> protected transient long currentOutputWatermark; - private transient StateTag<Object, BagState<WindowedValue<InputT>>> pushedBackTag; + private transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag; protected transient FlinkStateInternals<?> stateInternals; @@ -149,7 +149,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> protected transient FlinkTimerInternals timerInternals; - private transient StateInternals<?> pushbackStateInternals; + private transient StateInternals pushbackStateInternals; private transient Optional<Long> pushedBackWatermark; @@ -673,7 +673,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> } @Override - public StateInternals<?> stateInternals() { + public StateInternals stateInternals() { return stateInternals; } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index fb6762d..1887a99 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -94,10 +94,10 @@ public class SplittableDoFnOperator< StateInternalsFactory<String> stateInternalsFactory = new StateInternalsFactory<String>() { @Override - public StateInternals<String> stateInternalsForKey(String key) { + public StateInternals stateInternalsForKey(String key) { //this will implicitly be keyed by the key of the incoming // element or by the key of a firing timer - return (StateInternals<String>) stateInternals; + return (StateInternals) stateInternals; } }; TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() { http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 9718734..3899303 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -83,10 +83,10 @@ public class WindowDoFnOperator<K, InputT, OutputT> protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() { StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() { @Override - public StateInternals<K> stateInternalsForKey(K key) { + public StateInternals stateInternalsForKey(K key) { //this will implicitly be keyed by the key of the incoming // element or by the key of a firing timer - return (StateInternals<K>) stateInternals; + return (StateInternals) stateInternals; } }; TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() {