http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java new file mode 100644 index 0000000..59c4d8e --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java @@ -0,0 +1,526 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers; +import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; +import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext; +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.common.Counter; +import org.apache.beam.sdk.util.common.Counter.AggregationKind; +import org.apache.beam.sdk.util.common.CounterSet; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Tests for {@link InProcessEvaluationContext}. + */ +@RunWith(JUnit4.class) +public class InProcessEvaluationContextTest { + private TestPipeline p; + private InProcessEvaluationContext context; + + private PCollection<Integer> created; + private PCollection<KV<String, Integer>> downstream; + private PCollectionView<Iterable<Integer>> view; + private PCollection<Long> unbounded; + private Collection<AppliedPTransform<?, ?, ?>> rootTransforms; + private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers; + + private BundleFactory bundleFactory; + + @Before + public void setup() { + InProcessPipelineRunner runner = + InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create()); + + p = TestPipeline.create(); + + created = p.apply(Create.of(1, 2, 3)); + downstream = created.apply(WithKeys.<String, Integer>of("foo")); + view = created.apply(View.<Integer>asIterable()); + unbounded = p.apply(CountingInput.unbounded()); + + ConsumerTrackingPipelineVisitor cVis = new ConsumerTrackingPipelineVisitor(); + p.traverseTopologically(cVis); + rootTransforms = cVis.getRootTransforms(); + valueToConsumers = cVis.getValueToConsumers(); + + bundleFactory = InProcessBundleFactory.create(); + + context = + InProcessEvaluationContext.create( + runner.getPipelineOptions(), + InProcessBundleFactory.create(), + rootTransforms, + valueToConsumers, + cVis.getStepNames(), + cVis.getViews()); + } + + @Test + public void writeToViewWriterThenReadReads() { + PCollectionViewWriter<Integer, Iterable<Integer>> viewWriter = + context.createPCollectionViewWriter( + PCollection.<Iterable<Integer>>createPrimitiveOutputInternal( + p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED), + view); + BoundedWindow window = new TestBoundedWindow(new Instant(1024L)); + BoundedWindow second = new TestBoundedWindow(new Instant(899999L)); + WindowedValue<Integer> firstValue = + WindowedValue.of(1, new Instant(1222), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue<Integer> secondValue = + WindowedValue.of( + 2, new Instant(8766L), second, PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)); + Iterable<WindowedValue<Integer>> values = ImmutableList.of(firstValue, secondValue); + viewWriter.add(values); + + SideInputReader reader = + context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view)); + assertThat(reader.get(view, window), containsInAnyOrder(1)); + assertThat(reader.get(view, second), containsInAnyOrder(2)); + + WindowedValue<Integer> overrittenSecondValue = + WindowedValue.of( + 4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1)); + viewWriter.add(Collections.singleton(overrittenSecondValue)); + assertThat(reader.get(view, second), containsInAnyOrder(4444)); + } + + @Test + public void getExecutionContextSameStepSameKeyState() { + InProcessExecutionContext fooContext = + context.getExecutionContext(created.getProducingTransformInternal(), "foo"); + + StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1"); + stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1); + + context.handleResult( + InProcessBundleFactory.create() + .createKeyedBundle(null, "foo", created) + .commit(Instant.now()), + ImmutableList.<TimerData>of(), + StepTransformResult.withoutHold(created.getProducingTransformInternal()) + .withState(stepContext.commitState()) + .build()); + + InProcessExecutionContext secondFooContext = + context.getExecutionContext(created.getProducingTransformInternal(), "foo"); + assertThat( + secondFooContext + .getOrCreateStepContext("s1", "s1") + .stateInternals() + .state(StateNamespaces.global(), intBag) + .read(), + contains(1)); + } + + + @Test + public void getExecutionContextDifferentKeysIndependentState() { + InProcessExecutionContext fooContext = + context.getExecutionContext(created.getProducingTransformInternal(), "foo"); + + StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + fooContext + .getOrCreateStepContext("s1", "s1") + .stateInternals() + .state(StateNamespaces.global(), intBag) + .add(1); + + InProcessExecutionContext barContext = + context.getExecutionContext(created.getProducingTransformInternal(), "bar"); + assertThat(barContext, not(equalTo(fooContext))); + assertThat( + barContext + .getOrCreateStepContext("s1", "s1") + .stateInternals() + .state(StateNamespaces.global(), intBag) + .read(), + emptyIterable()); + } + + @Test + public void getExecutionContextDifferentStepsIndependentState() { + String myKey = "foo"; + InProcessExecutionContext fooContext = + context.getExecutionContext(created.getProducingTransformInternal(), myKey); + + StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + fooContext + .getOrCreateStepContext("s1", "s1") + .stateInternals() + .state(StateNamespaces.global(), intBag) + .add(1); + + InProcessExecutionContext barContext = + context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + assertThat( + barContext + .getOrCreateStepContext("s1", "s1") + .stateInternals() + .state(StateNamespaces.global(), intBag) + .read(), + emptyIterable()); + } + + @Test + public void handleResultMergesCounters() { + CounterSet counters = context.createCounterSet(); + Counter<Long> myCounter = Counter.longs("foo", AggregationKind.SUM); + counters.addCounter(myCounter); + + myCounter.addValue(4L); + InProcessTransformResult result = + StepTransformResult.withoutHold(created.getProducingTransformInternal()) + .withCounters(counters) + .build(); + context.handleResult(null, ImmutableList.<TimerData>of(), result); + assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(4L)); + + CounterSet againCounters = context.createCounterSet(); + Counter<Long> myLongCounterAgain = Counter.longs("foo", AggregationKind.SUM); + againCounters.add(myLongCounterAgain); + myLongCounterAgain.addValue(8L); + + InProcessTransformResult secondResult = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + .withCounters(againCounters) + .build(); + context.handleResult( + context.createRootBundle(created).commit(Instant.now()), + ImmutableList.<TimerData>of(), + secondResult); + assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L)); + } + + @Test + public void handleResultStoresState() { + String myKey = "foo"; + InProcessExecutionContext fooContext = + context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + + StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + CopyOnAccessInMemoryStateInternals<Object> state = + fooContext.getOrCreateStepContext("s1", "s1").stateInternals(); + BagState<Integer> bag = state.state(StateNamespaces.global(), intBag); + bag.add(1); + bag.add(2); + bag.add(4); + + InProcessTransformResult stateResult = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + .withState(state) + .build(); + + context.handleResult( + context.createKeyedBundle(null, myKey, created).commit(Instant.now()), + ImmutableList.<TimerData>of(), + stateResult); + + InProcessExecutionContext afterResultContext = + context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + + CopyOnAccessInMemoryStateInternals<Object> afterResultState = + afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals(); + assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4)); + } + + @Test + public void callAfterOutputMustHaveBeenProducedAfterEndOfWatermarkCallsback() throws Exception { + final CountDownLatch callLatch = new CountDownLatch(1); + Runnable callback = + new Runnable() { + @Override + public void run() { + callLatch.countDown(); + } + }; + + // Should call back after the end of the global window + context.scheduleAfterOutputWouldBeProduced( + downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); + + InProcessTransformResult result = + StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) + .build(); + + context.handleResult(null, ImmutableList.<TimerData>of(), result); + + // Difficult to demonstrate that we took no action in a multithreaded world; poll for a bit + // will likely be flaky if this logic is broken + assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false)); + + InProcessTransformResult finishedResult = + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult); + // Obtain the value via blocking call + assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true)); + } + + @Test + public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception { + InProcessTransformResult finishedResult = + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult); + + final CountDownLatch callLatch = new CountDownLatch(1); + Runnable callback = + new Runnable() { + @Override + public void run() { + callLatch.countDown(); + } + }; + context.scheduleAfterOutputWouldBeProduced( + downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); + assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true)); + } + + @Test + public void extractFiredTimersExtractsTimers() { + InProcessTransformResult holdResult = + StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) + .build(); + context.handleResult(null, ImmutableList.<TimerData>of(), holdResult); + + String key = "foo"; + TimerData toFire = + TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME); + InProcessTransformResult timerResult = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null)) + .withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build()) + .build(); + + // haven't added any timers, must be empty + assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + context.handleResult( + context.createKeyedBundle(null, key, created).commit(Instant.now()), + ImmutableList.<TimerData>of(), + timerResult); + + // timer hasn't fired + assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + + InProcessTransformResult advanceResult = + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + // Should cause the downstream timer to fire + context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult); + + Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired = context.extractFiredTimers(); + assertThat( + fired, + Matchers.<AppliedPTransform<?, ?, ?>>hasKey(downstream.getProducingTransformInternal())); + Map<Object, FiredTimers> downstreamFired = + fired.get(downstream.getProducingTransformInternal()); + assertThat(downstreamFired, Matchers.<Object>hasKey(key)); + + FiredTimers firedForKey = downstreamFired.get(key); + assertThat(firedForKey.getTimers(TimeDomain.PROCESSING_TIME), emptyIterable()); + assertThat(firedForKey.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), emptyIterable()); + assertThat(firedForKey.getTimers(TimeDomain.EVENT_TIME), contains(toFire)); + + // Don't reextract timers + assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + } + + @Test + public void createBundleKeyedResultPropagatesKey() { + CommittedBundle<KV<String, Integer>> newBundle = + context + .createBundle( + bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()), + downstream) + .commit(Instant.now()); + assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo")); + } + + @Test + public void createKeyedBundleKeyed() { + CommittedBundle<KV<String, Integer>> keyedBundle = + context + .createKeyedBundle( + bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream) + .commit(Instant.now()); + assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo")); + } + + @Test + public void isDoneWithUnboundedPCollectionAndShutdown() { + context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true); + assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); + + context.handleResult( + null, + ImmutableList.<TimerData>of(), + StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(true)); + } + + @Test + public void isDoneWithUnboundedPCollectionAndNotShutdown() { + context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); + assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); + + context.handleResult( + null, + ImmutableList.<TimerData>of(), + StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); + } + + @Test + public void isDoneWithOnlyBoundedPCollections() { + context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); + assertThat(context.isDone(created.getProducingTransformInternal()), is(false)); + + context.handleResult( + null, + ImmutableList.<TimerData>of(), + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()); + assertThat(context.isDone(created.getProducingTransformInternal()), is(true)); + } + + @Test + public void isDoneWithPartiallyDone() { + context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true); + assertThat(context.isDone(), is(false)); + + UncommittedBundle<Integer> rootBundle = context.createRootBundle(created); + rootBundle.add(WindowedValue.valueInGlobalWindow(1)); + CommittedResult handleResult = + context.handleResult( + null, + ImmutableList.<TimerData>of(), + StepTransformResult.withoutHold(created.getProducingTransformInternal()) + .addOutput(rootBundle) + .build()); + @SuppressWarnings("unchecked") + CommittedBundle<Integer> committedBundle = + (CommittedBundle<Integer>) Iterables.getOnlyElement(handleResult.getOutputs()); + context.handleResult( + null, + ImmutableList.<TimerData>of(), + StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + assertThat(context.isDone(), is(false)); + + for (AppliedPTransform<?, ?, ?> consumers : valueToConsumers.get(created)) { + context.handleResult( + committedBundle, + ImmutableList.<TimerData>of(), + StepTransformResult.withoutHold(consumers).build()); + } + assertThat(context.isDone(), is(true)); + } + + @Test + public void isDoneWithUnboundedAndNotShutdown() { + context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); + assertThat(context.isDone(), is(false)); + + context.handleResult( + null, + ImmutableList.<TimerData>of(), + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()); + context.handleResult( + null, + ImmutableList.<TimerData>of(), + StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + context.handleResult( + context.createRootBundle(created).commit(Instant.now()), + ImmutableList.<TimerData>of(), + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build()); + assertThat(context.isDone(), is(false)); + + context.handleResult( + context.createRootBundle(created).commit(Instant.now()), + ImmutableList.<TimerData>of(), + StepTransformResult.withoutHold(view.getProducingTransformInternal()).build()); + assertThat(context.isDone(), is(false)); + } + + private static class TestBoundedWindow extends BoundedWindow { + private final Instant ts; + + public TestBoundedWindow(Instant ts) { + this.ts = ts; + } + + @Override + public Instant maxTimestamp() { + return ts; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java new file mode 100644 index 0000000..54094c4 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import org.apache.beam.runners.direct.InProcessRegistrar.InProcessRunner; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.ServiceLoader; + +/** Tests for {@link InProcessRunner}. */ +@RunWith(JUnit4.class) +public class InProcessPipelineRegistrarTest { + @Test + public void testCorrectOptionsAreReturned() { + assertEquals( + ImmutableList.of(InProcessPipelineOptions.class), + new InProcessRegistrar.InProcessOptions().getPipelineOptions()); + } + + @Test + public void testCorrectRunnersAreReturned() { + assertEquals( + ImmutableList.of(InProcessPipelineRunner.class), + new InProcessRegistrar.InProcessRunner().getPipelineRunners()); + } + + @Test + public void testServiceLoaderForOptions() { + for (PipelineOptionsRegistrar registrar : + Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) { + if (registrar instanceof InProcessRegistrar.InProcessOptions) { + return; + } + } + fail("Expected to find " + InProcessRegistrar.InProcessOptions.class); + } + + @Test + public void testServiceLoaderForRunner() { + for (PipelineRunnerRegistrar registrar : + Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) { + if (registrar instanceof InProcessRegistrar.InProcessRunner) { + return; + } + } + fail("Expected to find " + InProcessRegistrar.InProcessRunner.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java new file mode 100644 index 0000000..87db39a --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.InProcessPipelineResult; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; + +/** + * Tests for basic {@link InProcessPipelineRunner} functionality. + */ +@RunWith(JUnit4.class) +public class InProcessPipelineRunnerTest implements Serializable { + @Test + public void wordCountShouldSucceed() throws Throwable { + Pipeline p = getPipeline(); + + PCollection<KV<String, Long>> counts = + p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo")) + .apply(MapElements.via(new SimpleFunction<String, String>() { + @Override + public String apply(String input) { + return input; + } + })) + .apply(Count.<String>perElement()); + PCollection<String> countStrs = + counts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() { + @Override + public String apply(KV<String, Long> input) { + String str = String.format("%s: %s", input.getKey(), input.getValue()); + return str; + } + })); + + PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3"); + + InProcessPipelineResult result = ((InProcessPipelineResult) p.run()); + result.awaitCompletion(); + } + + private Pipeline getPipeline() { + PipelineOptions opts = PipelineOptionsFactory.create(); + opts.setRunner(InProcessPipelineRunner.class); + + Pipeline p = Pipeline.create(opts); + return p; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java new file mode 100644 index 0000000..d8a78f2 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.doAnswer; + +import org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Mean; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.apache.beam.sdk.util.PCollectionViews; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +import com.google.common.collect.ImmutableList; + +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * Tests for {@link InProcessSideInputContainer}. + */ +@RunWith(JUnit4.class) +public class InProcessSideInputContainerTest { + private static final BoundedWindow FIRST_WINDOW = + new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(789541L); + } + + @Override + public String toString() { + return "firstWindow"; + } + }; + + private static final BoundedWindow SECOND_WINDOW = + new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(14564786L); + } + + @Override + public String toString() { + return "secondWindow"; + } + }; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Mock + private InProcessEvaluationContext context; + + private TestPipeline pipeline; + + private InProcessSideInputContainer container; + + private PCollectionView<Map<String, Integer>> mapView; + private PCollectionView<Double> singletonView; + + // Not present in container. + private PCollectionView<Iterable<Integer>> iterableView; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + pipeline = TestPipeline.create(); + + PCollection<Integer> create = + pipeline.apply("forBaseCollection", Create.<Integer>of(1, 2, 3, 4)); + + mapView = + create.apply("forKeyTypes", WithKeys.<String, Integer>of("foo")) + .apply("asMapView", View.<String, Integer>asMap()); + + singletonView = create.apply("forCombinedTypes", Mean.<Integer>globally().asSingletonView()); + iterableView = create.apply("asIterableView", View.<Integer>asIterable()); + + container = InProcessSideInputContainer.create( + context, ImmutableList.of(iterableView, mapView, singletonView)); + } + + @Test + public void getAfterWriteReturnsPaneInWindow() throws Exception { + WindowedValue<KV<String, Integer>> one = + WindowedValue.of( + KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue<KV<String, Integer>> two = + WindowedValue.of( + KV.of("two", 2), new Instant(20L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING); + container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two)); + + Map<String, Integer> viewContents = + container + .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)) + .get(mapView, FIRST_WINDOW); + assertThat(viewContents, hasEntry("one", 1)); + assertThat(viewContents, hasEntry("two", 2)); + assertThat(viewContents.size(), is(2)); + } + + @Test + public void getReturnsLatestPaneInWindow() throws Exception { + WindowedValue<KV<String, Integer>> one = + WindowedValue.of( + KV.of("one", 1), + new Instant(1L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY)); + WindowedValue<KV<String, Integer>> two = + WindowedValue.of( + KV.of("two", 2), + new Instant(20L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY)); + container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two)); + + Map<String, Integer> viewContents = + container + .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)) + .get(mapView, SECOND_WINDOW); + assertThat(viewContents, hasEntry("one", 1)); + assertThat(viewContents, hasEntry("two", 2)); + assertThat(viewContents.size(), is(2)); + + WindowedValue<KV<String, Integer>> three = + WindowedValue.of( + KV.of("three", 3), + new Instant(300L), + SECOND_WINDOW, + PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)); + container.write(mapView, ImmutableList.<WindowedValue<?>>of(three)); + + Map<String, Integer> overwrittenViewContents = + container + .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)) + .get(mapView, SECOND_WINDOW); + assertThat(overwrittenViewContents, hasEntry("three", 3)); + assertThat(overwrittenViewContents.size(), is(1)); + } + + /** + * Demonstrates that calling get() on a window that currently has no data does not return until + * there is data in the pane. + */ + @Test + public void getBlocksUntilPaneAvailable() throws Exception { + BoundedWindow window = + new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(1024L); + } + }; + Future<Double> singletonFuture = + getFutureOfView( + container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)), + singletonView, + window); + + WindowedValue<Double> singletonValue = + WindowedValue.of(4.75, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); + + assertThat(singletonFuture.isDone(), is(false)); + container.write(singletonView, ImmutableList.<WindowedValue<?>>of(singletonValue)); + assertThat(singletonFuture.get(), equalTo(4.75)); + } + + @Test + public void withPCollectionViewsWithPutInOriginalReturnsContents() throws Exception { + BoundedWindow window = new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(1024L); + } + }; + SideInputReader newReader = + container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)); + Future<Double> singletonFuture = getFutureOfView(newReader, singletonView, window); + + WindowedValue<Double> singletonValue = + WindowedValue.of(24.125, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); + + assertThat(singletonFuture.isDone(), is(false)); + container.write(singletonView, ImmutableList.<WindowedValue<?>>of(singletonValue)); + assertThat(singletonFuture.get(), equalTo(24.125)); + } + + @Test + public void withPCollectionViewsErrorsForContainsNotInViews() { + PCollectionView<Map<String, Iterable<String>>> newView = + PCollectionViews.multimapView( + pipeline, + WindowingStrategy.globalDefault(), + KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("with unknown views " + ImmutableList.of(newView).toString()); + + container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView)); + } + + @Test + public void withViewsForViewNotInContainerFails() { + PCollectionView<Map<String, Iterable<String>>> newView = + PCollectionViews.multimapView( + pipeline, + WindowingStrategy.globalDefault(), + KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("unknown views"); + thrown.expectMessage(newView.toString()); + + container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView)); + } + + @Test + public void getOnReaderForViewNotInReaderFails() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("unknown view: " + iterableView.toString()); + + container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)) + .get(iterableView, GlobalWindow.INSTANCE); + } + + @Test + public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception { + WindowedValue<Double> firstWindowedValue = + WindowedValue.of( + 2.875, + FIRST_WINDOW.maxTimestamp().minus(200L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue<Double> secondWindowedValue = + WindowedValue.of( + 4.125, + SECOND_WINDOW.maxTimestamp().minus(2_000_000L), + SECOND_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING); + container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue)); + assertThat( + container + .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)) + .get(singletonView, FIRST_WINDOW), + equalTo(2.875)); + assertThat( + container + .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)) + .get(singletonView, SECOND_WINDOW), + equalTo(4.125)); + } + + @Test + public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception { + WindowedValue<Integer> firstValue = + WindowedValue.of( + 44, + FIRST_WINDOW.maxTimestamp().minus(200L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue<Integer> secondValue = + WindowedValue.of( + 44, + FIRST_WINDOW.maxTimestamp().minus(200L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING); + + container.write(iterableView, ImmutableList.of(firstValue, secondValue)); + + assertThat( + container + .createReaderForViews(ImmutableList.<PCollectionView<?>>of(iterableView)) + .get(iterableView, FIRST_WINDOW), + contains(44, 44)); + } + + @Test + public void writeForElementInMultipleWindowsSucceeds() throws Exception { + WindowedValue<Double> multiWindowedValue = + WindowedValue.of( + 2.875, + FIRST_WINDOW.maxTimestamp().minus(200L), + ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + container.write(singletonView, ImmutableList.of(multiWindowedValue)); + assertThat( + container + .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)) + .get(singletonView, FIRST_WINDOW), + equalTo(2.875)); + assertThat( + container + .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)) + .get(singletonView, SECOND_WINDOW), + equalTo(2.875)); + } + + @Test + public void finishDoesNotOverwriteWrittenElements() throws Exception { + WindowedValue<KV<String, Integer>> one = + WindowedValue.of( + KV.of("one", 1), + new Instant(1L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY)); + WindowedValue<KV<String, Integer>> two = + WindowedValue.of( + KV.of("two", 2), + new Instant(20L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY)); + container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two)); + + immediatelyInvokeCallback(mapView, SECOND_WINDOW); + + Map<String, Integer> viewContents = + container + .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)) + .get(mapView, SECOND_WINDOW); + + assertThat(viewContents, hasEntry("one", 1)); + assertThat(viewContents, hasEntry("two", 2)); + assertThat(viewContents.size(), is(2)); + } + + @Test + public void finishOnPendingViewsSetsEmptyElements() throws Exception { + immediatelyInvokeCallback(mapView, SECOND_WINDOW); + Future<Map<String, Integer>> mapFuture = + getFutureOfView( + container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)), + mapView, + SECOND_WINDOW); + + assertThat(mapFuture.get().isEmpty(), is(true)); + } + + /** + * Demonstrates that calling isReady on an empty container throws an + * {@link IllegalArgumentException}. + */ + @Test + public void isReadyInEmptyReaderThrows() { + ReadyCheckingSideInputReader reader = + container.createReaderForViews(ImmutableList.<PCollectionView<?>>of()); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("does not contain"); + thrown.expectMessage(ImmutableList.of().toString()); + reader.isReady(mapView, GlobalWindow.INSTANCE); + } + + /** + * Demonstrates that calling isReady returns false until elements are written to the + * {@link PCollectionView}, {@link BoundedWindow} pair, at which point it returns true. + */ + @Test + public void isReadyForSomeNotReadyViewsFalseUntilElements() { + container.write( + mapView, + ImmutableList.of( + WindowedValue.of( + KV.of("one", 1), + SECOND_WINDOW.maxTimestamp().minus(100L), + SECOND_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING))); + + ReadyCheckingSideInputReader reader = + container.createReaderForViews(ImmutableList.of(mapView, singletonView)); + assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false)); + assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); + + assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false)); + + container.write( + mapView, + ImmutableList.of( + WindowedValue.of( + KV.of("too", 2), + FIRST_WINDOW.maxTimestamp().minus(100L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING))); + assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true)); + + container.write( + singletonView, + ImmutableList.of( + WindowedValue.of( + 1.25, + SECOND_WINDOW.maxTimestamp().minus(100L), + SECOND_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING))); + assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true)); + + assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false)); + assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); + } + + @Test + public void isReadyForEmptyWindowTrue() { + immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE); + + ReadyCheckingSideInputReader reader = + container.createReaderForViews(ImmutableList.of(mapView, singletonView)); + assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(true)); + assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); + + immediatelyInvokeCallback(singletonView, GlobalWindow.INSTANCE); + assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true)); + } + + /** + * When a callAfterWindowCloses with the specified view's producing transform, window, and + * windowing strategy is invoked, immediately execute the callback. + */ + private void immediatelyInvokeCallback(PCollectionView<?> view, BoundedWindow window) { + doAnswer( + new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Object callback = invocation.getArguments()[3]; + Runnable callbackRunnable = (Runnable) callback; + callbackRunnable.run(); + return null; + } + }) + .when(context) + .scheduleAfterOutputWouldBeProduced( + Mockito.eq(view), + Mockito.eq(window), + Mockito.eq(view.getWindowingStrategyInternal()), + Mockito.any(Runnable.class)); + } + + private <ValueT> Future<ValueT> getFutureOfView(final SideInputReader myReader, + final PCollectionView<ValueT> view, final BoundedWindow window) { + Callable<ValueT> callable = new Callable<ValueT>() { + @Override + public ValueT call() throws Exception { + return myReader.get(view, window); + } + }; + return Executors.newSingleThreadExecutor().submit(callable); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java new file mode 100644 index 0000000..34a8980 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; +import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder; +import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.state.StateNamespaces; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link InProcessTimerInternals}. + */ +@RunWith(JUnit4.class) +public class InProcessTimerInternalsTest { + private MockClock clock; + @Mock private TransformWatermarks watermarks; + + private TimerUpdateBuilder timerUpdateBuilder; + + private InProcessTimerInternals internals; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + clock = MockClock.fromInstant(new Instant(0)); + + timerUpdateBuilder = TimerUpdate.builder(1234); + + internals = InProcessTimerInternals.create(clock, watermarks, timerUpdateBuilder); + } + + @Test + public void setTimerAddsToBuilder() { + TimerData eventTimer = + TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME); + TimerData processingTimer = + TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME); + TimerData synchronizedProcessingTimer = + TimerData.of( + StateNamespaces.global(), + new Instant(98745632189L), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + internals.setTimer(eventTimer); + internals.setTimer(processingTimer); + internals.setTimer(synchronizedProcessingTimer); + + assertThat( + internals.getTimerUpdate().getSetTimers(), + containsInAnyOrder(eventTimer, synchronizedProcessingTimer, processingTimer)); + } + + @Test + public void deleteTimerDeletesOnBuilder() { + TimerData eventTimer = + TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME); + TimerData processingTimer = + TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME); + TimerData synchronizedProcessingTimer = + TimerData.of( + StateNamespaces.global(), + new Instant(98745632189L), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + internals.deleteTimer(eventTimer); + internals.deleteTimer(processingTimer); + internals.deleteTimer(synchronizedProcessingTimer); + + assertThat( + internals.getTimerUpdate().getDeletedTimers(), + containsInAnyOrder(eventTimer, synchronizedProcessingTimer, processingTimer)); + } + + @Test + public void getProcessingTimeIsClockNow() { + assertThat(internals.currentProcessingTime(), equalTo(clock.now())); + Instant oldProcessingTime = internals.currentProcessingTime(); + + clock.advance(Duration.standardHours(12)); + + assertThat(internals.currentProcessingTime(), equalTo(clock.now())); + assertThat( + internals.currentProcessingTime(), + equalTo(oldProcessingTime.plus(Duration.standardHours(12)))); + } + + @Test + public void getSynchronizedProcessingTimeIsWatermarkSynchronizedInputTime() { + when(watermarks.getSynchronizedProcessingInputTime()).thenReturn(new Instant(12345L)); + assertThat(internals.currentSynchronizedProcessingTime(), equalTo(new Instant(12345L))); + } + + @Test + public void getInputWatermarkTimeUsesWatermarkTime() { + when(watermarks.getInputWatermark()).thenReturn(new Instant(8765L)); + assertThat(internals.currentInputWatermarkTime(), equalTo(new Instant(8765L))); + } + + @Test + public void getOutputWatermarkTimeUsesWatermarkTime() { + when(watermarks.getOutputWatermark()).thenReturn(new Instant(25525L)); + assertThat(internals.currentOutputWatermarkTime(), equalTo(new Instant(25525L))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java new file mode 100644 index 0000000..24f9715 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.Keys; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +import com.google.common.collect.ImmutableSet; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Collections; +import java.util.Set; + +/** + * Tests for {@link KeyedPValueTrackingVisitor}. + */ +@RunWith(JUnit4.class) +public class KeyedPValueTrackingVisitorTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private KeyedPValueTrackingVisitor visitor; + private Pipeline p; + + @Before + public void setup() { + PipelineOptions options = PipelineOptionsFactory.create(); + + p = Pipeline.create(options); + @SuppressWarnings("rawtypes") + Set<Class<? extends PTransform>> producesKeyed = + ImmutableSet.<Class<? extends PTransform>>of(PrimitiveKeyer.class, CompositeKeyer.class); + visitor = KeyedPValueTrackingVisitor.create(producesKeyed); + } + + @Test + public void primitiveProducesKeyedOutputUnkeyedInputKeyedOutput() { + PCollection<Integer> keyed = + p.apply(Create.<Integer>of(1, 2, 3)).apply(new PrimitiveKeyer<Integer>()); + + p.traverseTopologically(visitor); + assertThat(visitor.getKeyedPValues(), hasItem(keyed)); + } + + @Test + public void primitiveProducesKeyedOutputKeyedInputKeyedOutut() { + PCollection<Integer> keyed = + p.apply(Create.<Integer>of(1, 2, 3)) + .apply("firstKey", new PrimitiveKeyer<Integer>()) + .apply("secondKey", new PrimitiveKeyer<Integer>()); + + p.traverseTopologically(visitor); + assertThat(visitor.getKeyedPValues(), hasItem(keyed)); + } + + @Test + public void compositeProducesKeyedOutputUnkeyedInputKeyedOutput() { + PCollection<Integer> keyed = + p.apply(Create.<Integer>of(1, 2, 3)).apply(new CompositeKeyer<Integer>()); + + p.traverseTopologically(visitor); + assertThat(visitor.getKeyedPValues(), hasItem(keyed)); + } + + @Test + public void compositeProducesKeyedOutputKeyedInputKeyedOutut() { + PCollection<Integer> keyed = + p.apply(Create.<Integer>of(1, 2, 3)) + .apply("firstKey", new CompositeKeyer<Integer>()) + .apply("secondKey", new CompositeKeyer<Integer>()); + + p.traverseTopologically(visitor); + assertThat(visitor.getKeyedPValues(), hasItem(keyed)); + } + + + @Test + public void noInputUnkeyedOutput() { + PCollection<KV<Integer, Iterable<Void>>> unkeyed = + p.apply( + Create.of(KV.<Integer, Iterable<Void>>of(-1, Collections.<Void>emptyList())) + .withCoder(KvCoder.of(VarIntCoder.of(), IterableCoder.of(VoidCoder.of())))); + + p.traverseTopologically(visitor); + assertThat(visitor.getKeyedPValues(), not(hasItem(unkeyed))); + } + + @Test + public void keyedInputNotProducesKeyedOutputUnkeyedOutput() { + PCollection<Integer> onceKeyed = + p.apply(Create.<Integer>of(1, 2, 3)) + .apply(new PrimitiveKeyer<Integer>()) + .apply(ParDo.of(new IdentityFn<Integer>())); + + p.traverseTopologically(visitor); + assertThat(visitor.getKeyedPValues(), not(hasItem(onceKeyed))); + } + + @Test + public void unkeyedInputNotProducesKeyedOutputUnkeyedOutput() { + PCollection<Integer> unkeyed = + p.apply(Create.<Integer>of(1, 2, 3)).apply(ParDo.of(new IdentityFn<Integer>())); + + p.traverseTopologically(visitor); + assertThat(visitor.getKeyedPValues(), not(hasItem(unkeyed))); + } + + @Test + public void traverseMultipleTimesThrows() { + p.apply( + Create.<KV<Integer, Void>>of( + KV.of(1, (Void) null), KV.of(2, (Void) null), KV.of(3, (Void) null)) + .withCoder(KvCoder.of(VarIntCoder.of(), VoidCoder.of()))) + .apply(GroupByKey.<Integer, Void>create()) + .apply(Keys.<Integer>create()); + + p.traverseTopologically(visitor); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("already been finalized"); + thrown.expectMessage(KeyedPValueTrackingVisitor.class.getSimpleName()); + p.traverseTopologically(visitor); + } + + @Test + public void getKeyedPValuesBeforeTraverseThrows() { + thrown.expect(IllegalStateException.class); + thrown.expectMessage("completely traversed"); + thrown.expectMessage("getKeyedPValues"); + visitor.getKeyedPValues(); + } + + private static class PrimitiveKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> { + @Override + public PCollection<K> apply(PCollection<K> input) { + return PCollection.<K>createPrimitiveOutputInternal( + input.getPipeline(), input.getWindowingStrategy(), input.isBounded()) + .setCoder(input.getCoder()); + } + } + + private static class CompositeKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> { + @Override + public PCollection<K> apply(PCollection<K> input) { + return input.apply(new PrimitiveKeyer<K>()).apply(ParDo.of(new IdentityFn<K>())); + } + } + + private static class IdentityFn<K> extends DoFn<K, K> { + @Override + public void processElement(DoFn<K, K>.ProcessContext c) throws Exception { + c.output(c.element()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java new file mode 100644 index 0000000..11ecbff --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * A clock that returns a constant value for now which can be set with calls to + * {@link #set(Instant)}. + * + * <p>For uses of the {@link Clock} interface in unit tests. + */ +public class MockClock implements Clock { + + private Instant now; + + public static MockClock fromInstant(Instant initial) { + return new MockClock(initial); + } + + private MockClock(Instant initialNow) { + this.now = initialNow; + } + + public void set(Instant newNow) { + checkArgument(!newNow.isBefore(now), "Cannot move MockClock backwards in time from %s to %s", + now, newNow); + this.now = newNow; + } + + public void advance(Duration duration) { + checkArgument( + duration.getMillis() > 0, + "Cannot move MockClock backwards in time by duration %s", + duration); + set(now.plus(duration)); + } + + @Override + public Instant now() { + return now; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java new file mode 100644 index 0000000..cecfe01 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.BoundMulti; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.common.CounterSet; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + +import org.hamcrest.Matchers; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; + +/** + * Tests for {@link ParDoMultiEvaluatorFactory}. + */ +@RunWith(JUnit4.class) +public class ParDoMultiEvaluatorFactoryTest implements Serializable { + private transient BundleFactory bundleFactory = InProcessBundleFactory.create(); + + @Test + public void testParDoMultiInMemoryTransformEvaluator() throws Exception { + TestPipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); + + TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; + final TupleTag<String> elementTag = new TupleTag<>(); + final TupleTag<Integer> lengthTag = new TupleTag<>(); + + BoundMulti<String, KV<String, Integer>> pardo = + ParDo.of( + new DoFn<String, KV<String, Integer>>() { + @Override + public void processElement(ProcessContext c) { + c.output(KV.<String, Integer>of(c.element(), c.element().length())); + c.sideOutput(elementTag, c.element()); + c.sideOutput(lengthTag, c.element().length()); + } + }) + .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag)); + PCollectionTuple outputTuple = input.apply(pardo); + + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); + + PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); + PCollection<String> elementOutput = outputTuple.get(elementTag); + PCollection<Integer> lengthOutput = outputTuple.get(lengthTag); + + InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); + UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput); + UncommittedBundle<Integer> lengthOutputBundle = bundleFactory.createRootBundle(lengthOutput); + + when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); + when(evaluationContext.createBundle(inputBundle, elementOutput)) + .thenReturn(elementOutputBundle); + when(evaluationContext.createBundle(inputBundle, lengthOutput)).thenReturn(lengthOutputBundle); + + InProcessExecutionContext executionContext = + new InProcessExecutionContext(null, null, null, null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) + .thenReturn(executionContext); + CounterSet counters = new CounterSet(); + when(evaluationContext.createCounterSet()).thenReturn(counters); + + TransformEvaluator<String> evaluator = + new ParDoMultiEvaluatorFactory() + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + + evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); + evaluator.processElement( + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); + evaluator.processElement( + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + InProcessTransformResult result = evaluator.finishBundle(); + assertThat( + result.getOutputBundles(), + Matchers.<UncommittedBundle<?>>containsInAnyOrder( + lengthOutputBundle, mainOutputBundle, elementOutputBundle)); + assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); + assertThat(result.getCounters(), equalTo(counters)); + + assertThat( + mainOutputBundle.commit(Instant.now()).getElements(), + Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder( + WindowedValue.valueInGlobalWindow(KV.of("foo", 3)), + WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)), + WindowedValue.valueInGlobalWindow( + KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING))); + assertThat( + elementOutputBundle.commit(Instant.now()).getElements(), + Matchers.<WindowedValue<String>>containsInAnyOrder( + WindowedValue.valueInGlobalWindow("foo"), + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)), + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING))); + assertThat( + lengthOutputBundle.commit(Instant.now()).getElements(), + Matchers.<WindowedValue<Integer>>containsInAnyOrder( + WindowedValue.valueInGlobalWindow(3), + WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)), + WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING))); + } + + @Test + public void testParDoMultiUndeclaredSideOutput() throws Exception { + TestPipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); + + TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; + final TupleTag<String> elementTag = new TupleTag<>(); + final TupleTag<Integer> lengthTag = new TupleTag<>(); + + BoundMulti<String, KV<String, Integer>> pardo = + ParDo.of( + new DoFn<String, KV<String, Integer>>() { + @Override + public void processElement(ProcessContext c) { + c.output(KV.<String, Integer>of(c.element(), c.element().length())); + c.sideOutput(elementTag, c.element()); + c.sideOutput(lengthTag, c.element().length()); + } + }) + .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); + PCollectionTuple outputTuple = input.apply(pardo); + + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); + + PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); + PCollection<String> elementOutput = outputTuple.get(elementTag); + + InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); + UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput); + + when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); + when(evaluationContext.createBundle(inputBundle, elementOutput)) + .thenReturn(elementOutputBundle); + + InProcessExecutionContext executionContext = + new InProcessExecutionContext(null, null, null, null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) + .thenReturn(executionContext); + CounterSet counters = new CounterSet(); + when(evaluationContext.createCounterSet()).thenReturn(counters); + + TransformEvaluator<String> evaluator = + new ParDoMultiEvaluatorFactory() + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + + evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); + evaluator.processElement( + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); + evaluator.processElement( + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + InProcessTransformResult result = evaluator.finishBundle(); + assertThat( + result.getOutputBundles(), + Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle)); + assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); + assertThat(result.getCounters(), equalTo(counters)); + + assertThat( + mainOutputBundle.commit(Instant.now()).getElements(), + Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder( + WindowedValue.valueInGlobalWindow(KV.of("foo", 3)), + WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)), + WindowedValue.valueInGlobalWindow( + KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING))); + assertThat( + elementOutputBundle.commit(Instant.now()).getElements(), + Matchers.<WindowedValue<String>>containsInAnyOrder( + WindowedValue.valueInGlobalWindow("foo"), + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)), + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING))); + } + + @Test + public void finishBundleWithStatePutsStateInResult() throws Exception { + TestPipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); + + TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; + final TupleTag<String> elementTag = new TupleTag<>(); + + final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag = + StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEndOfWindow()); + final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of()); + final StateNamespace windowNs = + StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); + BoundMulti<String, KV<String, Integer>> pardo = + ParDo.of( + new DoFn<String, KV<String, Integer>>() { + @Override + public void processElement(ProcessContext c) { + c.windowingInternals() + .stateInternals() + .state(StateNamespaces.global(), watermarkTag) + .add(new Instant(20202L + c.element().length())); + c.windowingInternals() + .stateInternals() + .state( + StateNamespaces.window( + GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE), + bagTag) + .add(c.element()); + } + }) + .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); + PCollectionTuple outputTuple = input.apply(pardo); + + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); + + PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); + PCollection<String> elementOutput = outputTuple.get(elementTag); + + InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); + UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput); + + when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); + when(evaluationContext.createBundle(inputBundle, elementOutput)) + .thenReturn(elementOutputBundle); + + InProcessExecutionContext executionContext = + new InProcessExecutionContext(null, "myKey", null, null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) + .thenReturn(executionContext); + CounterSet counters = new CounterSet(); + when(evaluationContext.createCounterSet()).thenReturn(counters); + + TransformEvaluator<String> evaluator = + new ParDoMultiEvaluatorFactory() + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + + evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); + evaluator.processElement( + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); + evaluator.processElement( + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + InProcessTransformResult result = evaluator.finishBundle(); + assertThat( + result.getOutputBundles(), + Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle)); + assertThat(result.getWatermarkHold(), equalTo(new Instant(20205L))); + assertThat(result.getState(), not(nullValue())); + assertThat( + result.getState().state(StateNamespaces.global(), watermarkTag).read(), + equalTo(new Instant(20205L))); + assertThat( + result.getState().state(windowNs, bagTag).read(), + containsInAnyOrder("foo", "bara", "bazam")); + } + + @Test + public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception { + TestPipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); + + TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; + final TupleTag<String> elementTag = new TupleTag<>(); + + final TimerData addedTimer = + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + new Instant(0).plus(Duration.standardMinutes(5)), + new Instant(1) + .plus(Duration.standardMinutes(5)) + .plus(Duration.standardHours(1)))), + new Instant(54541L), + TimeDomain.EVENT_TIME); + final TimerData deletedTimer = + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))), + new Instant(3400000), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + + BoundMulti<String, KV<String, Integer>> pardo = + ParDo.of( + new DoFn<String, KV<String, Integer>>() { + @Override + public void processElement(ProcessContext c) { + c.windowingInternals().stateInternals(); + c.windowingInternals() + .timerInternals() + .setTimer( + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + new Instant(0).plus(Duration.standardMinutes(5)), + new Instant(1) + .plus(Duration.standardMinutes(5)) + .plus(Duration.standardHours(1)))), + new Instant(54541L), + TimeDomain.EVENT_TIME)); + c.windowingInternals() + .timerInternals() + .deleteTimer( + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + new Instant(0), + new Instant(0).plus(Duration.standardHours(1)))), + new Instant(3400000), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME)); + } + }) + .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); + PCollectionTuple outputTuple = input.apply(pardo); + + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); + + PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); + PCollection<String> elementOutput = outputTuple.get(elementTag); + + InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); + UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput); + + when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); + when(evaluationContext.createBundle(inputBundle, elementOutput)) + .thenReturn(elementOutputBundle); + + InProcessExecutionContext executionContext = + new InProcessExecutionContext(null, "myKey", null, null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) + .thenReturn(executionContext); + CounterSet counters = new CounterSet(); + when(evaluationContext.createCounterSet()).thenReturn(counters); + + TransformEvaluator<String> evaluator = + new ParDoMultiEvaluatorFactory() + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + + evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); + evaluator.processElement( + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); + evaluator.processElement( + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + InProcessTransformResult result = evaluator.finishBundle(); + assertThat( + result.getTimerUpdate(), + equalTo( + TimerUpdate.builder("myKey") + .setTimer(addedTimer) + .setTimer(addedTimer) + .setTimer(addedTimer) + .deletedTimer(deletedTimer) + .deletedTimer(deletedTimer) + .deletedTimer(deletedTimer) + .build())); + } +}